From eb8c1aaff9eca78afd649744c2676c5fc1a98bc0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20W=C3=B3jcik?= <3044353+pwojcikdev@users.noreply.github.com>
Date: Thu, 10 Nov 2022 18:06:37 +0100
Subject: [PATCH] Improved removal of dead network channels (#3993)

---
 nano/core_test/network.cpp        | 146 ++++++++++++++++++++++++++++++
 nano/core_test/node.cpp           |  22 ++---
 nano/lib/config.hpp               |   4 +
 nano/node/network.cpp             |   6 +-
 nano/node/node_observers.hpp      |   3 +
 nano/node/nodeconfig.hpp          |   2 -
 nano/node/repcrawler.cpp          |  19 ++--
 nano/node/socket.cpp              |  27 ++++--
 nano/node/socket.hpp              |  10 +-
 nano/node/transport/fake.hpp      |  13 ++-
 nano/node/transport/tcp.cpp       |  23 ++++-
 nano/node/transport/tcp.hpp       |   9 ++
 nano/node/transport/transport.hpp |   4 +
 nano/test_common/testutil.hpp     |  10 ++
 14 files changed, 257 insertions(+), 41 deletions(-)

diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp
index 5cfaf03f..ff1ad17b 100644
--- a/nano/core_test/network.cpp
+++ b/nano/core_test/network.cpp
@@ -1,4 +1,6 @@
+#include
 #include
+#include
 #include
 #include
 #include
@@ -1412,3 +1414,147 @@ TEST (network, fill_keepalive_self)
 	system.nodes[0]->network.fill_keepalive_self (target);
 	ASSERT_TRUE (target[2].port () == system.nodes[1]->network.port);
 }
+
+/*
+ * Tests that the channel and the channel container remove channels with dead local sockets
+ */
+TEST (network, purge_dead_channel_outgoing)
+{
+	nano::test::system system{};
+
+	nano::node_flags flags;
+	// Disable non realtime sockets
+	flags.disable_bootstrap_bulk_push_client = true;
+	flags.disable_bootstrap_bulk_pull_server = true;
+	flags.disable_bootstrap_listener = true;
+	flags.disable_lazy_bootstrap = true;
+	flags.disable_legacy_bootstrap = true;
+	flags.disable_wallet_bootstrap = true;
+
+	auto & node1 = *system.add_node (flags);
+
+	// We expect one incoming and one outgoing connection
+	std::shared_ptr<nano::socket> outgoing;
+	std::shared_ptr<nano::socket> incoming;
+
+	std::atomic connected_count{ 0 };
+	node1.observers.socket_connected.add ([&] (nano::socket & socket) {
+		connected_count++;
+		outgoing = socket.shared_from_this ();
+
+		std::cout << "connected: " << socket.remote_endpoint () << std::endl;
+	});
+
+	std::atomic accepted_count{ 0 };
+	node1.observers.socket_accepted.add ([&] (nano::socket & socket) {
+		accepted_count++;
+		incoming = socket.shared_from_this ();
+
+		std::cout << "accepted: " << socket.remote_endpoint () << std::endl;
+	});
+
+	auto & node2 = *system.add_node (flags);
+
+	ASSERT_TIMELY_EQ (5s, connected_count, 1);
+	ASSERT_ALWAYS_EQ (1s, connected_count, 1);
+
+	ASSERT_TIMELY_EQ (5s, accepted_count, 1);
+	ASSERT_ALWAYS_EQ (1s, accepted_count, 1);
+
+	ASSERT_EQ (node1.network.size (), 1);
+	ASSERT_ALWAYS_EQ (1s, node1.network.size (), 1);
+
+	// Store a reference to the only channel
+	auto channels = node1.network.list ();
+	ASSERT_EQ (channels.size (), 1);
+	auto channel = channels.front ();
+	ASSERT_TRUE (channel);
+
+	// When the local socket is dead, ensure the channel knows about it
+	ASSERT_TRUE (channel->alive ());
+	outgoing->close ();
+	ASSERT_TIMELY (5s, !channel->alive ());
+
+	// Shortly after that a new channel should be established
+	ASSERT_TIMELY_EQ (5s, connected_count, 2);
+	ASSERT_ALWAYS_EQ (1s, connected_count, 2);
+
+	// Check that the new channel is healthy
+	auto channels2 = node1.network.list ();
+	ASSERT_EQ (channels2.size (), 1);
+	auto channel2 = channels2.front ();
+	ASSERT_TRUE (channel2);
+	ASSERT_TRUE (channel2->alive ());
+}
+
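// The test above hinges on the new socket_connected / socket_accepted observers added to
// node_observers further down in this patch. The following minimal, self-contained sketch
// shows the add()/notify() pattern the test relies on; simple_observer_set is a stand-in
// written for illustration, not nano::observer_set itself.
#include <functional>
#include <iostream>
#include <vector>

template <typename... Args>
class simple_observer_set
{
public:
	void add (std::function<void (Args...)> handler)
	{
		handlers.push_back (std::move (handler));
	}
	void notify (Args... args) const
	{
		for (auto const & handler : handlers)
		{
			handler (args...);
		}
	}

private:
	std::vector<std::function<void (Args...)>> handlers;
};

// Usage mirroring the test: count connection events and log each one.
int main ()
{
	simple_observer_set<int> socket_connected;
	int connected_count = 0;
	socket_connected.add ([&] (int socket_id) {
		++connected_count;
		std::cout << "connected: " << socket_id << std::endl;
	});
	socket_connected.notify (42); // fired by the connecting side in the real code
	return connected_count == 1 ? 0 : 1;
}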
+/*
+ * Tests that the channel and the channel container remove channels with dead remote sockets
+ */
+TEST (network, purge_dead_channel_incoming)
+{
+	nano::test::system system{};
+
+	nano::node_flags flags;
+	// Disable non realtime sockets
+	flags.disable_bootstrap_bulk_push_client = true;
+	flags.disable_bootstrap_bulk_pull_server = true;
+	flags.disable_bootstrap_listener = true;
+	flags.disable_lazy_bootstrap = true;
+	flags.disable_legacy_bootstrap = true;
+	flags.disable_wallet_bootstrap = true;
+
+	auto & node1 = *system.add_node (flags);
+
+	// We expect one incoming and one outgoing connection
+	std::shared_ptr<nano::socket> outgoing;
+	std::shared_ptr<nano::socket> incoming;
+
+	std::atomic connected_count{ 0 };
+	node1.observers.socket_connected.add ([&] (nano::socket & socket) {
+		connected_count++;
+		outgoing = socket.shared_from_this ();
+
+		std::cout << "connected: " << socket.remote_endpoint () << std::endl;
+	});
+
+	std::atomic accepted_count{ 0 };
+	node1.observers.socket_accepted.add ([&] (nano::socket & socket) {
+		accepted_count++;
+		incoming = socket.shared_from_this ();
+
+		std::cout << "accepted: " << socket.remote_endpoint () << std::endl;
+	});
+
+	auto & node2 = *system.add_node (flags);
+
+	ASSERT_TIMELY_EQ (5s, connected_count, 1);
+	ASSERT_ALWAYS_EQ (1s, connected_count, 1);
+
+	ASSERT_TIMELY_EQ (5s, accepted_count, 1);
+	ASSERT_ALWAYS_EQ (1s, accepted_count, 1);
+
+	ASSERT_EQ (node2.network.size (), 1);
+	ASSERT_ALWAYS_EQ (1s, node2.network.size (), 1);
+
+	// Store a reference to the only channel
+	auto channels = node2.network.list ();
+	ASSERT_EQ (channels.size (), 1);
+	auto channel = channels.front ();
+	ASSERT_TRUE (channel);
+
+	// When the remote socket is dead, ensure the channel knows about it
+	ASSERT_TRUE (channel->alive ());
+	incoming->close ();
+	ASSERT_TIMELY (5s, !channel->alive ());
+
+	// Shortly after that a new channel should be established
+	ASSERT_TIMELY_EQ (5s, accepted_count, 2);
+	ASSERT_ALWAYS_EQ (1s, accepted_count, 2);
+
+	// Check that the new channel is healthy
+	auto channels2 = node2.network.list ();
+	ASSERT_EQ (channels2.size (), 1);
+	auto channel2 = channels2.front ();
+	ASSERT_TRUE (channel2);
+	ASSERT_TRUE (channel2->alive ());
+}
\ No newline at end of file
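// Both tests above expect channel->alive () to flip to false once the underlying socket is
// closed. A channel that only holds a std::weak_ptr to its socket can derive liveness that
// way; the channel-side implementation appears later in this patch (transport/tcp.hpp).
// This is a reduced, std-only sketch with made-up names, not nano's actual classes.
#include <atomic>
#include <cassert>
#include <memory>

struct socket_stub
{
	std::atomic<bool> closed{ false };
	void close ()
	{
		closed = true;
	}
	bool alive () const
	{
		return !closed;
	}
};

struct channel_stub
{
	std::weak_ptr<socket_stub> socket;
	bool alive () const
	{
		// Dead if the socket was destroyed or explicitly closed
		if (auto socket_l = socket.lock ())
		{
			return socket_l->alive ();
		}
		return false;
	}
};

// Usage: closing (or destroying) the socket immediately invalidates the channel.
inline void example ()
{
	auto socket = std::make_shared<socket_stub> ();
	channel_stub channel{ socket };
	assert (channel.alive ());
	socket->close ();
	assert (!channel.alive ());
}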
diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp
index 1e65fa2b..95833fd4 100644
--- a/nano/core_test/node.cpp
+++ b/nano/core_test/node.cpp
@@ -1851,7 +1851,7 @@ TEST (node, rep_remove)
 	ASSERT_EQ (*channel_rep1, reps[0].channel_ref ());

 	// When rep1 disconnects then rep1 should not be found anymore
-	channel_rep1->disconnect ();
+	channel_rep1->close ();
 	ASSERT_TIMELY (5s, searching_node.rep_crawler.representative_count () == 0);

 	// Add working node for genesis representative
@@ -1884,7 +1884,7 @@ TEST (node, rep_remove)
 	// Now only genesisRep should be found:
 	reps = searching_node.rep_crawler.representatives (1);
 	ASSERT_EQ (nano::dev::genesis_key.pub, reps[0].account);
-	ASSERT_EQ (1, searching_node.network.size ());
+	ASSERT_TIMELY_EQ (5s, searching_node.network.size (), 1);
 	auto list (searching_node.network.list (1));
 	ASSERT_EQ (node_genesis_rep->network.endpoint (), list[0]->get_endpoint ());
 }
@@ -3202,19 +3202,17 @@ TEST (node, peers)
 	auto list2 (node2->network.list (2));
 	ASSERT_EQ (node1->get_node_id (), list2[0]->get_node_id ());
 	ASSERT_EQ (nano::transport::transport_type::tcp, list2[0]->get_type ());
+
+	// Uncontactable peer should not be stored
+	ASSERT_TIMELY_EQ (5s, store.peer.count (store.tx_begin_read ()), 1);
+	ASSERT_TRUE (store.peer.exists (store.tx_begin_read (), endpoint_key));
+
+	// Stop the peer node and check that it is removed from the store
 	node1->stop ();
-	ASSERT_TIMELY (10s, node2->network.size () != 1);
-
-	ASSERT_TRUE (node2->network.empty ());
-
-	// Uncontactable peer should not be stored
-	auto transaction (store.tx_begin_read ());
-	ASSERT_EQ (store.peer.count (transaction), 1);
-	ASSERT_TRUE (store.peer.exists (transaction, endpoint_key));
-
-	node2->stop ();
+	// TODO: In `tcp_channels::store_all` we skip the store operation when there are no peers present,
+	// so the best we can do here is check that the network is empty
+	ASSERT_TIMELY (10s, node2->network.empty ());
 }

 TEST (node, peer_cache_restart)
diff --git a/nano/lib/config.hpp b/nano/lib/config.hpp
index a8f42305..17f5d1f7 100644
--- a/nano/lib/config.hpp
+++ b/nano/lib/config.hpp
@@ -203,6 +203,7 @@ public:
 	default_websocket_port (47000),
 	request_interval_ms (500),
 	cleanup_period (default_cleanup_period),
+	keepalive_period (std::chrono::seconds (15)),
 	idle_timeout (default_cleanup_period * 2),
 	silent_connection_tolerance_time (std::chrono::seconds (120)),
 	syn_cookie_cutoff (std::chrono::seconds (5)),
@@ -237,6 +238,7 @@ public:
 	{
 		request_interval_ms = 20;
 		cleanup_period = std::chrono::seconds (1);
+		keepalive_period = std::chrono::seconds (1);
 		idle_timeout = cleanup_period * 15;
 		max_peers_per_ip = 20;
 		max_peers_per_subnetwork = max_peers_per_ip * 4;
@@ -267,6 +269,8 @@ public:
 	{
 		return cleanup_period * 5;
 	}
+	/** How often to send keepalive messages */
+	std::chrono::seconds keepalive_period;
 	/** Default maximum idle time for a socket before it's automatically closed */
 	std::chrono::seconds idle_timeout;
 	std::chrono::seconds silent_connection_tolerance_time;
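// The keepalive_period introduced above drives the self-rescheduling tasks in network.cpp
// below: each task reschedules itself through node.workers.add_timed_task while holding only
// a weak_ptr to the node, so a stopped node is not kept alive by its own timers. A std-only
// sketch of that pattern; timed_worker and owner are invented stand-ins, not nano's worker pool.
#include <chrono>
#include <functional>
#include <memory>

struct timed_worker
{
	// Stand-in for workers.add_timed_task: a real pool runs the task at the deadline;
	// here we simply remember the most recently scheduled task.
	std::function<void ()> pending;
	void add_timed_task (std::chrono::steady_clock::time_point, std::function<void ()> task)
	{
		pending = std::move (task);
	}
};

struct owner : std::enable_shared_from_this<owner>
{
	timed_worker workers;
	std::chrono::seconds keepalive_period{ 15 };

	void ongoing_keepalive ()
	{
		// ... send keepalives here ...
		std::weak_ptr<owner> this_w (shared_from_this ());
		workers.add_timed_task (std::chrono::steady_clock::now () + keepalive_period, [this_w] () {
			if (auto this_l = this_w.lock ()) // reschedule only while the owner still exists
			{
				this_l->ongoing_keepalive ();
			}
		});
	}
};

// Usage: once the owner is destroyed, the pending task degrades to a harmless no-op.
inline void example ()
{
	auto node = std::make_shared<owner> ();
	node->ongoing_keepalive ();
	auto task = node->workers.pending;
	node.reset ();
	task (); // weak_ptr lock fails; nothing is rescheduled
}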
diff --git a/nano/node/network.cpp b/nano/node/network.cpp
index 924a9155..831411da 100644
--- a/nano/node/network.cpp
+++ b/nano/node/network.cpp
@@ -624,7 +624,7 @@ std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (std::
 	tcp_channels.list (result, minimum_version_a, include_tcp_temporary_channels_a);
 	udp_channels.list (result, minimum_version_a);
 	nano::random_pool_shuffle (result.begin (), result.end ());
-	if (result.size () > count_a)
+	if (count_a > 0 && result.size () > count_a)
 	{
 		result.resize (count_a, nullptr);
 	}
@@ -769,7 +769,7 @@ void nano::network::ongoing_cleanup ()
 {
 	cleanup (std::chrono::steady_clock::now () - node.network_params.network.cleanup_cutoff ());
 	std::weak_ptr<nano::node> node_w (node.shared ());
-	node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.cleanup_period, [node_w] () {
+	node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (node.network_params.network.is_dev_network () ? 1 : 5), [node_w] () {
 		if (auto node_l = node_w.lock ())
 		{
 			node_l->network.ongoing_cleanup ();
@@ -794,7 +794,7 @@ void nano::network::ongoing_keepalive ()
 	flood_keepalive (0.75f);
 	flood_keepalive_self (0.25f);
 	std::weak_ptr<nano::node> node_w (node.shared ());
-	node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.cleanup_period_half (), [node_w] () {
+	node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.keepalive_period, [node_w] () {
 		if (auto node_l = node_w.lock ())
 		{
 			node_l->network.ongoing_keepalive ();
diff --git a/nano/node/node_observers.hpp b/nano/node/node_observers.hpp
index 3411b60a..f9b4ea23 100644
--- a/nano/node/node_observers.hpp
+++ b/nano/node/node_observers.hpp
@@ -22,6 +22,9 @@ public:
 	nano::observer_set<> disconnect;
 	nano::observer_set work_cancel;
 	nano::observer_set telemetry;
+
+	nano::observer_set<nano::socket &> socket_connected;
+	nano::observer_set<nano::socket &> socket_accepted;
 };

 std::unique_ptr collect_container_info (node_observers & node_observers, std::string const & name);
diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp
index a1f1fa41..683a6207 100644
--- a/nano/node/nodeconfig.hpp
+++ b/nano/node/nodeconfig.hpp
@@ -88,8 +88,6 @@ public:
 	/** Default maximum incoming TCP connections, including realtime network & bootstrap */
 	unsigned tcp_incoming_connections_max{ 2048 };
 	bool use_memory_pools{ true };
-	static std::chrono::seconds constexpr keepalive_period = std::chrono::seconds (60);
-	static std::chrono::seconds constexpr keepalive_cutoff = keepalive_period * 5;
 	static std::chrono::minutes constexpr wallet_backup_interval = std::chrono::minutes (5);
 	/** Default outbound traffic shaping is 10MB/s */
 	std::size_t bandwidth_limit{ 10 * 1024 * 1024 };
diff --git a/nano/node/repcrawler.cpp b/nano/node/repcrawler.cpp
index e3597b84..61c16c7b 100644
--- a/nano/node/repcrawler.cpp
+++ b/nano/node/repcrawler.cpp
@@ -252,14 +252,17 @@ nano::uint128_t nano::rep_crawler::total_weight () const
 	nano::uint128_t result (0);
 	for (auto i (probable_reps.get ().begin ()), n (probable_reps.get ().end ()); i != n; ++i)
 	{
-		auto weight (i->weight.number ());
-		if (weight > 0)
+		if (i->channel->alive ())
 		{
-			result = result + weight;
-		}
-		else
-		{
-			break;
+			auto weight (i->weight.number ());
+			if (weight > 0)
+			{
+				result = result + weight;
+			}
+			else
+			{
+				break;
+			}
 		}
 	}
 	return result;
@@ -292,7 +295,7 @@ void nano::rep_crawler::cleanup_reps ()
 	auto iterator (probable_reps.get ().begin ());
 	while (iterator != probable_reps.get ().end ())
 	{
-		if (iterator->channel->get_tcp_endpoint ().address () != boost::asio::ip::address_v6::any ())
+		if (iterator->channel->alive ())
 		{
 			channels.push_back (iterator->channel);
 			++iterator;
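// With the rep_crawler change above, only representatives whose channel is still alive
// contribute to the total weight. A reduced, std-only model of that filtering; the types
// here are hypothetical stand-ins, not nano's boost::multi_index rep container.
#include <cstdint>
#include <memory>
#include <vector>

struct channel_mock
{
	bool alive_flag{ true };
	bool alive () const
	{
		return alive_flag;
	}
};

struct rep_entry
{
	std::uint64_t weight{ 0 };
	std::shared_ptr<channel_mock> channel;
};

// Sum voting weight while ignoring representatives whose channel has died.
inline std::uint64_t total_weight (std::vector<rep_entry> const & reps)
{
	std::uint64_t result = 0;
	for (auto const & rep : reps)
	{
		if (rep.channel && rep.channel->alive ())
		{
			result += rep.weight;
		}
	}
	return result;
}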
diff --git a/nano/node/socket.cpp b/nano/node/socket.cpp
index 242232bd..926ec52f 100644
--- a/nano/node/socket.cpp
+++ b/nano/node/socket.cpp
@@ -55,28 +55,35 @@ nano::socket::~socket ()

 void nano::socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::function<void (boost::system::error_code const &)> callback_a)
 {
+	debug_assert (callback_a);
 	debug_assert (endpoint_type () == endpoint_type_t::client);
+	checkup ();
 	auto this_l (shared_from_this ());
 	set_default_timeout ();
+
 	this_l->tcp_socket.async_connect (endpoint_a, boost::asio::bind_executor (this_l->strand, [this_l, callback = std::move (callback_a), endpoint_a] (boost::system::error_code const & ec) {
 		if (ec)
 		{
 			this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_connect_error, nano::stat::dir::in);
+			this_l->close ();
 		}
 		else
 		{
 			this_l->set_last_completion ();
 		}
 		this_l->remote = endpoint_a;
+		this_l->node.observers.socket_connected.notify (*this_l);
 		callback (ec);
 	}));
 }

 void nano::socket::async_read (std::shared_ptr<std::vector<uint8_t>> const & buffer_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a)
 {
+	debug_assert (callback_a);
+
 	if (size_a <= buffer_a->size ())
 	{
 		auto this_l (shared_from_this ());
@@ -90,6 +97,7 @@ void nano::socket::async_read (std::shared_ptr<std::vector<uint8_t>> const & buf
 			if (ec)
 			{
 				this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_read_error, nano::stat::dir::in);
+				this_l->close ();
 			}
 			else
 			{
@@ -147,6 +155,7 @@ void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std:
 			if (ec)
 			{
 				this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in);
+				this_l->close ();
 			}
 			else
 			{
@@ -192,9 +201,16 @@ void nano::socket::set_last_receive_time ()

 void nano::socket::checkup ()
 {
 	std::weak_ptr<nano::socket> this_w (shared_from_this ());
-	node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (2), [this_w] () {
+	node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (node.network_params.network.is_dev_network () ? 1 : 5), [this_w] () {
 		if (auto this_l = this_w.lock ())
 		{
+			// If the socket is already dead, close just in case, and stop doing checkups
+			if (!this_l->alive ())
+			{
+				this_l->close ();
+				return;
+			}
+
 			uint64_t now (nano::seconds_since_epoch ());
 			auto condition_to_disconnect{ false };
@@ -217,13 +233,7 @@ void nano::socket::checkup ()
 			{
 				if (this_l->node.config.logging.network_timeout_logging ())
 				{
-					// The remote end may have closed the connection before this side timing out, in which case the remote address is no longer available.
-					boost::system::error_code ec_remote_l;
-					boost::asio::ip::tcp::endpoint remote_endpoint_l = this_l->tcp_socket.remote_endpoint (ec_remote_l);
-					if (!ec_remote_l)
-					{
-						this_l->node.logger.try_log (boost::str (boost::format ("Disconnecting from %1% due to timeout") % remote_endpoint_l));
-					}
+					this_l->node.logger.try_log (boost::str (boost::format ("Disconnecting from %1% due to timeout") % this_l->remote));
 				}
 				this_l->timed_out = true;
 				this_l->close ();
@@ -459,6 +469,7 @@ void nano::server_socket::on_connection (std::function
 					new_connection->set_timeout (this_l->node.network_params.network.idle_timeout);
 					this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
 					this_l->connections_per_address.emplace (new_connection->remote.address (), new_connection);
+					this_l->node.observers.socket_accepted.notify (*new_connection);
 					if (cbk (new_connection, ec_a))
 					{
 						this_l->on_connection (std::move (cbk));
diff --git a/nano/node/socket.hpp b/nano/node/socket.hpp
index 41d27b9d..90cac64b 100644
--- a/nano/node/socket.hpp
+++ b/nano/node/socket.hpp
@@ -94,18 +94,22 @@ public:
 	{
 		return endpoint_type_m;
 	}
-	bool is_realtime_connection ()
+	bool is_realtime_connection () const
 	{
 		return type () == nano::socket::type_t::realtime || type () == nano::socket::type_t::realtime_response_server;
 	}
-	bool is_bootstrap_connection ()
+	bool is_bootstrap_connection () const
 	{
 		return type () == nano::socket::type_t::bootstrap;
 	}
-	bool is_closed ()
+	bool is_closed () const
 	{
 		return closed;
 	}
+	bool alive () const
+	{
+		return !closed && tcp_socket.is_open ();
+	}

 protected:
 	/** Holds the buffer and callback for queued writes */
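// socket::checkup above now bails out as soon as the socket is no longer alive instead of
// rescheduling itself forever. A simplified, std-only model of that self-terminating periodic
// check follows; the schedule callback is an invented stand-in for nano's worker pool, and
// checked_socket is not nano::socket.
#include <atomic>
#include <functional>
#include <memory>

struct checked_socket : std::enable_shared_from_this<checked_socket>
{
	std::atomic<bool> closed{ false };

	bool alive () const
	{
		return !closed;
	}
	void close ()
	{
		closed = true;
	}

	// schedule () stands in for workers.add_timed_task; it decides when `task` runs.
	void checkup (std::function<void (std::function<void ()>)> const & schedule)
	{
		std::weak_ptr<checked_socket> this_w (shared_from_this ());
		schedule ([this_w, schedule] () {
			if (auto this_l = this_w.lock ())
			{
				if (!this_l->alive ())
				{
					this_l->close (); // close just in case, then stop re-scheduling
					return;
				}
				// ... timeout bookkeeping would go here ...
				this_l->checkup (schedule); // still alive: schedule the next round
			}
		});
	}
};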
diff --git a/nano/node/transport/fake.hpp b/nano/node/transport/fake.hpp
index e3509fe5..976136a5 100644
--- a/nano/node/transport/fake.hpp
+++ b/nano/node/transport/fake.hpp
@@ -8,7 +8,7 @@ namespace transport
 {
 	/**
 	 * Fake channel that connects to nothing and allows its attributes to be manipulated. Mostly useful for unit tests.
- **/
+	 **/
 	namespace fake
 	{
 		class channel final : public nano::transport::channel
@@ -50,13 +50,20 @@ namespace transport
 			return nano::transport::transport_type::fake;
 		}

-		void disconnect ()
+		void close ()
 		{
-			endpoint = nano::endpoint (boost::asio::ip::address_v6::any (), 0);
+			closed = true;
+		}
+
+		bool alive () const override
+		{
+			return !closed;
 		}

 	private:
 		nano::endpoint endpoint;
+
+		std::atomic closed{ false };
 	};
 } // namespace fake
} // namespace transport
diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp
index 9e5ee3c3..443d95cd 100644
--- a/nano/node/transport/tcp.cpp
+++ b/nano/node/transport/tcp.cpp
@@ -4,6 +4,10 @@

 #include

+/*
+ * channel_tcp
+ */
+
 nano::transport::channel_tcp::channel_tcp (nano::node & node_a, std::weak_ptr<nano::socket> socket_a) :
 	channel (node_a),
 	socket (std::move (socket_a))
@@ -105,6 +109,10 @@ void nano::transport::channel_tcp::set_endpoint ()
 	}
 }

+/*
+ * tcp_channels
+ */
+
 nano::transport::tcp_channels::tcp_channels (nano::node & node, std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> sink) :
 	node{ node },
 	sink{ std::move (sink) }
@@ -443,8 +451,23 @@ std::unique_ptr nano::transport::tcp_channels::c
 void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point const & cutoff_a)
 {
 	nano::lock_guard<nano::mutex> lock (mutex);
+
+	// Remove channels with dead underlying sockets
+	for (auto it = channels.begin (); it != channels.end ();)
+	{
+		if (!it->socket->alive ())
+		{
+			it = channels.erase (it);
+		}
+		else
+		{
+			++it;
+		}
+	}
+
 	auto disconnect_cutoff (channels.get ().lower_bound (cutoff_a));
 	channels.get ().erase (channels.get ().begin (), disconnect_cutoff);
+
+	// Remove keepalive attempt tracking for attempts older than cutoff
 	auto attempts_cutoff (attempts.get ().lower_bound (cutoff_a));
 	attempts.get ().erase (attempts.get ().begin (), attempts_cutoff);
@@ -461,7 +480,7 @@ void nano::transport::tcp_channels::ongoing_keepalive ()
 	nano::unique_lock<nano::mutex> lock (mutex);
 	// Wake up channels
 	std::vector<std::shared_ptr<nano::transport::channel>> send_list;
-	auto keepalive_sent_cutoff (channels.get ().lower_bound (std::chrono::steady_clock::now () - node.network_params.network.cleanup_period));
+	auto keepalive_sent_cutoff (channels.get ().lower_bound (std::chrono::steady_clock::now () - node.network_params.network.keepalive_period));
 	for (auto i (channels.get ().begin ()); i != keepalive_sent_cutoff; ++i)
 	{
 		send_list.push_back (i->channel);
@@ -486,7 +505,7 @@ void nano::transport::tcp_channels::ongoing_keepalive ()
 		}
 	}
 	std::weak_ptr<nano::node> node_w (node.shared ());
-	node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.cleanup_period_half (), [node_w] () {
+	node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.keepalive_period, [node_w] () {
 		if (auto node_l = node_w.lock ())
 		{
 			if (!node_l->network.tcp_channels.stopped)
diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp
index a0341067..3f258772 100644
--- a/nano/node/transport/tcp.hpp
+++ b/nano/node/transport/tcp.hpp
@@ -79,6 +79,15 @@ namespace transport
 			return result;
 		}

+		virtual bool alive () const override
+		{
+			if (auto socket_l = socket.lock ())
+			{
+				return socket_l->alive ();
+			}
+			return false;
+		}
+
 	private:
 		nano::tcp_endpoint endpoint{ boost::asio::ip::address_v6::any (), 0 };
 	};
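// tcp_channels::purge above removes entries whose socket is no longer alive before applying
// the age cutoff. The same erase-while-iterating pattern on a plain std::map, as a
// self-contained sketch; the container and entry types are simplified stand-ins for the
// boost::multi_index container nano actually uses.
#include <map>
#include <memory>

struct socket_like
{
	bool open{ true };
	bool alive () const
	{
		return open;
	}
};

using channel_map = std::map<int, std::shared_ptr<socket_like>>;

inline void purge_dead (channel_map & channels)
{
	for (auto it = channels.begin (); it != channels.end ();)
	{
		if (!it->second || !it->second->alive ())
		{
			it = channels.erase (it); // erase returns the next valid iterator
		}
		else
		{
			++it;
		}
	}
}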
diff --git a/nano/node/transport/transport.hpp b/nano/node/transport/transport.hpp
index 11ec8849..02b60df2 100644
--- a/nano/node/transport/transport.hpp
+++ b/nano/node/transport/transport.hpp
@@ -53,6 +53,10 @@ namespace transport
 		{
 			return false;
 		}
+		virtual bool alive () const
+		{
+			return true;
+		}

 		std::chrono::steady_clock::time_point get_last_bootstrap_attempt () const
 		{
diff --git a/nano/test_common/testutil.hpp b/nano/test_common/testutil.hpp
index c005cac8..e9d4c794 100644
--- a/nano/test_common/testutil.hpp
+++ b/nano/test_common/testutil.hpp
@@ -97,6 +97,16 @@
 		ASSERT_TRUE (condition); \
 	}

+/*
+ * Asserts that the two values stay equal during the specified amount of time
+ */
+#define ASSERT_ALWAYS_EQ(time, val1, val2) \
+	system.deadline_set (time);            \
+	while (!system.poll ())                \
+	{                                      \
+		ASSERT_EQ (val1, val2);            \
+	}
+
 /*
  * Asserts that condition is never true during the specified amount of time
  */
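// ASSERT_TIMELY_EQ waits until two values become equal (failing on timeout), while the new
// ASSERT_ALWAYS_EQ above keeps polling for the whole window and fails the moment they differ.
// A std-only model of the two behaviours for illustration; nano's real macros poll through
// system.poll () and deadline_set () instead of sleeping.
#include <chrono>
#include <functional>
#include <thread>

// Returns true if condition () became true before the deadline (the ASSERT_TIMELY idea).
inline bool eventually (std::function<bool ()> const & condition, std::chrono::milliseconds timeout)
{
	auto deadline = std::chrono::steady_clock::now () + timeout;
	while (std::chrono::steady_clock::now () < deadline)
	{
		if (condition ())
		{
			return true;
		}
		std::this_thread::sleep_for (std::chrono::milliseconds (10));
	}
	return condition ();
}

// Returns true only if condition () held every time it was sampled during the window
// (the ASSERT_ALWAYS idea).
inline bool always (std::function<bool ()> const & condition, std::chrono::milliseconds duration)
{
	auto deadline = std::chrono::steady_clock::now () + duration;
	while (std::chrono::steady_clock::now () < deadline)
	{
		if (!condition ())
		{
			return false;
		}
		std::this_thread::sleep_for (std::chrono::milliseconds (10));
	}
	return true;
}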