diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index b84040cd..1db39d2b 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -2335,7 +2335,7 @@ TEST (network, replace_port) } } auto peers_list (system.nodes[0]->network.list (std::numeric_limits::max ())); - ASSERT_EQ (peers_list[0]->get_node_id ().get (), node1->node_id.pub); + ASSERT_EQ (peers_list[0]->get_node_id (), node1->node_id.pub); auto channel (std::make_shared (system.nodes[0]->network.udp_channels, node1->network.endpoint ())); system.nodes[0]->network.send_keepalive (channel); system.deadline_set (5s); diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 0083302f..d961e5ff 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -2691,7 +2691,12 @@ TEST (node, peers) { ASSERT_NO_ERROR (system.poll ()); } - + // Wait to finish TCP node ID handshakes + system.deadline_set (10s); + while (system.nodes.back ()->network.response_channels.size () == 0 || system.nodes.front ()->network.response_channels.size () == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } // Confirm that the peers match with the endpoints we are expecting ASSERT_EQ (1, system.nodes.front ()->network.size ()); auto list1 (system.nodes[0]->network.list (2)); @@ -2838,6 +2843,75 @@ TEST (node, dont_write_lock_node) finished_promise.set_value (); } +TEST (node, bidirectional_tcp) +{ + nano::system system; + nano::node_flags node_flags; + node_flags.delay_frontier_confirmation_height_updating = true; + node_flags.disable_udp = true; // Disable UDP connections + auto node1 = system.add_node (nano::node_config (24000, system.logging), node_flags); + nano::node_config node_config (24001, system.logging); + node_config.tcp_incoming_connections_max = 0; // Disable incoming TCP connections for node 2 + auto node2 = system.add_node (node_config, node_flags); + // Check network connections + ASSERT_EQ (1, node1->network.size ()); + ASSERT_EQ (1, node2->network.size ()); + auto list1 (node1->network.list (1)); + ASSERT_EQ (nano::transport::transport_type::tcp, list1[0]->get_type ()); + ASSERT_NE (node2->network.endpoint (), list1[0]->get_endpoint ()); // Ephemeral port + ASSERT_EQ (node2->node_id.pub, list1[0]->get_node_id ()); + auto list2 (node2->network.list (1)); + ASSERT_EQ (nano::transport::transport_type::tcp, list2[0]->get_type ()); + ASSERT_EQ (node1->network.endpoint (), list2[0]->get_endpoint ()); + ASSERT_EQ (node1->node_id.pub, list2[0]->get_node_id ()); + // Test block propagation from node 1 + nano::genesis genesis; + nano::keypair key; + auto send1 (std::make_shared (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, node1->work_generate_blocking (genesis.hash ()))); + node1->process_active (send1); + node1->block_processor.flush (); + system.deadline_set (5s); + while (!node1->ledger.block_exists (send1->hash ()) || !node2->ledger.block_exists (send1->hash ())) + { + ASSERT_NO_ERROR (system.poll ()); + } + // Test block confirmation from node 1 + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); + bool confirmed (false); + system.deadline_set (10s); + while (!confirmed) + { + auto transaction1 (node1->store.tx_begin_read ()); + auto transaction2 (node2->store.tx_begin_read ()); + confirmed = node1->ledger.block_confirmed (transaction1, send1->hash ()) && node2->ledger.block_confirmed (transaction2, send1->hash ()); + ASSERT_NO_ERROR (system.poll ()); + } + // Test block propagation from node 2 + { + auto transaction (system.wallet (0)->wallets.tx_begin_write ()); + system.wallet (0)->store.erase (transaction, nano::test_genesis_key.pub); + } + auto send2 (std::make_shared (nano::test_genesis_key.pub, send1->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 2 * nano::Gxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, node1->work_generate_blocking (send1->hash ()))); + node2->process_active (send2); + node2->block_processor.flush (); + system.deadline_set (5s); + while (!node1->ledger.block_exists (send2->hash ()) || !node2->ledger.block_exists (send2->hash ())) + { + ASSERT_NO_ERROR (system.poll ()); + } + // Test block confirmation from node 2 + system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv); + confirmed = false; + system.deadline_set (10s); + while (!confirmed) + { + auto transaction1 (node1->store.tx_begin_read ()); + auto transaction2 (node2->store.tx_begin_read ()); + confirmed = node1->ledger.block_confirmed (transaction1, send2->hash ()) && node2->ledger.block_confirmed (transaction2, send2->hash ()); + ASSERT_NO_ERROR (system.poll ()); + } +} + namespace nano { TEST (confirmation_height, prioritize_frontiers) diff --git a/nano/node/bootstrap.cpp b/nano/node/bootstrap.cpp index 5f3061cd..dd5abe6e 100644 --- a/nano/node/bootstrap.cpp +++ b/nano/node/bootstrap.cpp @@ -1911,14 +1911,21 @@ nano::bootstrap_server::~bootstrap_server () { node->logger.try_log ("Exiting incoming TCP/bootstrap server"); } - if (bootstrap_connection) + if (type == nano::bootstrap_server_type::bootstrap) { --node->bootstrap.bootstrap_count; } - if (node_id_handshake_finished) + else if (type == nano::bootstrap_server_type::realtime) { --node->bootstrap.realtime_count; node->network.response_channels.remove (remote_endpoint); + // Clear temporary channel + auto exisiting_response_channel (node->network.tcp_channels.find_channel (remote_endpoint)); + if (exisiting_response_channel != nullptr) + { + exisiting_response_channel->server = false; + node->network.tcp_channels.erase (remote_endpoint); + } } stop (); std::lock_guard lock (node->bootstrap.mutex); @@ -1929,7 +1936,6 @@ void nano::bootstrap_server::stop () { if (!stopped.exchange (true)) { - std::lock_guard lock (mutex); if (socket != nullptr) { socket->close (); @@ -2154,7 +2160,7 @@ void nano::bootstrap_server::receive_keepalive_action (boost::system::error_code std::unique_ptr request (new nano::keepalive (error, stream, header_a)); if (!error) { - if (node_id_handshake_finished) + if (type == nano::bootstrap_server_type::realtime || type == nano::bootstrap_server_type::realtime_response_server) { add_request (std::unique_ptr (request.release ())); } @@ -2179,7 +2185,7 @@ void nano::bootstrap_server::receive_publish_action (boost::system::error_code c std::unique_ptr request (new nano::publish (error, stream, header_a)); if (!error) { - if (node_id_handshake_finished) + if (type == nano::bootstrap_server_type::realtime || type == nano::bootstrap_server_type::realtime_response_server) { add_request (std::unique_ptr (request.release ())); } @@ -2204,7 +2210,7 @@ void nano::bootstrap_server::receive_confirm_req_action (boost::system::error_co std::unique_ptr request (new nano::confirm_req (error, stream, header_a)); if (!error) { - if (node_id_handshake_finished) + if (type == nano::bootstrap_server_type::realtime || type == nano::bootstrap_server_type::realtime_response_server) { add_request (std::unique_ptr (request.release ())); } @@ -2226,7 +2232,7 @@ void nano::bootstrap_server::receive_confirm_ack_action (boost::system::error_co std::unique_ptr request (new nano::confirm_ack (error, stream, header_a)); if (!error) { - if (node_id_handshake_finished) + if (type == nano::bootstrap_server_type::realtime || type == nano::bootstrap_server_type::realtime_response_server) { add_request (std::unique_ptr (request.release ())); } @@ -2248,7 +2254,7 @@ void nano::bootstrap_server::receive_node_id_handshake_action (boost::system::er std::unique_ptr request (new nano::node_id_handshake (error, stream, header_a)); if (!error) { - if (!node_id_handshake_finished && !node->flags.disable_tcp_realtime) + if (type == nano::bootstrap_server_type::undefined && !node->flags.disable_tcp_realtime) { add_request (std::unique_ptr (request.release ())); } @@ -2356,7 +2362,7 @@ public: connection->finish_request_async (); auto connection_l (connection->shared_from_this ()); connection->node->background ([connection_l, message_a]() { - connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id); + connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type); }); } void confirm_req (nano::confirm_req const & message_a) override @@ -2364,7 +2370,7 @@ public: connection->finish_request_async (); auto connection_l (connection->shared_from_this ()); connection->node->background ([connection_l, message_a]() { - connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id); + connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type); }); } void confirm_ack (nano::confirm_ack const & message_a) override @@ -2372,7 +2378,7 @@ public: connection->finish_request_async (); auto connection_l (connection->shared_from_this ()); connection->node->background ([connection_l, message_a]() { - connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id); + connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type); }); } void bulk_pull (nano::bulk_pull const &) override @@ -2429,10 +2435,11 @@ public: } else if (message_a.response) { - connection->remote_node_id = message_a.response->first; - if (!connection->node->network.syn_cookies.validate (nano::transport::map_tcp_to_endpoint (connection->remote_endpoint), connection->remote_node_id, message_a.response->second) && connection->remote_node_id != connection->node->node_id.pub) + auto node_id (message_a.response->first); + connection->remote_node_id = node_id; + if (!connection->node->network.syn_cookies.validate (nano::transport::map_tcp_to_endpoint (connection->remote_endpoint), node_id, message_a.response->second) && node_id != connection->node->node_id.pub) { - connection->node_id_handshake_finished = true; + connection->type = nano::bootstrap_server_type::realtime; ++connection->node->bootstrap.realtime_count; connection->finish_request_async (); } @@ -2446,9 +2453,10 @@ public: { connection->finish_request_async (); } + assert (connection->remote_node_id.is_zero () || connection->type == nano::bootstrap_server_type::realtime); auto connection_l (connection->shared_from_this ()); connection->node->background ([connection_l, message_a]() { - connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id); + connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type); }); } std::shared_ptr connection; @@ -2464,12 +2472,12 @@ void nano::bootstrap_server::run_next () bool nano::bootstrap_server::is_bootstrap_connection () { - if (!bootstrap_connection && !node->flags.disable_bootstrap_listener && node->bootstrap.bootstrap_count < node->config.bootstrap_connections_max) + if (type == nano::bootstrap_server_type::undefined && !node->flags.disable_bootstrap_listener && node->bootstrap.bootstrap_count < node->config.bootstrap_connections_max) { ++node->bootstrap.bootstrap_count; - bootstrap_connection = true; + type = nano::bootstrap_server_type::bootstrap; } - return bootstrap_connection; + return type == nano::bootstrap_server_type::bootstrap; } /** diff --git a/nano/node/bootstrap.hpp b/nano/node/bootstrap.hpp index 478170d4..fba053e3 100644 --- a/nano/node/bootstrap.hpp +++ b/nano/node/bootstrap.hpp @@ -289,6 +289,13 @@ private: std::unique_ptr collect_seq_con_info (bootstrap_listener & bootstrap_listener, const std::string & name); class message; +enum class bootstrap_server_type +{ + undefined, + bootstrap, + realtime, + realtime_response_server // special type for tcp channel response server +}; class bootstrap_server final : public std::enable_shared_from_this { public: @@ -317,9 +324,9 @@ public: std::mutex mutex; std::queue> requests; std::atomic stopped{ false }; - std::atomic bootstrap_connection{ false }; - std::atomic node_id_handshake_finished{ false }; + std::atomic type{ nano::bootstrap_server_type::undefined }; std::atomic keepalive_first{ true }; + // Remote enpoint used to remove response channel even after socket closing nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 }; nano::account remote_node_id{ 0 }; }; diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 8a758770..131c46f0 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -2529,7 +2529,7 @@ void nano::json_handler::peers () { boost::property_tree::ptree pending_tree; pending_tree.put ("protocol_version", std::to_string (channel->get_network_version ())); - auto node_id_l (channel->get_node_id ()); + auto node_id_l (channel->get_node_id_optional ()); if (node_id_l.is_initialized ()) { pending_tree.put ("node_id", node_id_l.get ().to_account ()); diff --git a/nano/node/testing.cpp b/nano/node/testing.cpp index 3475d938..3df0f494 100644 --- a/nano/node/testing.cpp +++ b/nano/node/testing.cpp @@ -63,7 +63,7 @@ std::shared_ptr nano::system::add_node (nano::node_config const & no new1 = node1->network.size (); new2 = node2->network.size (); } while (new1 == starting1 || new2 == starting2); - if (type_a == nano::transport::transport_type::tcp && !node_flags_a.disable_tcp_realtime) + if (type_a == nano::transport::transport_type::tcp && node_config_a.tcp_incoming_connections_max != 0 && !node_flags_a.disable_tcp_realtime) { // Wait for initial connection finish decltype (starting_listener1) new_listener1; diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 6be4e1c2..622eb6bf 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -11,10 +11,17 @@ socket (socket_a) nano::transport::channel_tcp::~channel_tcp () { std::lock_guard lk (channel_mutex); - if (socket) + // Close socket. Exception: socket is used by bootstrap_server + if (socket && !server) { socket->close (); } + // Remove response server + if (response_server != nullptr) + { + response_server->stop (); + response_server = nullptr; + } } size_t nano::transport::channel_tcp::hash_code () const @@ -90,12 +97,19 @@ bool nano::transport::tcp_channels::insert (std::shared_ptr ().find (endpoint)); if (existing == channels.get ().end ()) { + auto node_id (channel_a->get_node_id ()); + if (!channel_a->server) + { + channels.get ().erase (node_id); + } channels.get ().insert ({ channel_a }); error = false; lock.unlock (); node.network.channel_observer (channel_a); // Remove UDP channel to same IP:port if exists node.network.udp_channels.erase (udp_endpoint); + // Remove UDP channels with same node ID + node.network.udp_channels.clean_node_id (node_id); } } return error; @@ -226,7 +240,7 @@ nano::tcp_endpoint nano::transport::tcp_channels::bootstrap_peer () return result; } -void nano::transport::tcp_channels::process_message (nano::message const & message_a, nano::tcp_endpoint const & endpoint_a, nano::account const & node_id_a) +void nano::transport::tcp_channels::process_message (nano::message const & message_a, nano::tcp_endpoint const & endpoint_a, nano::account const & node_id_a, std::shared_ptr socket_a, nano::bootstrap_server_type type_a) { if (!stopped) { @@ -242,11 +256,31 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa { node.network.process_message (message_a, channel); } - else if (!node.flags.disable_udp || (message_a.header.type != nano::message_type::confirm_req && message_a.header.type != nano::message_type::confirm_ack)) + else if (!node_id_a.is_zero ()) { - // confirm_req & confirm_ack are only message types that can produce response - auto udp_channel (std::make_shared (node.network.udp_channels, nano::transport::map_tcp_to_endpoint (endpoint_a))); - node.network.process_message (message_a, udp_channel); + // Add temporary channel + socket_a->set_writer_concurrency (nano::socket::concurrency::multi_writer); + auto temporary_channel (std::make_shared (node, socket_a)); + assert (endpoint_a == temporary_channel->get_tcp_endpoint ()); + temporary_channel->set_node_id (node_id_a); + temporary_channel->set_network_version (message_a.header.version_using); + temporary_channel->set_last_packet_received (std::chrono::steady_clock::now ()); + temporary_channel->set_last_packet_sent (std::chrono::steady_clock::now ()); + temporary_channel->server = true; + // Don't insert temporary channels for response_server + assert (type_a == nano::bootstrap_server_type::realtime); + if (type_a == nano::bootstrap_server_type::realtime) + { + insert (temporary_channel); + } + node.network.process_message (message_a, temporary_channel); + } + else + { + // Initial node_id_handshake request without node ID + assert (message_a.header.type == nano::message_type::node_id_handshake); + assert (type_a == nano::bootstrap_server_type::undefined); + node.stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in); } } } @@ -291,6 +325,7 @@ void nano::transport::tcp_channels::start () void nano::transport::tcp_channels::stop () { stopped = true; + std::unique_lock lock (mutex); // Close all TCP sockets for (auto i (channels.begin ()), j (channels.end ()); i != j; ++i) { @@ -298,6 +333,12 @@ void nano::transport::tcp_channels::stop () { i->channel->socket->close (); } + // Remove response server + if (i->channel->response_server != nullptr) + { + i->channel->response_server->stop (); + i->channel->response_server = nullptr; + } } } @@ -496,7 +537,18 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptrset_network_version (header.version_using); auto node_id (message.response->first); - if (!node_l->network.syn_cookies.validate (endpoint_a, node_id, message.response->second) && node_id != node_l->node_id.pub && !node_l->network.tcp_channels.find_node_id (node_id)) + bool process (!node_l->network.syn_cookies.validate (endpoint_a, node_id, message.response->second) && node_id != node_l->node_id.pub); + if (process) + { + /* If node ID is known, don't establish new connection + Exception: temporary channels from bootstrap_server */ + auto existing_channel (node_l->network.tcp_channels.find_node_id (node_id)); + if (existing_channel) + { + process = existing_channel->server; + } + } + if (process) { channel_a->set_node_id (node_id); channel_a->set_last_packet_received (std::chrono::steady_clock::now ()); @@ -519,6 +571,12 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptrresponse_server = std::make_shared (channel_a->socket, node_l); + channel_a->response_server->keepalive_first = false; + channel_a->response_server->type = nano::bootstrap_server_type::realtime_response_server; + channel_a->response_server->remote_node_id = channel_a->get_node_id (); + channel_a->response_server->receive (); } else { @@ -531,7 +589,6 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr socket; + std::shared_ptr response_server; + bool server{ false }; nano::endpoint get_endpoint () const override { @@ -85,7 +89,7 @@ namespace transport void receive (); void start (); void stop (); - void process_message (nano::message const &, nano::tcp_endpoint const &, nano::account const &); + void process_message (nano::message const &, nano::tcp_endpoint const &, nano::account const &, std::shared_ptr, nano::bootstrap_server_type); void process_keepalive (nano::keepalive const &, nano::tcp_endpoint const &, bool); bool max_ip_connections (nano::tcp_endpoint const &); // Should we reach out to this endpoint with a keepalive message @@ -143,16 +147,9 @@ namespace transport } nano::account node_id () const { - auto node_id_l (channel->get_node_id ()); - if (node_id_l.is_initialized ()) - { - return node_id_l.get (); - } - else - { - assert (false); - return 0; - } + auto node_id (channel->get_node_id ()); + assert (!node_id.is_zero ()); + return node_id; } }; class tcp_endpoint_attempt final diff --git a/nano/node/transport/transport.hpp b/nano/node/transport/transport.hpp index 539bce14..2fda8ea2 100644 --- a/nano/node/transport/transport.hpp +++ b/nano/node/transport/transport.hpp @@ -97,12 +97,25 @@ namespace transport last_packet_sent = time_a; } - boost::optional get_node_id () const + boost::optional get_node_id_optional () const { std::lock_guard lk (channel_mutex); return node_id; } + nano::account get_node_id () const + { + std::lock_guard lk (channel_mutex); + if (node_id.is_initialized ()) + { + return node_id.get (); + } + else + { + return 0; + } + } + void set_node_id (nano::account node_id_a) { std::lock_guard lk (channel_mutex); diff --git a/nano/node/transport/udp.cpp b/nano/node/transport/udp.cpp index 14894eff..c7224e28 100644 --- a/nano/node/transport/udp.cpp +++ b/nano/node/transport/udp.cpp @@ -215,6 +215,12 @@ std::shared_ptr nano::transport::udp_channels::fin return result; } +void nano::transport::udp_channels::clean_node_id (nano::account const & node_id_a) +{ + std::lock_guard lock (mutex); + channels.get ().erase (node_id_a); +} + void nano::transport::udp_channels::clean_node_id (nano::endpoint const & endpoint_a, nano::account const & node_id_a) { std::lock_guard lock (mutex); diff --git a/nano/node/transport/udp.hpp b/nano/node/transport/udp.hpp index 86f6155a..e560f8ed 100644 --- a/nano/node/transport/udp.hpp +++ b/nano/node/transport/udp.hpp @@ -70,6 +70,7 @@ namespace transport std::unordered_set> random_set (size_t) const; bool store_all (bool = true); std::shared_ptr find_node_id (nano::account const &); + void clean_node_id (nano::account const &); void clean_node_id (nano::endpoint const &, nano::account const &); // Get the next peer for attempting a tcp bootstrap connection nano::tcp_endpoint bootstrap_peer (uint8_t connection_protocol_version_min = nano::protocol_version_reasonable_min); @@ -133,15 +134,7 @@ namespace transport } nano::account node_id () const { - auto node_id_l (channel->get_node_id ()); - if (node_id_l.is_initialized ()) - { - return node_id_l.get (); - } - else - { - return 0; - } + return channel->get_node_id (); } }; class endpoint_attempt final diff --git a/nano/qt/qt.cpp b/nano/qt/qt.cpp index 0c64f1b8..5fb5ae26 100644 --- a/nano/qt/qt.cpp +++ b/nano/qt/qt.cpp @@ -1949,7 +1949,7 @@ void nano_qt::advanced_actions::refresh_peers () version->setData (QVariant (channel->get_network_version ()), Qt::DisplayRole); items.push_back (version); QString node_id (""); - auto node_id_l (channel->get_node_id ()); + auto node_id_l (channel->get_node_id_optional ()); if (node_id_l.is_initialized ()) { node_id = node_id_l.get ().to_account ().c_str ();