From 6768e7dc6e9a30172309d4d46d555a14c5d217ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 21 Mar 2024 21:34:57 +0100 Subject: [PATCH] Move `tcp_listener` to a dedicated file and reduce header coupling --- nano/core_test/network.cpp | 1 + nano/core_test/node.cpp | 1 + nano/core_test/socket.cpp | 1 + nano/node/CMakeLists.txt | 2 + nano/node/node.cpp | 1 + nano/node/node.hpp | 13 +- nano/node/transport/tcp_listener.cpp | 287 +++++++++++++++++++++++++++ nano/node/transport/tcp_listener.hpp | 60 ++++++ nano/node/transport/tcp_server.cpp | 283 +------------------------- nano/node/transport/tcp_server.hpp | 48 ----- nano/test_common/system.cpp | 1 + 11 files changed, 365 insertions(+), 333 deletions(-) create mode 100644 nano/node/transport/tcp_listener.cpp create mode 100644 nano/node/transport/tcp_listener.hpp diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 086491d0..d5a97da7 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 77029005..91924b5b 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include diff --git a/nano/core_test/socket.cpp b/nano/core_test/socket.cpp index 711d2041..bbbf9d88 100644 --- a/nano/core_test/socket.cpp +++ b/nano/core_test/socket.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 3e3dd1dd..0b8ad15e 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -158,6 +158,8 @@ add_library( transport/socket.cpp transport/tcp.hpp transport/tcp.cpp + transport/tcp_listener.hpp + transport/tcp_listener.cpp transport/tcp_server.hpp transport/tcp_server.cpp transport/transport.hpp diff --git a/nano/node/node.cpp b/nano/node/node.cpp index da454247..589112e7 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 7bace7a7..c59a09ba 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -47,9 +47,6 @@ namespace nano { class active_transactions; -namespace rocksdb -{ -} // Declare a namespace rocksdb inside nano so all references to the rocksdb library need to be globally scoped e.g. ::rocksdb::Slice class node; class work_pool; @@ -57,7 +54,17 @@ namespace scheduler { class component; } +namespace transport +{ + class tcp_listener; +} +namespace rocksdb +{ +} // Declare a namespace rocksdb inside nano so all references to the rocksdb library need to be globally scoped e.g. ::rocksdb::Slice +} +namespace nano +{ // Configs backlog_population::config backlog_population_config (node_config const &); outbound_bandwidth_limiter::config outbound_bandwidth_limiter_config (node_config const &); diff --git a/nano/node/transport/tcp_listener.cpp b/nano/node/transport/tcp_listener.cpp new file mode 100644 index 00000000..07f3fc8d --- /dev/null +++ b/nano/node/transport/tcp_listener.cpp @@ -0,0 +1,287 @@ +#include +#include +#include +#include +#include + +#include + +namespace +{ +bool is_temporary_error (boost::system::error_code const & ec_a) +{ + switch (ec_a.value ()) + { +#if EAGAIN != EWOULDBLOCK + case EAGAIN: +#endif + + case EWOULDBLOCK: + case EINTR: + return true; + default: + return false; + } +} +} + +/* + * tcp_listener + */ + +nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_a, std::size_t max_inbound_connections) : + node (node_a), + strand{ node_a.io_ctx.get_executor () }, + acceptor{ node_a.io_ctx }, + local{ boost::asio::ip::tcp::endpoint{ boost::asio::ip::address_v6::any (), port_a } }, + max_inbound_connections{ max_inbound_connections } +{ +} + +nano::transport::tcp_listener::~tcp_listener () +{ + debug_assert (stopped); +} + +void nano::transport::tcp_listener::start (std::function const &, boost::system::error_code const &)> callback_a) +{ + nano::lock_guard lock{ mutex }; + + acceptor.open (local.protocol ()); + acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true)); + boost::system::error_code ec; + acceptor.bind (local, ec); + if (!ec) + { + acceptor.listen (boost::asio::socket_base::max_listen_connections, ec); + } + if (ec) + { + node.logger.critical (nano::log::type::tcp_listener, "Error while binding for incoming TCP: {} (port: {})", ec.message (), acceptor.local_endpoint ().port ()); + throw std::runtime_error (ec.message ()); + } + + on_connection (callback_a); +} + +void nano::transport::tcp_listener::stop () +{ + decltype (connections) connections_l; + { + nano::lock_guard lock{ mutex }; + stopped = true; + connections_l.swap (connections); + } + + nano::lock_guard lock{ mutex }; + boost::asio::dispatch (strand, [this_l = shared_from_this ()] () { + this_l->acceptor.close (); + + for (auto & address_connection_pair : this_l->connections_per_address) + { + if (auto connection_l = address_connection_pair.second.lock ()) + { + connection_l->close (); + } + } + this_l->connections_per_address.clear (); + }); +} + +std::size_t nano::transport::tcp_listener::connection_count () +{ + nano::lock_guard lock{ mutex }; + cleanup (); + return connections.size (); +} + +void nano::transport::tcp_listener::cleanup () +{ + debug_assert (!mutex.try_lock ()); + + erase_if (connections, [] (auto const & connection) { + return connection.second.expired (); + }); +} + +bool nano::transport::tcp_listener::limit_reached_for_incoming_subnetwork_connections (std::shared_ptr const & new_connection) +{ + debug_assert (strand.running_in_this_thread ()); + if (node.flags.disable_max_peers_per_subnetwork || nano::transport::is_ipv4_or_v4_mapped_address (new_connection->remote.address ())) + { + // If the limit is disabled, then it is unreachable. + // If the address is IPv4 we don't check for a network limit, since its address space isn't big as IPv6 /64. + return false; + } + auto const counted_connections = socket_functions::count_subnetwork_connections ( + connections_per_address, + new_connection->remote.address ().to_v6 (), + node.network_params.network.ipv6_subnetwork_prefix_for_limiting); + return counted_connections >= node.network_params.network.max_peers_per_subnetwork; +} + +bool nano::transport::tcp_listener::limit_reached_for_incoming_ip_connections (std::shared_ptr const & new_connection) +{ + debug_assert (strand.running_in_this_thread ()); + if (node.flags.disable_max_peers_per_ip) + { + // If the limit is disabled, then it is unreachable. + return false; + } + auto const address_connections_range = connections_per_address.equal_range (new_connection->remote.address ()); + auto const counted_connections = static_cast (std::abs (std::distance (address_connections_range.first, address_connections_range.second))); + return counted_connections >= node.network_params.network.max_peers_per_ip; +} + +void nano::transport::tcp_listener::on_connection (std::function const &, boost::system::error_code const &)> callback_a) +{ + boost::asio::post (strand, boost::asio::bind_executor (strand, [this_l = shared_from_this (), callback = std::move (callback_a)] () mutable { + if (!this_l->acceptor.is_open ()) + { + this_l->node.logger.error (nano::log::type::tcp_listener, "Acceptor is not open"); + return; + } + + // Prepare new connection + auto new_connection = std::make_shared (this_l->node, socket::endpoint_type_t::server); + this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote, + boost::asio::bind_executor (this_l->strand, + [this_l, new_connection, cbk = std::move (callback)] (boost::system::error_code const & ec_a) mutable { + this_l->evict_dead_connections (); + + if (this_l->connections_per_address.size () >= this_l->max_inbound_connections) + { + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in); + this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections reached ({}), unable to open new connection", this_l->connections_per_address.size ()); + + this_l->on_connection_requeue_delayed (std::move (cbk)); + return; + } + + if (this_l->limit_reached_for_incoming_ip_connections (new_connection)) + { + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::in); + this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections per IP reached ({}), unable to open new connection", new_connection->remote_endpoint ().address ().to_string ()); + + this_l->on_connection_requeue_delayed (std::move (cbk)); + return; + } + + if (this_l->limit_reached_for_incoming_subnetwork_connections (new_connection)) + { + auto const remote_ip_address = new_connection->remote_endpoint ().address (); + debug_assert (remote_ip_address.is_v6 ()); + auto const remote_subnet = socket_functions::get_ipv6_subnet_address (remote_ip_address.to_v6 (), this_l->node.network_params.network.max_peers_per_subnetwork); + + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::in); + this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections per subnetwork reached (subnetwork: {}, ip: {}), unable to open new connection", + remote_subnet.canonical ().to_string (), + remote_ip_address.to_string ()); + + this_l->on_connection_requeue_delayed (std::move (cbk)); + return; + } + + if (!ec_a) + { + { + // Best effort attempt to get endpoint addresses + boost::system::error_code ec; + new_connection->local = new_connection->tcp_socket.local_endpoint (ec); + } + + // Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start + // an IO operation immediately, which will start a timer. + new_connection->start (); + new_connection->set_timeout (this_l->node.network_params.network.idle_timeout); + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in); + this_l->connections_per_address.emplace (new_connection->remote.address (), new_connection); + this_l->node.observers.socket_accepted.notify (*new_connection); + if (cbk (new_connection, ec_a)) + { + this_l->on_connection (std::move (cbk)); + return; + } + this_l->node.logger.warn (nano::log::type::tcp_listener, "Stopping to accept new connections"); + return; + } + + // accept error + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in); + this_l->node.logger.error (nano::log::type::tcp_listener, "Unable to accept connection: {} ({})", ec_a.message (), new_connection->remote_endpoint ().address ().to_string ()); + + if (is_temporary_error (ec_a)) + { + // if it is a temporary error, just retry it + this_l->on_connection_requeue_delayed (std::move (cbk)); + return; + } + + // if it is not a temporary error, check how the listener wants to handle this error + if (cbk (new_connection, ec_a)) + { + this_l->on_connection_requeue_delayed (std::move (cbk)); + return; + } + + // No requeue if we reach here, no incoming socket connections will be handled + this_l->node.logger.warn (nano::log::type::tcp_listener, "Stopping to accept new connections"); + })); + })); +} + +// If we are unable to accept a socket, for any reason, we wait just a little (1ms) before rescheduling the next connection accept. +// The intention is to throttle back the connection requests and break up any busy loops that could possibly form and +// give the rest of the system a chance to recover. +void nano::transport::tcp_listener::on_connection_requeue_delayed (std::function const &, boost::system::error_code const &)> callback_a) +{ + node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (1), [this_l = shared_from_this (), callback = std::move (callback_a)] () mutable { + this_l->on_connection (std::move (callback)); + }); +} + +// This must be called from a strand +void nano::transport::tcp_listener::evict_dead_connections () +{ + debug_assert (strand.running_in_this_thread ()); + + erase_if (connections_per_address, [] (auto const & entry) { + return entry.second.expired (); + }); +} + +void nano::transport::tcp_listener::accept_action (boost::system::error_code const & ec, std::shared_ptr const & socket_a) +{ + if (!node.network.excluded_peers.check (socket_a->remote_endpoint ())) + { + auto server = std::make_shared (socket_a, node.shared (), true); + nano::lock_guard lock{ mutex }; + connections[server.get ()] = server; + server->start (); + } + else + { + node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_excluded); + node.logger.debug (nano::log::type::tcp_server, "Rejected connection from excluded peer: {}", nano::util::to_str (socket_a->remote_endpoint ())); + } +} + +boost::asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () const +{ + nano::lock_guard lock{ mutex }; + if (!stopped) + { + return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), acceptor.local_endpoint ().port ()); + } + else + { + return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), 0); + } +} + +std::unique_ptr nano::transport::tcp_listener::collect_container_info (std::string const & name) +{ + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "connections", connection_count (), sizeof (decltype (connections)::value_type) })); + return composite; +} \ No newline at end of file diff --git a/nano/node/transport/tcp_listener.hpp b/nano/node/transport/tcp_listener.hpp new file mode 100644 index 00000000..ae049d97 --- /dev/null +++ b/nano/node/transport/tcp_listener.hpp @@ -0,0 +1,60 @@ +#pragma once + +#include +#include + +#include + +namespace nano::transport +{ +class socket; +class tcp_server; + +/** + * Server side portion of bootstrap sessions. Listens for new socket connections and spawns tcp_server objects when connected. + */ +class tcp_listener final : public std::enable_shared_from_this +{ +public: + tcp_listener (uint16_t port, nano::node &, std::size_t max_inbound_connections); + ~tcp_listener (); + + void start (std::function const &, boost::system::error_code const &)> callback); + void stop (); + + void accept_action (boost::system::error_code const &, std::shared_ptr const &); + + std::size_t connection_count (); + nano::tcp_endpoint endpoint () const; + + std::unique_ptr collect_container_info (std::string const & name); + +private: // Dependencies + nano::node & node; + +private: + void on_connection (std::function const &, boost::system::error_code const &)> callback_a); + void evict_dead_connections (); + void on_connection_requeue_delayed (std::function const & new_connection, boost::system::error_code const &)>); + /** Checks whether the maximum number of connections per IP was reached. If so, it returns true. */ + bool limit_reached_for_incoming_ip_connections (std::shared_ptr const & new_connection); + bool limit_reached_for_incoming_subnetwork_connections (std::shared_ptr const & new_connection); + void cleanup (); + +public: + std::atomic bootstrap_count{ 0 }; + std::atomic realtime_count{ 0 }; + +private: + std::unordered_map> connections; + std::multimap> connections_per_address; + + boost::asio::strand strand; + boost::asio::ip::tcp::acceptor acceptor; + boost::asio::ip::tcp::endpoint local; + std::size_t const max_inbound_connections; + + std::atomic stopped; + mutable nano::mutex mutex; +}; +} \ No newline at end of file diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index b6930d52..e4dc93a4 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -4,292 +4,11 @@ #include #include #include +#include #include -#include - #include -namespace -{ -bool is_temporary_error (boost::system::error_code const & ec_a) -{ - switch (ec_a.value ()) - { -#if EAGAIN != EWOULDBLOCK - case EAGAIN: -#endif - - case EWOULDBLOCK: - case EINTR: - return true; - default: - return false; - } -} -} - -/* - * tcp_listener - */ - -nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_a, std::size_t max_inbound_connections) : - node (node_a), - strand{ node_a.io_ctx.get_executor () }, - acceptor{ node_a.io_ctx }, - local{ boost::asio::ip::tcp::endpoint{ boost::asio::ip::address_v6::any (), port_a } }, - max_inbound_connections{ max_inbound_connections } -{ -} - -nano::transport::tcp_listener::~tcp_listener () -{ - debug_assert (stopped); -} - -void nano::transport::tcp_listener::start (std::function const &, boost::system::error_code const &)> callback_a) -{ - nano::lock_guard lock{ mutex }; - - acceptor.open (local.protocol ()); - acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true)); - boost::system::error_code ec; - acceptor.bind (local, ec); - if (!ec) - { - acceptor.listen (boost::asio::socket_base::max_listen_connections, ec); - } - if (ec) - { - node.logger.critical (nano::log::type::tcp_listener, "Error while binding for incoming TCP: {} (port: {})", ec.message (), acceptor.local_endpoint ().port ()); - throw std::runtime_error (ec.message ()); - } - - on_connection (callback_a); -} - -void nano::transport::tcp_listener::stop () -{ - decltype (connections) connections_l; - { - nano::lock_guard lock{ mutex }; - stopped = true; - connections_l.swap (connections); - } - - nano::lock_guard lock{ mutex }; - boost::asio::dispatch (strand, [this_l = shared_from_this ()] () { - this_l->acceptor.close (); - - for (auto & address_connection_pair : this_l->connections_per_address) - { - if (auto connection_l = address_connection_pair.second.lock ()) - { - connection_l->close (); - } - } - this_l->connections_per_address.clear (); - }); -} - -std::size_t nano::transport::tcp_listener::connection_count () -{ - nano::lock_guard lock{ mutex }; - cleanup (); - return connections.size (); -} - -void nano::transport::tcp_listener::cleanup () -{ - debug_assert (!mutex.try_lock ()); - - erase_if (connections, [] (auto const & connection) { - return connection.second.expired (); - }); -} - -bool nano::transport::tcp_listener::limit_reached_for_incoming_subnetwork_connections (std::shared_ptr const & new_connection) -{ - debug_assert (strand.running_in_this_thread ()); - if (node.flags.disable_max_peers_per_subnetwork || nano::transport::is_ipv4_or_v4_mapped_address (new_connection->remote.address ())) - { - // If the limit is disabled, then it is unreachable. - // If the address is IPv4 we don't check for a network limit, since its address space isn't big as IPv6 /64. - return false; - } - auto const counted_connections = socket_functions::count_subnetwork_connections ( - connections_per_address, - new_connection->remote.address ().to_v6 (), - node.network_params.network.ipv6_subnetwork_prefix_for_limiting); - return counted_connections >= node.network_params.network.max_peers_per_subnetwork; -} - -bool nano::transport::tcp_listener::limit_reached_for_incoming_ip_connections (std::shared_ptr const & new_connection) -{ - debug_assert (strand.running_in_this_thread ()); - if (node.flags.disable_max_peers_per_ip) - { - // If the limit is disabled, then it is unreachable. - return false; - } - auto const address_connections_range = connections_per_address.equal_range (new_connection->remote.address ()); - auto const counted_connections = static_cast (std::abs (std::distance (address_connections_range.first, address_connections_range.second))); - return counted_connections >= node.network_params.network.max_peers_per_ip; -} - -void nano::transport::tcp_listener::on_connection (std::function const &, boost::system::error_code const &)> callback_a) -{ - boost::asio::post (strand, boost::asio::bind_executor (strand, [this_l = shared_from_this (), callback = std::move (callback_a)] () mutable { - if (!this_l->acceptor.is_open ()) - { - this_l->node.logger.error (nano::log::type::tcp_listener, "Acceptor is not open"); - return; - } - - // Prepare new connection - auto new_connection = std::make_shared (this_l->node, socket::endpoint_type_t::server); - this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote, - boost::asio::bind_executor (this_l->strand, - [this_l, new_connection, cbk = std::move (callback)] (boost::system::error_code const & ec_a) mutable { - this_l->evict_dead_connections (); - - if (this_l->connections_per_address.size () >= this_l->max_inbound_connections) - { - this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in); - this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections reached ({}), unable to open new connection", this_l->connections_per_address.size ()); - - this_l->on_connection_requeue_delayed (std::move (cbk)); - return; - } - - if (this_l->limit_reached_for_incoming_ip_connections (new_connection)) - { - this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::in); - this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections per IP reached ({}), unable to open new connection", new_connection->remote_endpoint ().address ().to_string ()); - - this_l->on_connection_requeue_delayed (std::move (cbk)); - return; - } - - if (this_l->limit_reached_for_incoming_subnetwork_connections (new_connection)) - { - auto const remote_ip_address = new_connection->remote_endpoint ().address (); - debug_assert (remote_ip_address.is_v6 ()); - auto const remote_subnet = socket_functions::get_ipv6_subnet_address (remote_ip_address.to_v6 (), this_l->node.network_params.network.max_peers_per_subnetwork); - - this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::in); - this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections per subnetwork reached (subnetwork: {}, ip: {}), unable to open new connection", - remote_subnet.canonical ().to_string (), - remote_ip_address.to_string ()); - - this_l->on_connection_requeue_delayed (std::move (cbk)); - return; - } - - if (!ec_a) - { - { - // Best effort attempt to get endpoint addresses - boost::system::error_code ec; - new_connection->local = new_connection->tcp_socket.local_endpoint (ec); - } - - // Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start - // an IO operation immediately, which will start a timer. - new_connection->start (); - new_connection->set_timeout (this_l->node.network_params.network.idle_timeout); - this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in); - this_l->connections_per_address.emplace (new_connection->remote.address (), new_connection); - this_l->node.observers.socket_accepted.notify (*new_connection); - if (cbk (new_connection, ec_a)) - { - this_l->on_connection (std::move (cbk)); - return; - } - this_l->node.logger.warn (nano::log::type::tcp_listener, "Stopping to accept new connections"); - return; - } - - // accept error - this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in); - this_l->node.logger.error (nano::log::type::tcp_listener, "Unable to accept connection: {} ({})", ec_a.message (), new_connection->remote_endpoint ().address ().to_string ()); - - if (is_temporary_error (ec_a)) - { - // if it is a temporary error, just retry it - this_l->on_connection_requeue_delayed (std::move (cbk)); - return; - } - - // if it is not a temporary error, check how the listener wants to handle this error - if (cbk (new_connection, ec_a)) - { - this_l->on_connection_requeue_delayed (std::move (cbk)); - return; - } - - // No requeue if we reach here, no incoming socket connections will be handled - this_l->node.logger.warn (nano::log::type::tcp_listener, "Stopping to accept new connections"); - })); - })); -} - -// If we are unable to accept a socket, for any reason, we wait just a little (1ms) before rescheduling the next connection accept. -// The intention is to throttle back the connection requests and break up any busy loops that could possibly form and -// give the rest of the system a chance to recover. -void nano::transport::tcp_listener::on_connection_requeue_delayed (std::function const &, boost::system::error_code const &)> callback_a) -{ - node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (1), [this_l = shared_from_this (), callback = std::move (callback_a)] () mutable { - this_l->on_connection (std::move (callback)); - }); -} - -// This must be called from a strand -void nano::transport::tcp_listener::evict_dead_connections () -{ - debug_assert (strand.running_in_this_thread ()); - - erase_if (connections_per_address, [] (auto const & entry) { - return entry.second.expired (); - }); -} - -void nano::transport::tcp_listener::accept_action (boost::system::error_code const & ec, std::shared_ptr const & socket_a) -{ - if (!node.network.excluded_peers.check (socket_a->remote_endpoint ())) - { - auto server = std::make_shared (socket_a, node.shared (), true); - nano::lock_guard lock{ mutex }; - connections[server.get ()] = server; - server->start (); - } - else - { - node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_excluded); - node.logger.debug (nano::log::type::tcp_server, "Rejected connection from excluded peer: {}", nano::util::to_str (socket_a->remote_endpoint ())); - } -} - -boost::asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () const -{ - nano::lock_guard lock{ mutex }; - if (!stopped) - { - return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), acceptor.local_endpoint ().port ()); - } - else - { - return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), 0); - } -} - -std::unique_ptr nano::transport::tcp_listener::collect_container_info (std::string const & name) -{ - auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "connections", connection_count (), sizeof (decltype (connections)::value_type) })); - return composite; -} - /* * tcp_server */ diff --git a/nano/node/transport/tcp_server.hpp b/nano/node/transport/tcp_server.hpp index 8743772c..fdd28f18 100644 --- a/nano/node/transport/tcp_server.hpp +++ b/nano/node/transport/tcp_server.hpp @@ -16,54 +16,6 @@ namespace nano::transport class message_deserializer; class tcp_server; -/** - * Server side portion of bootstrap sessions. Listens for new socket connections and spawns tcp_server objects when connected. - */ -class tcp_listener final : public std::enable_shared_from_this -{ -public: - tcp_listener (uint16_t port, nano::node &, std::size_t max_inbound_connections); - ~tcp_listener (); - - void start (std::function const &, boost::system::error_code const &)> callback); - void stop (); - - void accept_action (boost::system::error_code const &, std::shared_ptr const &); - - std::size_t connection_count (); - nano::tcp_endpoint endpoint () const; - - std::unique_ptr collect_container_info (std::string const & name); - -private: // Dependencies - nano::node & node; - -private: - void on_connection (std::function const &, boost::system::error_code const &)> callback_a); - void evict_dead_connections (); - void on_connection_requeue_delayed (std::function const & new_connection, boost::system::error_code const &)>); - /** Checks whether the maximum number of connections per IP was reached. If so, it returns true. */ - bool limit_reached_for_incoming_ip_connections (std::shared_ptr const & new_connection); - bool limit_reached_for_incoming_subnetwork_connections (std::shared_ptr const & new_connection); - void cleanup (); - -public: - std::atomic bootstrap_count{ 0 }; - std::atomic realtime_count{ 0 }; - -private: - std::unordered_map> connections; - std::multimap> connections_per_address; - - boost::asio::strand strand; - boost::asio::ip::tcp::acceptor acceptor; - boost::asio::ip::tcp::endpoint local; - std::size_t const max_inbound_connections; - - std::atomic stopped; - mutable nano::mutex mutex; -}; - class tcp_server final : public std::enable_shared_from_this { public: diff --git a/nano/test_common/system.cpp b/nano/test_common/system.cpp index 914d472e..4c9b2579 100644 --- a/nano/test_common/system.cpp +++ b/nano/test_common/system.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include