diff --git a/nano/lib/utility.hpp b/nano/lib/utility.hpp
index f86e71a2..4713650b 100644
--- a/nano/lib/utility.hpp
+++ b/nano/lib/utility.hpp
@@ -6,6 +6,7 @@
 #include
+#include <deque>
 #include
 #include
 #include
@@ -65,6 +66,28 @@ size_t erase_if (Container & container, Pred pred)
 	return result;
 }
 
+/**
+ * Erase elements from container when predicate returns true and return erased elements as a std::deque
+ */
+template <class Container, class Pred>
+std::deque<typename Container::value_type> erase_if_and_collect (Container & container, Pred pred)
+{
+	std::deque<typename Container::value_type> removed_elements;
+	for (auto it = container.begin (); it != container.end ();)
+	{
+		if (pred (*it))
+		{
+			removed_elements.push_back (*it);
+			it = container.erase (it);
+		}
+		else
+		{
+			++it;
+		}
+	}
+	return removed_elements;
+}
+
 /** Safe narrowing cast which silences warnings and asserts on data loss in debug builds. This is optimized away. */
 template <typename TARGET_TYPE, typename SOURCE_TYPE>
 constexpr TARGET_TYPE narrow_cast (SOURCE_TYPE const & val)
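For reference, a minimal self-contained sketch of how the new helper behaves: matching elements are removed from the container and handed back to the caller, so destructive follow-up work can happen after any lock has been released. The std::list, the predicate, and the main driver below are illustrative assumptions, not part of the patch; the template is copied from the hunk above.

// sketch.cpp - assumes only the standard library
#include <cassert>
#include <deque>
#include <list>

// Copy of the helper added in nano/lib/utility.hpp above
template <class Container, class Pred>
std::deque<typename Container::value_type> erase_if_and_collect (Container & container, Pred pred)
{
	std::deque<typename Container::value_type> removed_elements;
	for (auto it = container.begin (); it != container.end ();)
	{
		if (pred (*it))
		{
			removed_elements.push_back (*it);
			it = container.erase (it);
		}
		else
		{
			++it;
		}
	}
	return removed_elements;
}

int main ()
{
	std::list<int> values{ 1, 2, 3, 4, 5, 6 };
	// Remove even values and get them back for further processing
	auto evens = erase_if_and_collect (values, [] (int v) { return v % 2 == 0; });
	assert (evens.size () == 3); // collected: 2, 4, 6
	assert (values.size () == 3); // remaining: 1, 3, 5
}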
diff --git a/nano/node/transport/tcp_channels.cpp b/nano/node/transport/tcp_channels.cpp
index 176d2e02..5335e535 100644
--- a/nano/node/transport/tcp_channels.cpp
+++ b/nano/node/transport/tcp_channels.cpp
@@ -1,6 +1,8 @@
 #include <nano/node/node.hpp>
 #include <nano/node/transport/tcp_channels.hpp>
 
+#include <ranges>
+
 /*
  * tcp_channels
  */
@@ -308,7 +310,7 @@ bool nano::transport::tcp_channels::track_reachout (nano::endpoint const & endpo
 
 void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point cutoff_deadline)
 {
-	nano::lock_guard<nano::mutex> lock{ mutex };
+	auto channels_l = all_channels ();
 
 	auto should_close = [this, cutoff_deadline] (auto const & channel) {
 		// Remove channels that haven't successfully sent a message within the cutoff time
@@ -332,27 +334,35 @@ void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point
 		return false;
 	};
 
-	for (auto const & entry : channels)
+	// Close stale channels without holding the mutex
+	for (auto const & channel : channels_l)
 	{
-		if (should_close (entry.channel))
+		if (should_close (channel))
 		{
-			entry.channel->close ();
+			channel->close ();
 		}
 	}
 
-	erase_if (channels, [this] (auto const & entry) {
-		if (!entry.channel->alive ())
-		{
-			node.logger.debug (nano::log::type::tcp_channels, "Removing dead channel: {}", entry.channel->to_string ());
-			entry.channel->close ();
-			return true; // Erase
-		}
-		return false;
-	});
+	nano::unique_lock<nano::mutex> lock{ mutex };
 
 	// Remove keepalive attempt tracking for attempts older than cutoff
 	auto attempts_cutoff (attempts.get ().lower_bound (cutoff_deadline));
 	attempts.get ().erase (attempts.get ().begin (), attempts_cutoff);
+
+	// Erase dead channels from list, but close them outside of the lock
+	auto erased_connections = erase_if_and_collect (channels, [this] (auto const & entry) {
+		return !entry.channel->alive ();
+	});
+
+	lock.unlock ();
+
+	for (auto const & connection : erased_connections)
+	{
+		node.stats.inc (nano::stat::type::tcp_channels, nano::stat::detail::erase_dead);
+		node.logger.debug (nano::log::type::tcp_channels, "Removing dead channel: {}", connection.channel->to_string ());
+
+		connection.channel->close ();
+	}
 }
 
 void nano::transport::tcp_channels::keepalive ()
@@ -437,6 +447,27 @@ bool nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint)
 	return node.tcp_listener.connect (endpoint.address (), endpoint.port ());
 }
 
+auto nano::transport::tcp_channels::all_sockets () const -> std::deque<std::shared_ptr<nano::transport::tcp_socket>>
+{
+	nano::lock_guard<nano::mutex> lock{ mutex };
+	auto r = channels | std::views::transform ([] (auto const & entry) { return entry.socket; });
+	return { r.begin (), r.end () };
+}
+
+auto nano::transport::tcp_channels::all_servers () const -> std::deque<std::shared_ptr<nano::transport::tcp_server>>
+{
+	nano::lock_guard<nano::mutex> lock{ mutex };
+	auto r = channels | std::views::transform ([] (auto const & entry) { return entry.server; });
+	return { r.begin (), r.end () };
+}
+
+auto nano::transport::tcp_channels::all_channels () const -> std::deque<std::shared_ptr<nano::transport::tcp_channel>>
+{
+	nano::lock_guard<nano::mutex> lock{ mutex };
+	auto r = channels | std::views::transform ([] (auto const & entry) { return entry.channel; });
+	return { r.begin (), r.end () };
+}
+
 nano::container_info nano::transport::tcp_channels::container_info () const
 {
 	nano::lock_guard<nano::mutex> guard{ mutex };
diff --git a/nano/node/transport/tcp_channels.hpp b/nano/node/transport/tcp_channels.hpp
index f27453e3..42a12738 100644
--- a/nano/node/transport/tcp_channels.hpp
+++ b/nano/node/transport/tcp_channels.hpp
@@ -62,6 +62,10 @@ public:
 	// Connection start
 	bool start_tcp (nano::endpoint const &);
 
+	std::deque<std::shared_ptr<tcp_socket>> all_sockets () const;
+	std::deque<std::shared_ptr<tcp_server>> all_servers () const;
+	std::deque<std::shared_ptr<tcp_channel>> all_channels () const;
+
 	nano::container_info container_info () const;
 
 private: // Dependencies
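The purge rewrite above follows a copy-then-act pattern: snapshot the shared_ptrs while holding the mutex (all_channels), then run should_close / close on the copies with the mutex released, so a close handler that re-enters tcp_channels cannot deadlock against the container's own lock. Below is a compressed sketch of the same pattern; registry and item are hypothetical stand-in types, and std::mutex stands in for nano::mutex.

#include <deque>
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical stand-in for a channel whose close () may call back into the registry
struct item
{
	void close () { closed = true; }
	bool closed{ false };
};

class registry
{
public:
	// Mirrors all_channels (): copy the shared_ptrs out while holding the lock
	std::deque<std::shared_ptr<item>> snapshot () const
	{
		std::lock_guard<std::mutex> guard{ mutex };
		return { items.begin (), items.end () };
	}

	// Mirrors the purge () rewrite: act on the snapshot with the mutex released
	void close_all ()
	{
		for (auto const & i : snapshot ())
		{
			i->close (); // safe even if close () re-enters snapshot ()
		}
	}

	void add (std::shared_ptr<item> i)
	{
		std::lock_guard<std::mutex> guard{ mutex };
		items.push_back (std::move (i));
	}

private:
	mutable std::mutex mutex;
	std::deque<std::shared_ptr<item>> items;
};

int main ()
{
	registry r;
	r.add (std::make_shared<item> ());
	r.close_all (); // no self-deadlock even if items call back into the registry
}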
diff --git a/nano/node/transport/tcp_listener.cpp b/nano/node/transport/tcp_listener.cpp
index 6e2baa65..3222c735 100644
--- a/nano/node/transport/tcp_listener.cpp
+++ b/nano/node/transport/tcp_listener.cpp
@@ -153,14 +153,8 @@ void nano::transport::tcp_listener::stop ()
 
 	for (auto & connection : connections_l)
 	{
-		if (auto socket = connection.socket.lock ())
-		{
-			socket->close ();
-		}
-		if (auto server = connection.server.lock ())
-		{
-			server->stop ();
-		}
+		connection.socket->close ();
+		connection.server->stop ();
 	}
 
 	logger.debug (nano::log::type::tcp_listener, "Stopped");
@@ -173,35 +167,40 @@ void nano::transport::tcp_listener::run_cleanup ()
 	{
 		stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::cleanup);
 
-		cleanup ();
 		timeout ();
+		purge (lock);
+		debug_assert (!lock.owns_lock ());
+		lock.lock ();
 
 		condition.wait_for (lock, 1s, [this] () {
			return stopped.load ();
 		});
 	}
 }
 
-void nano::transport::tcp_listener::cleanup ()
+void nano::transport::tcp_listener::purge (nano::unique_lock<nano::mutex> & lock)
 {
+	debug_assert (lock.owns_lock ());
 	debug_assert (!mutex.try_lock ());
 
-	// Erase dead connections
-	erase_if (connections, [this] (auto const & connection) {
-		if (connection.socket.expired () && connection.server.expired ())
-		{
-			stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::erase_dead);
-			logger.debug (nano::log::type::tcp_listener, "Evicting dead connection: {}", connection.endpoint);
-			return true;
-		}
-		else
-		{
-			return false;
-		}
-	});
-
 	// Erase completed attempts
 	erase_if (attempts, [this] (auto const & attempt) {
 		return attempt.task.ready ();
 	});
+
+	// Erase dead connections
+	auto erased_connections = erase_if_and_collect (connections, [this] (auto const & connection) {
+		return !connection.socket->alive ();
+	});
+
+	lock.unlock ();
+
+	for (auto const & connection : erased_connections)
+	{
+		stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::erase_dead);
+		logger.debug (nano::log::type::tcp_listener, "Evicting dead connection: {}", connection.endpoint);
+
+		connection.socket->close ();
+		connection.server->stop ();
+	}
 }
 
 void nano::transport::tcp_listener::timeout ()
@@ -215,7 +214,7 @@ void nano::transport::tcp_listener::timeout ()
 	{
 		if (!attempt.task.ready () && attempt.start < cutoff)
 		{
-			attempt.task.cancel ();
+			attempt.task.cancel (); // Cancel is non-blocking and will return immediately, safe to call under lock
 
 			stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::attempt_timeout);
 			logger.debug (nano::log::type::tcp_listener, "Connection attempt timed out: {} (started {}s ago)",
@@ -431,7 +430,7 @@ auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket
 	return { accept_result::accepted, socket, server };
 }
 
-auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip, connection_type type) -> accept_result
+auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip, connection_type type) const -> accept_result
 {
 	debug_assert (!mutex.try_lock ());
 
@@ -440,8 +439,6 @@ auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip,
 		return accept_result::rejected;
 	}
 
-	cleanup ();
-
 	if (node.network.excluded_peers.check (ip)) // true => error
 	{
 		stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::excluded, to_stat_dir (type));
@@ -526,11 +523,7 @@ size_t nano::transport::tcp_listener::realtime_count () const
 	nano::lock_guard<nano::mutex> lock{ mutex };
 
 	return std::count_if (connections.begin (), connections.end (), [] (auto const & connection) {
-		if (auto socket = connection.socket.lock ())
-		{
-			return socket->is_realtime_connection ();
-		}
-		return false;
+		return connection.socket->type () == nano::transport::socket_type::realtime;
 	});
 }
@@ -539,11 +532,7 @@ size_t nano::transport::tcp_listener::bootstrap_count () const
 	nano::lock_guard<nano::mutex> lock{ mutex };
 
 	return std::count_if (connections.begin (), connections.end (), [] (auto const & connection) {
-		if (auto socket = connection.socket.lock ())
-		{
-			return socket->is_bootstrap_connection ();
-		}
-		return false;
+		return connection.socket->type () == nano::transport::socket_type::bootstrap;
 	});
 }
@@ -588,18 +577,14 @@ asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () const
 
 auto nano::transport::tcp_listener::all_sockets () const -> std::deque<std::shared_ptr<nano::transport::tcp_socket>>
 {
 	nano::lock_guard<nano::mutex> lock{ mutex };
-	auto r = connections
-			 | std::views::transform ([] (auto const & connection) { return connection.socket.lock (); })
-			 | std::views::filter ([] (auto const & socket) { return socket != nullptr; });
+	auto r = connections | std::views::transform ([] (auto const & connection) { return connection.socket; });
 	return { r.begin (), r.end () };
 }
 
 auto nano::transport::tcp_listener::all_servers () const -> std::deque<std::shared_ptr<nano::transport::tcp_server>>
 {
 	nano::lock_guard<nano::mutex> lock{ mutex };
-	auto r = connections
-			 | std::views::transform ([] (auto const & connection) { return connection.server.lock (); })
-			 | std::views::filter ([] (auto const & server) { return server != nullptr; });
+	auto r = connections | std::views::transform ([] (auto const & connection) { return connection.server; });
 	return { r.begin (), r.end () };
 }
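tcp_listener::purge now takes the caller's lock by reference and deliberately returns with it released: dead entries are collected under the lock via erase_if_and_collect, then their sockets and servers are closed unlocked, and run_cleanup re-acquires before waiting on the condition. A runnable sketch of that hand-off contract, with standard-library types standing in for nano::unique_lock / nano::mutex and a plain int deque standing in for the connections list:

#include <cassert>
#include <deque>
#include <mutex>

std::mutex mutex;
std::deque<int> connections{ 1, 2, 3 }; // guarded by `mutex`; ints stand in for entries

// Mirrors tcp_listener::purge: called with `lock` held, returns with it released
std::deque<int> purge (std::unique_lock<std::mutex> & lock)
{
	assert (lock.owns_lock ());
	std::deque<int> erased;
	connections.swap (erased); // stand-in for erase_if_and_collect under the lock
	lock.unlock (); // side effects (close/stop) belong after this point
	return erased;
}

int main ()
{
	std::unique_lock<std::mutex> lock{ mutex };
	auto erased = purge (lock);
	assert (!lock.owns_lock ()); // the same contract run_cleanup asserts
	for (auto const & connection : erased)
	{
		(void)connection; // close ()/stop () would run here, outside the lock
	}
	lock.lock (); // mirror run_cleanup, which re-locks before condition.wait_for
}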
diff --git a/nano/node/transport/tcp_listener.hpp b/nano/node/transport/tcp_listener.hpp
index 4a835c45..85854a80 100644
--- a/nano/node/transport/tcp_listener.hpp
+++ b/nano/node/transport/tcp_listener.hpp
@@ -92,7 +92,7 @@ private:
 	asio::awaitable<void> wait_available_slots () const;
 
 	void run_cleanup ();
-	void cleanup ();
+	void purge (nano::unique_lock<nano::mutex> &);
 	void timeout ();
 
 	asio::awaitable<void> connect_impl (asio::ip::tcp::endpoint);
@@ -106,7 +106,7 @@ private:
 	};
 
 	accept_return accept_one (asio::ip::tcp::socket, connection_type);
-	accept_result check_limits (asio::ip::address const & ip, connection_type);
+	accept_result check_limits (asio::ip::address const & ip, connection_type) const;
 	asio::awaitable<void> accept_socket ();
 
 	size_t count_per_type (connection_type) const;
@@ -119,8 +119,8 @@ private:
 	{
 		connection_type type;
 		asio::ip::tcp::endpoint endpoint;
-		std::weak_ptr<tcp_socket> socket;
-		std::weak_ptr<tcp_server> server;
+		std::shared_ptr<tcp_socket> socket;
+		std::shared_ptr<tcp_server> server;
 
 		asio::ip::address address () const
 		{
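The header change above is the load-bearing one: connection entries now hold owning shared_ptr references instead of weak_ptr, so the listener keeps sockets and servers alive until purge explicitly erases and closes them, and the lock ()/null-check dance in stop (), realtime_count (), and the all_* accessors disappears. A small sketch of the difference; socket_t and the connection_* types are hypothetical, not the patch's classes.

#include <memory>

struct socket_t // hypothetical
{
	void close () {}
};

// Before: non-owning entry; the socket may already be gone when we act on it
struct connection_weak
{
	std::weak_ptr<socket_t> socket;
	void close ()
	{
		if (auto s = socket.lock ()) // must null-check on every access
		{
			s->close ();
		}
	}
};

// After: owning entry; the socket lives until purge () erases this entry
struct connection_strong
{
	std::shared_ptr<socket_t> socket;
	void close ()
	{
		socket->close (); // always valid while the entry exists
	}
};

int main ()
{
	connection_strong c{ std::make_shared<socket_t> () };
	c.close ();
}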