diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 78771231..dbafe14b 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -146,6 +146,12 @@ TEST (node, send_single_many_peers) { ASSERT_NO_ERROR (system.poll ()); } + system.stop (); + for (auto node : system.nodes) + { + ASSERT_TRUE (node->stopped); + ASSERT_TRUE (node->network.tcp_channels.node_id_handhake_sockets_empty ()); + } } TEST (node, send_out_of_order) diff --git a/nano/node/bootstrap/bootstrap.cpp b/nano/node/bootstrap/bootstrap.cpp index 3684d344..64aff831 100644 --- a/nano/node/bootstrap/bootstrap.cpp +++ b/nano/node/bootstrap/bootstrap.cpp @@ -28,10 +28,11 @@ constexpr double nano::bootstrap_limits::lazy_batch_pull_count_resize_ratio; constexpr std::chrono::hours nano::bootstrap_excluded_peers::exclude_time_hours; constexpr std::chrono::hours nano::bootstrap_excluded_peers::exclude_remove_hours; -nano::bootstrap_client::bootstrap_client (std::shared_ptr node_a, std::shared_ptr attempt_a, std::shared_ptr channel_a) : +nano::bootstrap_client::bootstrap_client (std::shared_ptr node_a, std::shared_ptr attempt_a, std::shared_ptr channel_a, std::shared_ptr socket_a) : node (node_a), attempt (attempt_a), channel (channel_a), +socket (socket_a), receive_buffer (std::make_shared> ()), start_time (std::chrono::steady_clock::now ()), block_count (0), @@ -356,26 +357,29 @@ void nano::bootstrap_attempt::populate_connections () { if (auto client = c.lock ()) { - new_clients.push_back (client); - endpoints.insert (client->channel->socket->remote_endpoint ()); - double elapsed_sec = client->elapsed_seconds (); - auto blocks_per_sec = client->block_rate (); - rate_sum += blocks_per_sec; - if (client->elapsed_seconds () > nano::bootstrap_limits::bootstrap_connection_warmup_time_sec && client->block_count > 0) + if (auto socket_l = client->channel->socket.lock ()) { - sorted_connections.push (client); - } - // Force-stop the slowest peers, since they can take the whole bootstrap hostage by dribbling out blocks on the last remaining pull. - // This is ~1.5kilobits/sec. - if (elapsed_sec > nano::bootstrap_limits::bootstrap_minimum_termination_time_sec && blocks_per_sec < nano::bootstrap_limits::bootstrap_minimum_blocks_per_sec) - { - if (node->config.logging.bulk_pull_logging ()) + new_clients.push_back (client); + endpoints.insert (socket_l->remote_endpoint ()); + double elapsed_sec = client->elapsed_seconds (); + auto blocks_per_sec = client->block_rate (); + rate_sum += blocks_per_sec; + if (client->elapsed_seconds () > nano::bootstrap_limits::bootstrap_connection_warmup_time_sec && client->block_count > 0) { - node->logger.try_log (boost::str (boost::format ("Stopping slow peer %1% (elapsed sec %2%s > %3%s and %4% blocks per second < %5%)") % client->channel->to_string () % elapsed_sec % nano::bootstrap_limits::bootstrap_minimum_termination_time_sec % blocks_per_sec % nano::bootstrap_limits::bootstrap_minimum_blocks_per_sec)); + sorted_connections.push (client); } + // Force-stop the slowest peers, since they can take the whole bootstrap hostage by dribbling out blocks on the last remaining pull. + // This is ~1.5kilobits/sec. + if (elapsed_sec > nano::bootstrap_limits::bootstrap_minimum_termination_time_sec && blocks_per_sec < nano::bootstrap_limits::bootstrap_minimum_blocks_per_sec) + { + if (node->config.logging.bulk_pull_logging ()) + { + node->logger.try_log (boost::str (boost::format ("Stopping slow peer %1% (elapsed sec %2%s > %3%s and %4% blocks per second < %5%)") % client->channel->to_string () % elapsed_sec % nano::bootstrap_limits::bootstrap_minimum_termination_time_sec % blocks_per_sec % nano::bootstrap_limits::bootstrap_minimum_blocks_per_sec)); + } - client->stop (true); - new_clients.pop_back (); + client->stop (true); + new_clients.pop_back (); + } } } } @@ -469,7 +473,7 @@ void nano::bootstrap_attempt::connect_client (nano::tcp_endpoint const & endpoin { this_l->node->logger.try_log (boost::str (boost::format ("Connection established to %1%") % endpoint_a)); } - auto client (std::make_shared (this_l->node, this_l, std::make_shared (*this_l->node, socket))); + auto client (std::make_shared (this_l->node, this_l, std::make_shared (*this_l->node, socket), socket)); this_l->pool_connection (client); } else @@ -500,9 +504,12 @@ void nano::bootstrap_attempt::pool_connection (std::shared_ptrpending_stop && !node->bootstrap_initiator.excluded_peers.check (client_a->channel->get_tcp_endpoint ())) { // Idle bootstrap client socket - client_a->channel->socket->start_timer (node->network_params.node.idle_timeout); - // Push into idle deque - idle.push_back (client_a); + if (auto socket_l = client_a->channel->socket.lock ()) + { + socket_l->start_timer (node->network_params.node.idle_timeout); + // Push into idle deque + idle.push_back (client_a); + } } condition.notify_all (); } @@ -516,7 +523,7 @@ void nano::bootstrap_attempt::stop () { if (auto client = i.lock ()) { - client->channel->socket->close (); + client->socket->close (); } } if (auto i = frontiers.lock ()) @@ -622,7 +629,10 @@ void nano::bootstrap_attempt::attempt_restart_check (nano::unique_lockchannel->socket->close (); + if (auto socket_l = client->channel->socket.lock ()) + { + socket_l->close (); + } } } idle.clear (); diff --git a/nano/node/bootstrap/bootstrap.hpp b/nano/node/bootstrap/bootstrap.hpp index 17727154..32dffaf7 100644 --- a/nano/node/bootstrap/bootstrap.hpp +++ b/nano/node/bootstrap/bootstrap.hpp @@ -158,7 +158,7 @@ public: class bootstrap_client final : public std::enable_shared_from_this { public: - bootstrap_client (std::shared_ptr, std::shared_ptr, std::shared_ptr); + bootstrap_client (std::shared_ptr, std::shared_ptr, std::shared_ptr, std::shared_ptr); ~bootstrap_client (); std::shared_ptr shared (); void stop (bool force); @@ -167,6 +167,7 @@ public: std::shared_ptr node; std::shared_ptr attempt; std::shared_ptr channel; + std::shared_ptr socket; std::shared_ptr> receive_buffer; std::chrono::steady_clock::time_point start_time; std::atomic block_count; diff --git a/nano/node/bootstrap/bootstrap_bulk_pull.cpp b/nano/node/bootstrap/bootstrap_bulk_pull.cpp index 44dd1478..440b6544 100644 --- a/nano/node/bootstrap/bootstrap_bulk_pull.cpp +++ b/nano/node/bootstrap/bootstrap_bulk_pull.cpp @@ -121,80 +121,87 @@ void nano::bulk_pull_client::throttled_receive_block () void nano::bulk_pull_client::receive_block () { auto this_l (shared_from_this ()); - connection->channel->socket->async_read (connection->receive_buffer, 1, [this_l](boost::system::error_code const & ec, size_t size_a) { - if (!ec) - { - this_l->received_type (); - } - else - { - if (this_l->connection->node->config.logging.bulk_pull_logging ()) + if (auto socket_l = connection->channel->socket.lock ()) + { + socket_l->async_read (connection->receive_buffer, 1, [this_l](boost::system::error_code const & ec, size_t size_a) { + if (!ec) { - this_l->connection->node->logger.try_log (boost::str (boost::format ("Error receiving block type: %1%") % ec.message ())); + this_l->received_type (); } - this_l->connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_receive_block_failure, nano::stat::dir::in); - this_l->network_error = true; - } - }); + else + { + if (this_l->connection->node->config.logging.bulk_pull_logging ()) + { + this_l->connection->node->logger.try_log (boost::str (boost::format ("Error receiving block type: %1%") % ec.message ())); + } + this_l->connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_receive_block_failure, nano::stat::dir::in); + this_l->network_error = true; + } + }); + } } void nano::bulk_pull_client::received_type () { auto this_l (shared_from_this ()); nano::block_type type (static_cast (connection->receive_buffer->data ()[0])); - switch (type) + + if (auto socket_l = connection->channel->socket.lock ()) { - case nano::block_type::send: + switch (type) { - connection->channel->socket->async_read (connection->receive_buffer, nano::send_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) { - this_l->received_block (ec, size_a, type); - }); - break; - } - case nano::block_type::receive: - { - connection->channel->socket->async_read (connection->receive_buffer, nano::receive_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) { - this_l->received_block (ec, size_a, type); - }); - break; - } - case nano::block_type::open: - { - connection->channel->socket->async_read (connection->receive_buffer, nano::open_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) { - this_l->received_block (ec, size_a, type); - }); - break; - } - case nano::block_type::change: - { - connection->channel->socket->async_read (connection->receive_buffer, nano::change_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) { - this_l->received_block (ec, size_a, type); - }); - break; - } - case nano::block_type::state: - { - connection->channel->socket->async_read (connection->receive_buffer, nano::state_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) { - this_l->received_block (ec, size_a, type); - }); - break; - } - case nano::block_type::not_a_block: - { - // Avoid re-using slow peers, or peers that sent the wrong blocks. - if (!connection->pending_stop && (expected == pull.end || (pull.count != 0 && pull.count == pull_blocks))) + case nano::block_type::send: { - connection->attempt->pool_connection (connection); + socket_l->async_read (connection->receive_buffer, nano::send_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) { + this_l->received_block (ec, size_a, type); + }); + break; } - break; - } - default: - { - if (connection->node->config.logging.network_packet_logging ()) + case nano::block_type::receive: { - connection->node->logger.try_log (boost::str (boost::format ("Unknown type received as block type: %1%") % static_cast (type))); + socket_l->async_read (connection->receive_buffer, nano::receive_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) { + this_l->received_block (ec, size_a, type); + }); + break; + } + case nano::block_type::open: + { + socket_l->async_read (connection->receive_buffer, nano::open_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) { + this_l->received_block (ec, size_a, type); + }); + break; + } + case nano::block_type::change: + { + socket_l->async_read (connection->receive_buffer, nano::change_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) { + this_l->received_block (ec, size_a, type); + }); + break; + } + case nano::block_type::state: + { + socket_l->async_read (connection->receive_buffer, nano::state_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) { + this_l->received_block (ec, size_a, type); + }); + break; + } + case nano::block_type::not_a_block: + { + // Avoid re-using slow peers, or peers that sent the wrong blocks. + if (!connection->pending_stop && (expected == pull.end || (pull.count != 0 && pull.count == pull_blocks))) + { + connection->attempt->pool_connection (connection); + } + break; + } + default: + { + if (connection->node->config.logging.network_packet_logging ()) + { + connection->node->logger.try_log (boost::str (boost::format ("Unknown type received as block type: %1%") % static_cast (type))); + } + break; } - break; } } } @@ -329,68 +336,71 @@ void nano::bulk_pull_account_client::receive_pending () { auto this_l (shared_from_this ()); size_t size_l (sizeof (nano::uint256_union) + sizeof (nano::uint128_union)); - connection->channel->socket->async_read (connection->receive_buffer, size_l, [this_l, size_l](boost::system::error_code const & ec, size_t size_a) { - // An issue with asio is that sometimes, instead of reporting a bad file descriptor during disconnect, - // we simply get a size of 0. - if (size_a == size_l) - { - if (!ec) + if (auto socket_l = connection->channel->socket.lock ()) + { + socket_l->async_read (connection->receive_buffer, size_l, [this_l, size_l](boost::system::error_code const & ec, size_t size_a) { + // An issue with asio is that sometimes, instead of reporting a bad file descriptor during disconnect, + // we simply get a size of 0. + if (size_a == size_l) { - nano::block_hash pending; - nano::bufferstream frontier_stream (this_l->connection->receive_buffer->data (), sizeof (nano::uint256_union)); - auto error1 (nano::try_read (frontier_stream, pending)); - (void)error1; - assert (!error1); - nano::amount balance; - nano::bufferstream balance_stream (this_l->connection->receive_buffer->data () + sizeof (nano::uint256_union), sizeof (nano::uint128_union)); - auto error2 (nano::try_read (balance_stream, balance)); - (void)error2; - assert (!error2); - if (this_l->pull_blocks == 0 || !pending.is_zero ()) + if (!ec) { - if (this_l->pull_blocks == 0 || balance.number () >= this_l->connection->node->config.receive_minimum.number ()) + nano::block_hash pending; + nano::bufferstream frontier_stream (this_l->connection->receive_buffer->data (), sizeof (nano::uint256_union)); + auto error1 (nano::try_read (frontier_stream, pending)); + (void)error1; + assert (!error1); + nano::amount balance; + nano::bufferstream balance_stream (this_l->connection->receive_buffer->data () + sizeof (nano::uint256_union), sizeof (nano::uint128_union)); + auto error2 (nano::try_read (balance_stream, balance)); + (void)error2; + assert (!error2); + if (this_l->pull_blocks == 0 || !pending.is_zero ()) { - this_l->pull_blocks++; + if (this_l->pull_blocks == 0 || balance.number () >= this_l->connection->node->config.receive_minimum.number ()) { - if (!pending.is_zero ()) + this_l->pull_blocks++; { - auto transaction (this_l->connection->node->store.tx_begin_read ()); - if (!this_l->connection->node->store.block_exists (transaction, pending)) + if (!pending.is_zero ()) { - this_l->connection->attempt->lazy_start (pending); + auto transaction (this_l->connection->node->store.tx_begin_read ()); + if (!this_l->connection->node->store.block_exists (transaction, pending)) + { + this_l->connection->attempt->lazy_start (pending); + } } } + this_l->receive_pending (); + } + else + { + this_l->connection->attempt->requeue_pending (this_l->account); } - this_l->receive_pending (); } else { - this_l->connection->attempt->requeue_pending (this_l->account); + this_l->connection->attempt->pool_connection (this_l->connection); } } else { - this_l->connection->attempt->pool_connection (this_l->connection); + this_l->connection->attempt->requeue_pending (this_l->account); + if (this_l->connection->node->config.logging.network_logging ()) + { + this_l->connection->node->logger.try_log (boost::str (boost::format ("Error while receiving bulk pull account frontier %1%") % ec.message ())); + } } } else { this_l->connection->attempt->requeue_pending (this_l->account); - if (this_l->connection->node->config.logging.network_logging ()) + if (this_l->connection->node->config.logging.network_message_logging ()) { - this_l->connection->node->logger.try_log (boost::str (boost::format ("Error while receiving bulk pull account frontier %1%") % ec.message ())); + this_l->connection->node->logger.try_log (boost::str (boost::format ("Invalid size: expected %1%, got %2%") % size_l % size_a)); } } - } - else - { - this_l->connection->attempt->requeue_pending (this_l->account); - if (this_l->connection->node->config.logging.network_message_logging ()) - { - this_l->connection->node->logger.try_log (boost::str (boost::format ("Invalid size: expected %1%, got %2%") % size_l % size_a)); - } - } - }); + }); + } } /** diff --git a/nano/node/bootstrap/bootstrap_frontier.cpp b/nano/node/bootstrap/bootstrap_frontier.cpp index 90f9281f..a5ff90ad 100644 --- a/nano/node/bootstrap/bootstrap_frontier.cpp +++ b/nano/node/bootstrap/bootstrap_frontier.cpp @@ -51,21 +51,24 @@ nano::frontier_req_client::~frontier_req_client () void nano::frontier_req_client::receive_frontier () { auto this_l (shared_from_this ()); - connection->channel->socket->async_read (connection->receive_buffer, nano::frontier_req_client::size_frontier, [this_l](boost::system::error_code const & ec, size_t size_a) { - // An issue with asio is that sometimes, instead of reporting a bad file descriptor during disconnect, - // we simply get a size of 0. - if (size_a == nano::frontier_req_client::size_frontier) - { - this_l->received_frontier (ec, size_a); - } - else - { - if (this_l->connection->node->config.logging.network_message_logging ()) + if (auto socket_l = connection->channel->socket.lock ()) + { + socket_l->async_read (connection->receive_buffer, nano::frontier_req_client::size_frontier, [this_l](boost::system::error_code const & ec, size_t size_a) { + // An issue with asio is that sometimes, instead of reporting a bad file descriptor during disconnect, + // we simply get a size of 0. + if (size_a == nano::frontier_req_client::size_frontier) { - this_l->connection->node->logger.try_log (boost::str (boost::format ("Invalid size: expected %1%, got %2%") % nano::frontier_req_client::size_frontier % size_a)); + this_l->received_frontier (ec, size_a); } - } - }); + else + { + if (this_l->connection->node->config.logging.network_message_logging ()) + { + this_l->connection->node->logger.try_log (boost::str (boost::format ("Invalid size: expected %1%, got %2%") % nano::frontier_req_client::size_frontier % size_a)); + } + } + }); + } } void nano::frontier_req_client::unsynced (nano::block_hash const & head, nano::block_hash const & end) diff --git a/nano/node/bootstrap/bootstrap_server.cpp b/nano/node/bootstrap/bootstrap_server.cpp index d2996ffa..4935ccf6 100644 --- a/nano/node/bootstrap/bootstrap_server.cpp +++ b/nano/node/bootstrap/bootstrap_server.cpp @@ -577,20 +577,23 @@ public: nano::node_id_handshake response_message (cookie, response); auto shared_const_buffer = response_message.to_shared_const_buffer (); // clang-format off - connection->socket->async_write (shared_const_buffer, [connection = connection ](boost::system::error_code const & ec, size_t size_a) { - if (ec) + connection->socket->async_write (shared_const_buffer, [connection = std::weak_ptr (connection) ](boost::system::error_code const & ec, size_t size_a) { + if (auto connection_l = connection.lock ()) { - if (connection->node->config.logging.network_node_id_handshake_logging ()) + if (ec) { - connection->node->logger.try_log (boost::str (boost::format ("Error sending node_id_handshake to %1%: %2%") % connection->remote_endpoint % ec.message ())); + if (connection_l->node->config.logging.network_node_id_handshake_logging ()) + { + connection_l->node->logger.try_log (boost::str (boost::format ("Error sending node_id_handshake to %1%: %2%") % connection_l->remote_endpoint % ec.message ())); + } + // Stop invalid handshake + connection_l->stop (); + } + else + { + connection_l->node->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::out); + connection_l->finish_request (); } - // Stop invalid handshake - connection->stop (); - } - else - { - connection->node->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::out); - connection->finish_request (); } }); // clang-format on diff --git a/nano/node/socket.cpp b/nano/node/socket.cpp index 10dc5d47..62b0ec60 100644 --- a/nano/node/socket.cpp +++ b/nano/node/socket.cpp @@ -223,9 +223,8 @@ void nano::socket::close () // This must be called from a strand or the destructor void nano::socket::close_internal () { - if (!closed) + if (!closed.exchange (true)) { - closed = true; io_timeout = boost::none; boost::system::error_code ec; diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 121c056a..b0af4483 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -2,7 +2,7 @@ #include #include -nano::transport::channel_tcp::channel_tcp (nano::node & node_a, std::shared_ptr socket_a) : +nano::transport::channel_tcp::channel_tcp (nano::node & node_a, std::weak_ptr socket_a) : channel (node_a), socket (socket_a) { @@ -12,22 +12,29 @@ nano::transport::channel_tcp::~channel_tcp () { nano::lock_guard lk (channel_mutex); // Close socket. Exception: socket is used by bootstrap_server - if (socket && !server) + if (auto socket_l = socket.lock ()) { - socket->close (); - } - // Remove response server - if (response_server != nullptr) - { - response_server->stop (); - response_server = nullptr; + if (!server) + { + socket_l->close (); + } + // Remove response server + if (auto response_server_l = response_server.lock ()) + { + response_server_l->stop (); + } } } size_t nano::transport::channel_tcp::hash_code () const { std::hash<::nano::tcp_endpoint> hash; - return hash (socket->remote_endpoint ()); + if (auto socket_l = socket.lock ()) + { + return hash (socket_l->remote_endpoint ()); + } + + return 0; } bool nano::transport::channel_tcp::operator== (nano::transport::channel const & other_a) const @@ -43,7 +50,10 @@ bool nano::transport::channel_tcp::operator== (nano::transport::channel const & void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const & buffer_a, nano::stat::detail detail_a, std::function const & callback_a) { - socket->async_write (buffer_a, tcp_callback (detail_a, socket->remote_endpoint (), callback_a)); + if (auto socket_l = socket.lock ()) + { + socket_l->async_write (buffer_a, tcp_callback (detail_a, socket_l->remote_endpoint (), callback_a)); + } } std::function nano::transport::channel_tcp::callback (nano::stat::detail detail_a, std::function const & callback_a) const @@ -77,7 +87,11 @@ std::function nano::transport:: std::string nano::transport::channel_tcp::to_string () const { - return boost::str (boost::format ("%1%") % socket->remote_endpoint ()); + if (auto socket_l = socket.lock ()) + { + return boost::str (boost::format ("%1%") % socket_l->remote_endpoint ()); + } + return ""; } nano::transport::tcp_channels::tcp_channels (nano::node & node_a) : @@ -85,7 +99,7 @@ node (node_a) { } -bool nano::transport::tcp_channels::insert (std::shared_ptr channel_a) +bool nano::transport::tcp_channels::insert (std::shared_ptr channel_a, std::shared_ptr socket_a, std::shared_ptr bootstrap_server_a) { auto endpoint (channel_a->get_tcp_endpoint ()); assert (endpoint.address ().is_v6 ()); @@ -102,7 +116,7 @@ bool nano::transport::tcp_channels::insert (std::shared_ptr ().erase (node_id); } - channels.get ().insert ({ channel_a }); + channels.get ().insert ({ channel_a, socket_a, bootstrap_server_a }); error = false; lock.unlock (); node.network.channel_observer (channel_a); @@ -271,7 +285,7 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa // Don't insert temporary channels for response_server if (type_a == nano::bootstrap_server_type::realtime) { - insert (temporary_channel); + insert (temporary_channel, socket_a, nullptr); } node.network.process_message (message_a, temporary_channel); } @@ -314,18 +328,18 @@ void nano::transport::tcp_channels::stop () // Close all TCP sockets for (auto i (channels.begin ()), j (channels.end ()); i != j; ++i) { - if (i->channel->socket != nullptr) + if (i->socket) { - i->channel->socket->close (); + i->socket->close (); } // Remove response server - if (i->channel->response_server != nullptr) + if (i->response_server) { - i->channel->response_server->stop (); - i->channel->response_server = nullptr; + i->response_server->stop (); } } channels.clear (); + node_id_handshake_sockets.clear (); } bool nano::transport::tcp_channels::max_ip_connections (nano::tcp_endpoint const & endpoint_a) @@ -356,15 +370,18 @@ std::unique_ptr nano::transport::tcp_channels::col { size_t channels_count = 0; size_t attemps_count = 0; + size_t node_id_handshake_sockets_count = 0; { nano::lock_guard guard (mutex); channels_count = channels.size (); attemps_count = attempts.size (); + node_id_handshake_sockets_count = node_id_handshake_sockets.size (); } auto composite = std::make_unique (name); composite->add_component (std::make_unique (seq_con_info{ "channels", channels_count, sizeof (decltype (channels)::value_type) })); composite->add_component (std::make_unique (seq_con_info{ "attempts", attemps_count, sizeof (decltype (attempts)::value_type) })); + composite->add_component (std::make_unique (seq_con_info{ "node_id_handshake_sockets", node_id_handshake_sockets_count, sizeof (decltype (node_id_handshake_sockets)::value_type) })); return composite; } @@ -377,6 +394,12 @@ void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point // Remove keepalive attempt tracking for attempts older than cutoff auto attempts_cutoff (attempts.get<1> ().lower_bound (cutoff_a)); attempts.get<1> ().erase (attempts.get<1> ().begin (), attempts_cutoff); + + // Cleanup any sockets which may still be existing from failed node id handshakes + node_id_handshake_sockets.erase (std::remove_if (node_id_handshake_sockets.begin (), node_id_handshake_sockets.end (), [this](auto socket) { + return channels.get ().find (socket->remote_endpoint ()) == channels.get ().end (); + }), + node_id_handshake_sockets.end ()); } void nano::transport::tcp_channels::ongoing_keepalive () @@ -455,6 +478,22 @@ void nano::transport::tcp_channels::update (nano::tcp_endpoint const & endpoint_ } } +bool nano::transport::tcp_channels::node_id_handhake_sockets_empty () const +{ + nano::lock_guard guard (mutex); + return node_id_handshake_sockets.empty (); +} + +void nano::transport::tcp_channels::remove_node_id_handshake_socket (std::shared_ptr socket_a) +{ + std::weak_ptr node_w (node.shared ()); + if (auto node_l = node_w.lock ()) + { + nano::lock_guard guard (mutex); + node_id_handshake_sockets.erase (std::remove (node_id_handshake_sockets.begin (), node_id_handshake_sockets.end (), socket_a), node_id_handshake_sockets.end ()); + } +} + void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a, std::function)> const & callback_a) { if (node.flags.disable_tcp_realtime) @@ -463,10 +502,11 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a return; } auto socket (std::make_shared (node.shared_from_this (), boost::none, nano::socket::concurrency::multi_writer)); - auto channel (std::make_shared (node, socket)); + std::weak_ptr socket_w (socket); + auto channel (std::make_shared (node, socket_w)); std::weak_ptr node_w (node.shared ()); - channel->socket->async_connect (nano::transport::map_endpoint_to_tcp (endpoint_a), - [node_w, channel, endpoint_a, callback_a](boost::system::error_code const & ec) { + socket->async_connect (nano::transport::map_endpoint_to_tcp (endpoint_a), + [node_w, channel, socket, endpoint_a, callback_a](boost::system::error_code const & ec) { if (auto node_l = node_w.lock ()) { if (!ec && channel) @@ -481,6 +521,7 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a } std::shared_ptr> receive_buffer (std::make_shared> ()); receive_buffer->resize (256); + node_l->network.tcp_channels.node_id_handshake_sockets.push_back (socket); channel->send_buffer (bytes, nano::stat::detail::node_id_handshake, [node_w, channel, endpoint_a, receive_buffer, callback_a](boost::system::error_code const & ec, size_t size_a) { if (auto node_l = node_w.lock ()) { @@ -490,6 +531,10 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a } else { + if (auto socket_l = channel->socket.lock ()) + { + node_l->network.tcp_channels.remove_node_id_handshake_socket (socket_l); + } if (node_l->config.logging.network_node_id_handshake_logging ()) { node_l->logger.try_log (boost::str (boost::format ("Error sending node_id_handshake to %1%: %2%") % endpoint_a % ec.message ())); @@ -510,94 +555,115 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr channel_a, nano::endpoint const & endpoint_a, std::shared_ptr> receive_buffer_a, std::function)> const & callback_a) { std::weak_ptr node_w (node.shared ()); - channel_a->socket->async_read (receive_buffer_a, 8 + sizeof (nano::account) + sizeof (nano::account) + sizeof (nano::signature), [node_w, channel_a, endpoint_a, receive_buffer_a, callback_a](boost::system::error_code const & ec, size_t size_a) { - if (auto node_l = node_w.lock ()) - { - if (!ec && channel_a) + if (auto socket_l = channel_a->socket.lock ()) + { + // clang-format off + auto cleanup_and_udp_fallback = [socket_w = channel_a->socket, node_w](nano::endpoint const & endpoint_a, std::function)> const & callback_a) { + if (auto node_l = node_w.lock ()) { - node_l->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in); - auto error (false); - nano::bufferstream stream (receive_buffer_a->data (), size_a); - nano::message_header header (error, stream); - if (!error && header.type == nano::message_type::node_id_handshake && header.version_using >= node_l->network_params.protocol.protocol_version_min) + node_l->network.tcp_channels.udp_fallback (endpoint_a, callback_a); + + if (auto socket_l = socket_w.lock ()) { - nano::node_id_handshake message (error, stream, header); - if (!error && message.response && message.query) + node_l->network.tcp_channels.remove_node_id_handshake_socket (socket_l); + } + } + }; + // clang-format on + + socket_l->async_read (receive_buffer_a, 8 + sizeof (nano::account) + sizeof (nano::account) + sizeof (nano::signature), [node_w, channel_a, endpoint_a, receive_buffer_a, callback_a, cleanup_and_udp_fallback](boost::system::error_code const & ec, size_t size_a) { + if (auto node_l = node_w.lock ()) + { + if (!ec && channel_a) + { + node_l->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in); + auto error (false); + nano::bufferstream stream (receive_buffer_a->data (), size_a); + nano::message_header header (error, stream); + if (!error && header.type == nano::message_type::node_id_handshake && header.version_using >= node_l->network_params.protocol.protocol_version_min) { - channel_a->set_network_version (header.version_using); - auto node_id (message.response->first); - bool process (!node_l->network.syn_cookies.validate (endpoint_a, node_id, message.response->second) && node_id != node_l->node_id.pub); - if (process) + nano::node_id_handshake message (error, stream, header); + if (!error && message.response && message.query) { - /* If node ID is known, don't establish new connection - Exception: temporary channels from bootstrap_server */ - auto existing_channel (node_l->network.tcp_channels.find_node_id (node_id)); - if (existing_channel) + channel_a->set_network_version (header.version_using); + auto node_id (message.response->first); + bool process (!node_l->network.syn_cookies.validate (endpoint_a, node_id, message.response->second) && node_id != node_l->node_id.pub); + if (process) { - process = existing_channel->server; + /* If node ID is known, don't establish new connection + Exception: temporary channels from bootstrap_server */ + auto existing_channel (node_l->network.tcp_channels.find_node_id (node_id)); + if (existing_channel) + { + process = existing_channel->server; + } + } + if (process) + { + channel_a->set_node_id (node_id); + channel_a->set_last_packet_received (std::chrono::steady_clock::now ()); + boost::optional> response (std::make_pair (node_l->node_id.pub, nano::sign_message (node_l->node_id.prv, node_l->node_id.pub, *message.query))); + nano::node_id_handshake response_message (boost::none, response); + auto bytes = response_message.to_shared_const_buffer (); + if (node_l->config.logging.network_node_id_handshake_logging ()) + { + node_l->logger.try_log (boost::str (boost::format ("Node ID handshake response sent with node ID %1% to %2%: query %3%") % node_l->node_id.pub.to_node_id () % endpoint_a % (*message.query).to_string ())); + } + channel_a->send_buffer (bytes, nano::stat::detail::node_id_handshake, [node_w, channel_a, endpoint_a, callback_a, cleanup_and_udp_fallback](boost::system::error_code const & ec, size_t size_a) { + if (auto node_l = node_w.lock ()) + { + if (!ec && channel_a) + { + // Insert new node ID connection + if (auto socket_l = channel_a->socket.lock ()) + { + channel_a->set_last_packet_sent (std::chrono::steady_clock::now ()); + auto response_server = std::make_shared (socket_l, node_l); + node_l->network.tcp_channels.insert (channel_a, socket_l, response_server); + if (callback_a) + { + callback_a (channel_a); + } + // Listen for possible responses + response_server->type = nano::bootstrap_server_type::realtime_response_server; + response_server->remote_node_id = channel_a->get_node_id (); + response_server->receive (); + node_l->network.tcp_channels.remove_node_id_handshake_socket (socket_l); + } + } + else + { + if (node_l->config.logging.network_node_id_handshake_logging ()) + { + node_l->logger.try_log (boost::str (boost::format ("Error sending node_id_handshake to %1%: %2%") % endpoint_a % ec.message ())); + } + cleanup_and_udp_fallback (endpoint_a, callback_a); + } + } + }); } } - if (process) + else { - channel_a->set_node_id (node_id); - channel_a->set_last_packet_received (std::chrono::steady_clock::now ()); - boost::optional> response (std::make_pair (node_l->node_id.pub, nano::sign_message (node_l->node_id.prv, node_l->node_id.pub, *message.query))); - nano::node_id_handshake response_message (boost::none, response); - auto bytes = response_message.to_shared_const_buffer (); - if (node_l->config.logging.network_node_id_handshake_logging ()) - { - node_l->logger.try_log (boost::str (boost::format ("Node ID handshake response sent with node ID %1% to %2%: query %3%") % node_l->node_id.pub.to_node_id () % endpoint_a % (*message.query).to_string ())); - } - channel_a->send_buffer (bytes, nano::stat::detail::node_id_handshake, [node_w, channel_a, endpoint_a, callback_a](boost::system::error_code const & ec, size_t size_a) { - if (auto node_l = node_w.lock ()) - { - if (!ec && channel_a) - { - // Insert new node ID connection - channel_a->set_last_packet_sent (std::chrono::steady_clock::now ()); - node_l->network.tcp_channels.insert (channel_a); - if (callback_a) - { - callback_a (channel_a); - } - // Listen for possible responses - channel_a->response_server = std::make_shared (channel_a->socket, node_l); - channel_a->response_server->type = nano::bootstrap_server_type::realtime_response_server; - channel_a->response_server->remote_node_id = channel_a->get_node_id (); - channel_a->response_server->receive (); - } - else - { - if (node_l->config.logging.network_node_id_handshake_logging ()) - { - node_l->logger.try_log (boost::str (boost::format ("Error sending node_id_handshake to %1%: %2%") % endpoint_a % ec.message ())); - } - node_l->network.tcp_channels.udp_fallback (endpoint_a, callback_a); - } - } - }); + cleanup_and_udp_fallback (endpoint_a, callback_a); } } else { - node_l->network.tcp_channels.udp_fallback (endpoint_a, callback_a); + cleanup_and_udp_fallback (endpoint_a, callback_a); } } else { - node_l->network.tcp_channels.udp_fallback (endpoint_a, callback_a); + if (node_l->config.logging.network_node_id_handshake_logging ()) + { + node_l->logger.try_log (boost::str (boost::format ("Error reading node_id_handshake from %1%: %2%") % endpoint_a % ec.message ())); + } + cleanup_and_udp_fallback (endpoint_a, callback_a); } } - else - { - if (node_l->config.logging.network_node_id_handshake_logging ()) - { - node_l->logger.try_log (boost::str (boost::format ("Error reading node_id_handshake from %1%: %2%") % endpoint_a % ec.message ())); - } - node_l->network.tcp_channels.udp_fallback (endpoint_a, callback_a); - } - } - }); + }); + } } void nano::transport::tcp_channels::udp_fallback (nano::endpoint const & endpoint_a, std::function)> const & callback_a) diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index 2202caca..03d8a089 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -23,7 +23,7 @@ namespace transport friend class nano::transport::tcp_channels; public: - channel_tcp (nano::node &, std::shared_ptr); + channel_tcp (nano::node &, std::weak_ptr); ~channel_tcp (); size_t hash_code () const override; bool operator== (nano::transport::channel const &) const override; @@ -33,18 +33,18 @@ namespace transport std::string to_string () const override; bool operator== (nano::transport::channel_tcp const & other_a) const { - return &node == &other_a.node && socket == other_a.socket; + return &node == &other_a.node && socket.lock () == other_a.socket.lock (); } - std::shared_ptr socket; - std::shared_ptr response_server; + std::weak_ptr socket; + std::weak_ptr response_server; bool server{ false }; nano::endpoint get_endpoint () const override { nano::lock_guard lk (channel_mutex); - if (socket) + if (auto socket_l = socket.lock ()) { - return nano::transport::map_tcp_to_endpoint (socket->remote_endpoint ()); + return nano::transport::map_tcp_to_endpoint (socket_l->remote_endpoint ()); } else { @@ -55,9 +55,9 @@ namespace transport nano::tcp_endpoint get_tcp_endpoint () const override { nano::lock_guard lk (channel_mutex); - if (socket) + if (auto socket_l = socket.lock ()) { - return socket->remote_endpoint (); + return socket_l->remote_endpoint (); } else { @@ -76,7 +76,7 @@ namespace transport public: tcp_channels (nano::node &); - bool insert (std::shared_ptr); + bool insert (std::shared_ptr, std::shared_ptr, std::shared_ptr); void erase (nano::tcp_endpoint const &); size_t size () const; std::shared_ptr find_channel (nano::tcp_endpoint const &) const; @@ -104,6 +104,8 @@ namespace transport void start_tcp (nano::endpoint const &, std::function)> const & = nullptr); void start_tcp_receive_node_id (std::shared_ptr, nano::endpoint const &, std::shared_ptr>, std::function)> const &); void udp_fallback (nano::endpoint const &, std::function)> const &); + void remove_node_id_handshake_socket (std::shared_ptr socket_a); + bool node_id_handhake_sockets_empty () const; nano::node & node; private: @@ -129,6 +131,8 @@ namespace transport { public: std::shared_ptr channel; + std::shared_ptr socket; + std::shared_ptr response_server; nano::tcp_endpoint endpoint () const { return channel->get_tcp_endpoint (); @@ -175,6 +179,8 @@ namespace transport boost::multi_index::hashed_unique>, boost::multi_index::ordered_non_unique>>> attempts; + // This owns the sockets until the node_id_handshake has been completed. Needed to prevent self referencing callbacks, they are periodically removed if any are dangling. + std::vector> node_id_handshake_sockets; std::atomic stopped{ false }; }; } // namespace transport