diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp
index 7c5dc568..66d46657 100644
--- a/nano/core_test/network.cpp
+++ b/nano/core_test/network.cpp
@@ -1083,8 +1083,8 @@ TEST (network, purge_dead_channel)
     auto channel = channels.front ();
     ASSERT_TRUE (channel);

-    auto sockets = node1.tcp_listener.sockets ();
-    ASSERT_EQ (sockets.size (), 1);
+    std::deque<std::shared_ptr<nano::transport::tcp_socket>> sockets;
+    ASSERT_TIMELY_EQ (5s, (sockets = node1.tcp_listener.all_sockets ()).size (), 1);
     auto socket = sockets.front ();
     ASSERT_TRUE (socket);

@@ -1133,8 +1133,8 @@ TEST (network, purge_dead_channel_remote)
     auto channel = channels.front ();
     ASSERT_TRUE (channel);

-    auto sockets = node1.tcp_listener.sockets ();
-    ASSERT_EQ (sockets.size (), 1);
+    std::deque<std::shared_ptr<nano::transport::tcp_socket>> sockets;
+    ASSERT_TIMELY_EQ (5s, (sockets = node1.tcp_listener.all_sockets ()).size (), 1);
     auto socket = sockets.front ();
     ASSERT_TRUE (socket);

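
The assertions in these tests poll instead of checking once, because the listener now registers and purges its connection entries asynchronously from a cleanup loop. As a rough, standalone illustration of the ASSERT_TIMELY_EQ idiom used throughout (the real macro lives in nano's test framework; poll_until, its name and its 10ms poll interval are assumptions made for this sketch, not project code):

#include <chrono>
#include <functional>
#include <thread>

// Keep re-evaluating a condition until it holds or the deadline expires,
// mirroring how the tests above wait for asynchronous accept/cleanup to settle.
bool poll_until (std::chrono::milliseconds timeout, std::function<bool ()> condition)
{
    auto const deadline = std::chrono::steady_clock::now () + timeout;
    while (std::chrono::steady_clock::now () < deadline)
    {
        if (condition ())
        {
            return true; // condition became true before the deadline
        }
        std::this_thread::sleep_for (std::chrono::milliseconds{ 10 });
    }
    return condition (); // one final check at the deadline
}
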
diff --git a/nano/core_test/tcp_listener.cpp b/nano/core_test/tcp_listener.cpp
index 787066f6..424c1990 100644
--- a/nano/core_test/tcp_listener.cpp
+++ b/nano/core_test/tcp_listener.cpp
@@ -25,14 +25,13 @@ TEST (tcp_listener, max_connections)
     node_config.tcp.max_inbound_connections = 2;
     auto node = system.add_node (node_config, node_flags);

-    // client side connection tracking
     std::atomic<size_t> connection_attempts = 0;
     auto connect_handler = [&connection_attempts] (boost::system::error_code const & ec_a) {
         ASSERT_EQ (ec_a.value (), 0);
         ++connection_attempts;
     };

-    // start 3 clients, 2 will persist but 1 will be dropped
+    // Start 3 clients, 2 should connect successfully
     auto client1 = std::make_shared<nano::transport::tcp_socket> (*node);
     client1->async_connect (node->network.endpoint (), connect_handler);

@@ -45,46 +44,8 @@ TEST (tcp_listener, max_connections)
     ASSERT_TIMELY_EQ (5s, node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success), 2);
     ASSERT_ALWAYS_EQ (1s, node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success), 2);
     ASSERT_TIMELY_EQ (5s, connection_attempts, 3);
-
-    // create space for one socket and fill the connections table again
-    {
-        auto sockets1 = node->tcp_listener.sockets ();
-        ASSERT_EQ (sockets1.size (), 2);
-        sockets1[0]->close ();
-    }
-    ASSERT_TIMELY_EQ (10s, node->tcp_listener.sockets ().size (), 1);
-
-    auto client4 = std::make_shared<nano::transport::tcp_socket> (*node);
-    client4->async_connect (node->network.endpoint (), connect_handler);
-
-    auto client5 = std::make_shared<nano::transport::tcp_socket> (*node);
-    client5->async_connect (node->network.endpoint (), connect_handler);
-
-    ASSERT_TIMELY_EQ (5s, node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success), 3);
-    ASSERT_ALWAYS_EQ (1s, node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success), 3);
-    ASSERT_TIMELY_EQ (5s, connection_attempts, 5);
-
-    // close all existing sockets and fill the connections table again
-    {
-        auto sockets2 = node->tcp_listener.sockets ();
-        ASSERT_EQ (sockets2.size (), 2);
-        sockets2[0]->close ();
-        sockets2[1]->close ();
-    }
-    ASSERT_TIMELY_EQ (10s, node->tcp_listener.sockets ().size (), 0);
-
-    auto client6 = std::make_shared<nano::transport::tcp_socket> (*node);
-    client6->async_connect (node->network.endpoint (), connect_handler);
-
-    auto client7 = std::make_shared<nano::transport::tcp_socket> (*node);
-    client7->async_connect (node->network.endpoint (), connect_handler);
-
-    auto client8 = std::make_shared<nano::transport::tcp_socket> (*node);
-    client8->async_connect (node->network.endpoint (), connect_handler);
-
-    ASSERT_TIMELY_EQ (5s, node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success), 5);
-    ASSERT_ALWAYS_EQ (1s, node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success), 5);
-    ASSERT_TIMELY_EQ (5s, connection_attempts, 8); // connections initiated by the client
+    ASSERT_TIMELY_EQ (5s, node->tcp_listener.all_sockets ().size (), 2);
+    ASSERT_ALWAYS_EQ (1s, node->tcp_listener.all_sockets ().size (), 2);
 }

 TEST (tcp_listener, max_connections_per_ip)
@@ -206,7 +167,7 @@ TEST (tcp_listener, max_peers_per_ip)
     ASSERT_TIMELY_EQ (5s, connection_attempts, max_ip_connections + 1);
 }

-TEST (tcp_listener, tcp_node_id_handshake)
+TEST (tcp_listener, node_id_handshake)
 {
     nano::test::system system (1);
     auto socket (std::make_shared<nano::transport::tcp_socket> (*system.nodes[0]));
@@ -240,33 +201,26 @@ TEST (tcp_listener, tcp_node_id_handshake)
     ASSERT_TIMELY (5s, done);
 }

-// Test disabled because it's failing intermittently.
-// PR in which it got disabled: https://github.com/nanocurrency/nano-node/pull/3611
-// Issue for investigating it: https://github.com/nanocurrency/nano-node/issues/3615
-TEST (tcp_listener, DISABLED_tcp_listener_timeout_empty)
+TEST (tcp_listener, timeout_empty)
 {
-    nano::test::system system (1);
-    auto node0 (system.nodes[0]);
+    nano::test::system system;
+    nano::node_config config;
+    config.tcp.handshake_timeout = 2s;
+    auto node0 = system.add_node (config);
     auto socket (std::make_shared<nano::transport::tcp_socket> (*node0));
-    std::atomic<bool> connected (false);
-    socket->async_connect (node0->tcp_listener.endpoint (), [&connected] (boost::system::error_code const & ec) {
+    socket->async_connect (node0->tcp_listener.endpoint (), [] (boost::system::error_code const & ec) {
         ASSERT_FALSE (ec);
-        connected = true;
     });
-    ASSERT_TIMELY (5s, connected);
-    bool disconnected (false);
-    system.deadline_set (std::chrono::seconds (6));
-    while (!disconnected)
-    {
-        disconnected = node0->tcp_listener.connection_count () == 0;
-        ASSERT_NO_ERROR (system.poll ());
-    }
+    ASSERT_TIMELY_EQ (5s, node0->tcp_listener.connection_count (), 1);
+    ASSERT_TIMELY_EQ (10s, node0->tcp_listener.connection_count (), 0);
 }

-TEST (tcp_listener, tcp_listener_timeout_node_id_handshake)
+TEST (tcp_listener, timeout_node_id_handshake)
 {
-    nano::test::system system (1);
-    auto node0 (system.nodes[0]);
+    nano::test::system system;
+    nano::node_config config;
+    config.tcp.handshake_timeout = 2s;
+    auto node0 = system.add_node (config);
     auto socket (std::make_shared<nano::transport::tcp_socket> (*node0));
     auto cookie (node0->network.syn_cookies.assign (nano::transport::map_tcp_to_endpoint (node0->tcp_listener.endpoint ())));
     ASSERT_TRUE (cookie);
@@ -280,12 +234,6 @@ TEST (tcp_listener, tcp_listener_timeout_node_id_handshake)
         });
     });
     ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake) != 0);
-    ASSERT_EQ (node0->tcp_listener.connection_count (), 1);
-    bool disconnected (false);
-    system.deadline_set (std::chrono::seconds (20));
-    while (!disconnected)
-    {
-        disconnected = node0->tcp_listener.connection_count () == 0;
-        ASSERT_NO_ERROR (system.poll ());
-    }
+    ASSERT_TIMELY_EQ (5s, node0->tcp_listener.connection_count (), 1);
+    ASSERT_TIMELY_EQ (10s, node0->tcp_listener.connection_count (), 0);
 }
\ No newline at end of file
diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp
index 62a68f3f..f838326b 100644
--- a/nano/lib/stats_enums.hpp
+++ b/nano/lib/stats_enums.hpp
@@ -383,6 +383,7 @@ enum class detail
     connect_rejected,
     connect_success,
     attempt_timeout,
+    handshake_timeout,
     not_a_peer,

     // tcp_channel
diff --git a/nano/lib/utility.hpp b/nano/lib/utility.hpp
index f86e71a2..4713650b 100644
--- a/nano/lib/utility.hpp
+++ b/nano/lib/utility.hpp
@@ -6,6 +6,7 @@
 #include

+#include <deque>
 #include
 #include
 #include
@@ -65,6 +66,28 @@ size_t erase_if (Container & container, Pred pred)
     return result;
 }

+/**
+ * Erase elements from the container when the predicate returns true and return the erased elements as a std::deque
+ */
+template <class Container, class Pred>
+std::deque<typename Container::value_type> erase_if_and_collect (Container & container, Pred pred)
+{
+    std::deque<typename Container::value_type> removed_elements;
+    for (auto it = container.begin (); it != container.end ();)
+    {
+        if (pred (*it))
+        {
+            removed_elements.push_back (*it);
+            it = container.erase (it);
+        }
+        else
+        {
+            ++it;
+        }
+    }
+    return removed_elements;
+}
+
 /** Safe narrowing cast which silences warnings and asserts on data loss in debug builds. This is optimized away. */
 template <typename TARGET_TYPE, typename SOURCE_TYPE>
 constexpr TARGET_TYPE narrow_cast (SOURCE_TYPE const & val)
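
The erase_if_and_collect helper above is meant to let callers remove elements while holding a lock and then perform the expensive follow-up work (closing sockets, logging) on the collected elements after the lock has been released. A minimal, self-contained usage sketch; the std::list<int> container and the even-number predicate are illustrative only, and the helper is repeated here so the sketch compiles on its own:

#include <deque>
#include <iostream>
#include <list>

template <class Container, class Pred>
std::deque<typename Container::value_type> erase_if_and_collect (Container & container, Pred pred)
{
    std::deque<typename Container::value_type> removed;
    for (auto it = container.begin (); it != container.end ();)
    {
        if (pred (*it))
        {
            removed.push_back (*it); // keep a copy of the erased element
            it = container.erase (it);
        }
        else
        {
            ++it;
        }
    }
    return removed;
}

int main ()
{
    std::list<int> values{ 1, 2, 3, 4, 5, 6 };
    auto removed = erase_if_and_collect (values, [] (int v) { return v % 2 == 0; });
    // values now holds { 1, 3, 5 }; removed holds { 2, 4, 6 } and can be processed afterwards,
    // which is how the purge paths below defer socket close () calls until the mutex is released.
    for (auto v : removed)
    {
        std::cout << v << '\n';
    }
}
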
diff --git a/nano/node/transport/tcp_channels.cpp b/nano/node/transport/tcp_channels.cpp
index 176d2e02..5335e535 100644
--- a/nano/node/transport/tcp_channels.cpp
+++ b/nano/node/transport/tcp_channels.cpp
@@ -1,6 +1,8 @@
 #include
 #include

+#include <ranges>
+
 /*
  * tcp_channels
  */
@@ -308,7 +310,7 @@ bool nano::transport::tcp_channels::track_reachout (nano::endpoint const & endpo

 void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point cutoff_deadline)
 {
-    nano::lock_guard<nano::mutex> lock{ mutex };
+    auto channels_l = all_channels ();

     auto should_close = [this, cutoff_deadline] (auto const & channel) {
         // Remove channels that haven't successfully sent a message within the cutoff time
@@ -332,27 +334,35 @@ void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point
         return false;
     };

-    for (auto const & entry : channels)
+    // Close stale channels without holding the mutex
+    for (auto const & channel : channels_l)
     {
-        if (should_close (entry.channel))
+        if (should_close (channel))
         {
-            entry.channel->close ();
+            channel->close ();
         }
     }

-    erase_if (channels, [this] (auto const & entry) {
-        if (!entry.channel->alive ())
-        {
-            node.logger.debug (nano::log::type::tcp_channels, "Removing dead channel: {}", entry.channel->to_string ());
-            entry.channel->close ();
-            return true; // Erase
-        }
-        return false;
-    });
+    nano::unique_lock<nano::mutex> lock{ mutex };

     // Remove keepalive attempt tracking for attempts older than cutoff
     auto attempts_cutoff (attempts.get ().lower_bound (cutoff_deadline));
     attempts.get ().erase (attempts.get ().begin (), attempts_cutoff);
+
+    // Erase dead channels from the list, but close them outside of the lock
+    auto erased_connections = erase_if_and_collect (channels, [this] (auto const & entry) {
+        return !entry.channel->alive ();
+    });
+
+    lock.unlock ();
+
+    for (auto const & connection : erased_connections)
+    {
+        node.stats.inc (nano::stat::type::tcp_channels, nano::stat::detail::erase_dead);
+        node.logger.debug (nano::log::type::tcp_channels, "Removing dead channel: {}", connection.channel->to_string ());
+
+        connection.channel->close ();
+    }
 }

 void nano::transport::tcp_channels::keepalive ()
@@ -437,6 +447,27 @@ bool nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint)
     return node.tcp_listener.connect (endpoint.address (), endpoint.port ());
 }

+auto nano::transport::tcp_channels::all_sockets () const -> std::deque<std::shared_ptr<nano::transport::tcp_socket>>
+{
+    nano::lock_guard<nano::mutex> lock{ mutex };
+    auto r = channels | std::views::transform ([] (auto const & entry) { return entry.socket; });
+    return { r.begin (), r.end () };
+}
+
+auto nano::transport::tcp_channels::all_servers () const -> std::deque<std::shared_ptr<nano::transport::tcp_server>>
+{
+    nano::lock_guard<nano::mutex> lock{ mutex };
+    auto r = channels | std::views::transform ([] (auto const & entry) { return entry.server; });
+    return { r.begin (), r.end () };
+}
+
+auto nano::transport::tcp_channels::all_channels () const -> std::deque<std::shared_ptr<nano::transport::tcp_channel>>
+{
+    nano::lock_guard<nano::mutex> lock{ mutex };
+    auto r = channels | std::views::transform ([] (auto const & entry) { return entry.channel; });
+    return { r.begin (), r.end () };
+}
+
 nano::container_info nano::transport::tcp_channels::container_info () const
 {
     nano::lock_guard<nano::mutex> guard{ mutex };
diff --git a/nano/node/transport/tcp_channels.hpp b/nano/node/transport/tcp_channels.hpp
index f27453e3..42a12738 100644
--- a/nano/node/transport/tcp_channels.hpp
+++ b/nano/node/transport/tcp_channels.hpp
@@ -62,6 +62,10 @@ public:
     // Connection start
     bool start_tcp (nano::endpoint const &);

+    std::deque<std::shared_ptr<nano::transport::tcp_socket>> all_sockets () const;
+    std::deque<std::shared_ptr<nano::transport::tcp_server>> all_servers () const;
+    std::deque<std::shared_ptr<nano::transport::tcp_channel>> all_channels () const;
+
     nano::container_info container_info () const;

 private: // Dependencies
diff --git a/nano/node/transport/tcp_listener.cpp b/nano/node/transport/tcp_listener.cpp
index 19ef706d..388be082 100644
--- a/nano/node/transport/tcp_listener.cpp
+++ b/nano/node/transport/tcp_listener.cpp
@@ -153,14 +153,8 @@ void nano::transport::tcp_listener::stop ()

     for (auto & connection : connections_l)
     {
-        if (auto socket = connection.socket.lock ())
-        {
-            socket->close ();
-        }
-        if (auto server = connection.server.lock ())
-        {
-            server->stop ();
-        }
+        connection.socket->close ();
+        connection.server->stop ();
     }

     logger.debug (nano::log::type::tcp_listener, "Stopped");
@@ -173,53 +167,75 @@ void nano::transport::tcp_listener::run_cleanup ()
     {
         stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::cleanup);

-        cleanup ();
         timeout ();
+        purge (lock);
+        debug_assert (!lock.owns_lock ());
+        lock.lock ();

         condition.wait_for (lock, 1s, [this] () {
             return stopped.load ();
         });
     }
 }

-void nano::transport::tcp_listener::cleanup ()
+void nano::transport::tcp_listener::purge (nano::unique_lock<nano::mutex> & lock)
 {
+    debug_assert (lock.owns_lock ());
     debug_assert (!mutex.try_lock ());

-    // Erase dead connections
-    erase_if (connections, [this] (auto const & connection) {
-        if (connection.socket.expired () && connection.server.expired ())
-        {
-            stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::erase_dead);
-            logger.debug (nano::log::type::tcp_listener, "Evicting dead connection: {}", connection.endpoint);
-            return true;
-        }
-        else
-        {
-            return false;
-        }
-    });
-
     // Erase completed attempts
     erase_if (attempts, [this] (auto const & attempt) {
         return attempt.task.ready ();
     });
+
+    // Erase dead connections
+    auto erased_connections = erase_if_and_collect (connections, [this] (auto const & connection) {
+        return !connection.socket->alive ();
+    });
+
+    lock.unlock ();
+
+    for (auto const & connection : erased_connections)
+    {
+        stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::erase_dead);
+        logger.debug (nano::log::type::tcp_listener, "Evicting dead connection: {}", connection.endpoint);
+
+        connection.socket->close ();
+        connection.server->stop ();
+    }
 }

 void nano::transport::tcp_listener::timeout ()
 {
     debug_assert (!mutex.try_lock ());

-    auto const cutoff = std::chrono::steady_clock::now () - config.connect_timeout;
+    auto const now = std::chrono::steady_clock::now ();
+    auto const connect_cutoff = now - config.connect_timeout;
+    auto const handshake_cutoff = now - config.handshake_timeout;

     // Cancel timed out attempts
     for (auto & attempt : attempts)
     {
-        if (!attempt.task.ready () && attempt.start < cutoff)
+        if (!attempt.task.ready () && attempt.start < connect_cutoff)
         {
-            attempt.task.cancel ();
-
             stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::attempt_timeout);
             logger.debug (nano::log::type::tcp_listener, "Connection attempt timed out: {} (started {}s ago)",
-            attempt.endpoint, nano::log::seconds_delta (attempt.start));
+            attempt.endpoint,
+            nano::log::seconds_delta (attempt.start));
+
+            attempt.task.cancel (); // Cancel is non-blocking and will return immediately, safe to call under lock
+        }
+    }
+
+    // Cancel handshakes that have been running for too long
+    for (auto & connection : connections)
+    {
+        if (connection.socket->type () == nano::transport::socket_type::undefined && connection.socket->get_time_connected () < handshake_cutoff)
+        {
+            stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::handshake_timeout);
+            logger.debug (nano::log::type::tcp_listener, "Handshake timed out: {} (connected {}s ago)",
+            connection.endpoint,
+            nano::log::seconds_delta (connection.socket->get_time_connected ()));
+
+            connection.socket->close (); // Schedule socket close, this is non-blocking, safe to call under lock
         }
     }
 }
@@ -390,7 +406,7 @@ auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket
     if (auto result = check_limits (remote_endpoint.address (), type); result != accept_result::accepted)
     {
         stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_rejected, to_stat_dir (type));
-        logger.debug (nano::log::type::tcp_listener, "Rejected connection from: {} reason: {} ({})",
+        logger.debug (nano::log::type::tcp_listener, "Rejected connection: {} reason: {} ({})",
         remote_endpoint,
         to_string (result),
         to_string (type));
@@ -431,7 +447,7 @@ auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket
     return { accept_result::accepted, socket, server };
 }

-auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip, connection_type type) -> accept_result
+auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip, connection_type type) const -> accept_result
 {
     debug_assert (!mutex.try_lock ());

@@ -440,8 +456,6 @@ auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip,
         return accept_result::rejected;
     }

-    cleanup ();
-
     if (node.network.excluded_peers.check (ip)) // true => error
     {
         stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::excluded, to_stat_dir (type));
@@ -526,11 +540,7 @@ size_t nano::transport::tcp_listener::realtime_count () const
     nano::lock_guard<nano::mutex> lock{ mutex };

     return std::count_if (connections.begin (), connections.end (), [] (auto const & connection) {
-        if (auto socket = connection.socket.lock ())
-        {
-            return socket->is_realtime_connection ();
-        }
-        return false;
+        return connection.socket->type () == nano::transport::socket_type::realtime;
     });
 }

@@ -539,11 +549,7 @@ size_t nano::transport::tcp_listener::bootstrap_count () const
     nano::lock_guard<nano::mutex> lock{ mutex };

     return std::count_if (connections.begin (), connections.end (), [] (auto const & connection) {
-        if (auto socket = connection.socket.lock ())
-        {
-            return socket->is_bootstrap_connection ();
-        }
-        return false;
+        return connection.socket->type () == nano::transport::socket_type::bootstrap;
     });
 }

@@ -585,21 +591,17 @@ asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () const
     return { asio::ip::address_v6::loopback (), local.port () };
 }

-auto nano::transport::tcp_listener::sockets () const -> std::vector<std::shared_ptr<nano::transport::tcp_socket>>
+auto nano::transport::tcp_listener::all_sockets () const -> std::deque<std::shared_ptr<nano::transport::tcp_socket>>
 {
     nano::lock_guard<nano::mutex> lock{ mutex };
-    auto r = connections
-    | std::views::transform ([] (auto const & connection) { return connection.socket.lock (); })
-    | std::views::filter ([] (auto const & socket) { return socket != nullptr; });
+    auto r = connections | std::views::transform ([] (auto const & connection) { return connection.socket; });
     return { r.begin (), r.end () };
 }

-auto nano::transport::tcp_listener::servers () const -> std::vector<std::shared_ptr<nano::transport::tcp_server>>
+auto nano::transport::tcp_listener::all_servers () const -> std::deque<std::shared_ptr<nano::transport::tcp_server>>
 {
     nano::lock_guard<nano::mutex> lock{ mutex };
-    auto r = connections
-    | std::views::transform ([] (auto const & connection) { return connection.server.lock (); })
-    | std::views::filter ([] (auto const & server) { return server != nullptr; });
+    auto r = connections | std::views::transform ([] (auto const & connection) { return connection.server; });
     return { r.begin (), r.end () };
 }

diff --git a/nano/node/transport/tcp_listener.hpp b/nano/node/transport/tcp_listener.hpp
index ee485b5e..85854a80 100644
--- a/nano/node/transport/tcp_listener.hpp
+++ b/nano/node/transport/tcp_listener.hpp
@@ -71,8 +71,8 @@ public:
     size_t realtime_count () const;
     size_t bootstrap_count () const;

-    std::vector<std::shared_ptr<nano::transport::tcp_socket>> sockets () const;
-    std::vector<std::shared_ptr<nano::transport::tcp_server>> servers () const;
+    std::deque<std::shared_ptr<nano::transport::tcp_socket>> all_sockets () const;
+    std::deque<std::shared_ptr<nano::transport::tcp_server>> all_servers () const;

     nano::container_info container_info () const;

@@ -92,7 +92,7 @@ private:
     asio::awaitable<void> wait_available_slots () const;

     void run_cleanup ();
-    void cleanup ();
+    void purge (nano::unique_lock<nano::mutex> &);
     void timeout ();

     asio::awaitable<void> connect_impl (asio::ip::tcp::endpoint);
@@ -106,7 +106,7 @@ private:
     };

     accept_return accept_one (asio::ip::tcp::socket, connection_type);
-    accept_result check_limits (asio::ip::address const & ip, connection_type);
+    accept_result check_limits (asio::ip::address const & ip, connection_type) const;
     asio::awaitable<void> accept_socket ();

     size_t count_per_type (connection_type) const;
@@ -119,8 +119,8 @@ private:
     {
         connection_type type;
         asio::ip::tcp::endpoint endpoint;
-        std::weak_ptr<nano::transport::tcp_socket> socket;
-        std::weak_ptr<nano::transport::tcp_server> server;
+        std::shared_ptr<nano::transport::tcp_socket> socket;
+        std::shared_ptr<nano::transport::tcp_server> server;

         asio::ip::address address () const
         {
diff --git a/nano/node/transport/tcp_socket.cpp b/nano/node/transport/tcp_socket.cpp
index d5686c68..4fe42e24 100644
--- a/nano/node/transport/tcp_socket.cpp
+++ b/nano/node/transport/tcp_socket.cpp
@@ -36,6 +36,7 @@ nano::transport::tcp_socket::tcp_socket (nano::node & node_a, boost::asio::ip::t
     default_timeout{ node_a.config.tcp_io_timeout },
     silent_connection_tolerance_time{ node_a.network_params.network.silent_connection_tolerance_time }
 {
+    time_connected = std::chrono::steady_clock::now ();
 }

 nano::transport::tcp_socket::~tcp_socket ()
@@ -77,6 +78,7 @@ void nano::transport::tcp_socket::async_connect (nano::tcp_endpoint const & endp
     }
     else
     {
+        this_l->time_connected = std::chrono::steady_clock::now ();
         this_l->set_last_completion ();
         {
             // Best effort attempt to get endpoint address
diff --git a/nano/node/transport/tcp_socket.hpp b/nano/node/transport/tcp_socket.hpp
index 6e1ba40d..ebb7fe78 100644
--- a/nano/node/transport/tcp_socket.hpp
+++ b/nano/node/transport/tcp_socket.hpp
@@ -140,6 +140,10 @@ public:
     {
         return !is_closed ();
     }
+    std::chrono::steady_clock::time_point get_time_connected () const
+    {
+        return time_connected.load ();
+    }

 private:
     size_t const queue_size;
@@ -188,6 +192,8 @@ protected:
     /** Updated only from strand, but stored as atomic so it can be read from outside */
     std::atomic<bool> write_in_progress{ false };

+    std::atomic<std::chrono::steady_clock::time_point> time_connected;
+
     void close_internal ();
     void write_queued_messages ();
     void set_default_timeout ();
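
For context, the handshake_timeout handling added to tcp_listener::timeout () closes sockets that are still untyped (the handshake has not completed) once they have been connected for longer than the configured limit. A minimal sketch of that scan using plain standard types; socket_type, demo_socket and scan_handshakes are stand-ins invented for this sketch, not types from the patch:

#include <chrono>
#include <memory>
#include <vector>

enum class socket_type { undefined, realtime, bootstrap };

struct demo_socket
{
    socket_type type{ socket_type::undefined };
    std::chrono::steady_clock::time_point time_connected{ std::chrono::steady_clock::now () };
    bool closed{ false };
    void close () { closed = true; } // non-blocking, mirrors the behaviour the patch relies on
};

// Close sockets that are still mid-handshake (type undefined) after `handshake_timeout` has elapsed
void scan_handshakes (std::vector<std::shared_ptr<demo_socket>> & connections, std::chrono::seconds handshake_timeout)
{
    auto const cutoff = std::chrono::steady_clock::now () - handshake_timeout;
    for (auto & socket : connections)
    {
        if (socket->type == socket_type::undefined && socket->time_connected < cutoff)
        {
            socket->close ();
        }
    }
}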