From f7432195b92aa77e4ed54a800b17ccd8081ad3c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 21 Mar 2024 21:15:44 +0100 Subject: [PATCH 1/4] Start/stop guards --- nano/test_common/testutil.hpp | 64 ++++++++++++++++++++++------------- 1 file changed, 41 insertions(+), 23 deletions(-) diff --git a/nano/test_common/testutil.hpp b/nano/test_common/testutil.hpp index 60821e7d..a2706dc3 100644 --- a/nano/test_common/testutil.hpp +++ b/nano/test_common/testutil.hpp @@ -115,6 +115,46 @@ ASSERT_FALSE (condition); \ } +namespace nano::test +{ +template +class start_stop_guard +{ +public: + explicit start_stop_guard (Ts &... refs_a) : + refs{ std::forward (refs_a)... } + { + std::apply ([] (Ts &... refs) { (refs.start (), ...); }, refs); + } + + ~start_stop_guard () + { + std::apply ([] (Ts &... refs) { (refs.stop (), ...); }, refs); + } + +private: + std::tuple refs; +}; + +template +class stop_guard +{ +public: + explicit stop_guard (Ts &... refs_a) : + refs{ std::forward (refs_a)... } + { + } + + ~stop_guard () + { + std::apply ([] (Ts &... refs) { (refs.stop (), ...); }, refs); + } + +private: + std::tuple refs; +}; +} + /* Convenience globals for gtest projects */ namespace nano { @@ -233,28 +273,6 @@ namespace test std::atomic required_count; }; - /** - * A helper that calls `start` from constructor and `stop` from destructor - */ - template - class start_stop_guard - { - public: - explicit start_stop_guard (T & ref_a) : - ref{ ref_a } - { - ref.start (); - } - - ~start_stop_guard () - { - ref.stop (); - } - - private: - T & ref; - }; - void wait_peer_connections (nano::test::system &); /** @@ -408,4 +426,4 @@ namespace test */ void print_all_blocks (nano::node & node); } -} +} \ No newline at end of file From 970b048052c2cc0470bace686e881aa6b9ccf514 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Tue, 19 Mar 2024 11:00:11 +0100 Subject: [PATCH 2/4] Reorganize `tcp_listener` --- nano/core_test/network.cpp | 15 ++------- nano/node/node.cpp | 2 +- nano/node/transport/tcp_server.cpp | 41 ++++++++++++++---------- nano/node/transport/tcp_server.hpp | 50 ++++++++++++++++++------------ 4 files changed, 60 insertions(+), 48 deletions(-) diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 82c014c3..086491d0 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -698,10 +698,7 @@ TEST (tcp_listener, DISABLED_tcp_listener_timeout_empty) system.deadline_set (std::chrono::seconds (6)); while (!disconnected) { - { - nano::lock_guard guard (node0->tcp_listener->mutex); - disconnected = node0->tcp_listener->connections.empty (); - } + disconnected = node0->tcp_listener->connection_count () == 0; ASSERT_NO_ERROR (system.poll ()); } } @@ -723,18 +720,12 @@ TEST (tcp_listener, tcp_listener_timeout_node_id_handshake) }); }); ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake) != 0); - { - nano::lock_guard guard (node0->tcp_listener->mutex); - ASSERT_EQ (node0->tcp_listener->connections.size (), 1); - } + ASSERT_EQ (node0->tcp_listener->connection_count (), 1); bool disconnected (false); system.deadline_set (std::chrono::seconds (20)); while (!disconnected) { - { - nano::lock_guard guard (node0->tcp_listener->mutex); - disconnected = node0->tcp_listener->connections.empty (); - } + disconnected = node0->tcp_listener->connection_count () == 0; ASSERT_NO_ERROR (system.poll ()); } } diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 95916058..da454247 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -535,7 +535,7 @@ std::unique_ptr nano::collect_container_info (no composite->add_component (collect_container_info (node.ledger, "ledger")); composite->add_component (collect_container_info (node.active, "active")); composite->add_component (collect_container_info (node.bootstrap_initiator, "bootstrap_initiator")); - composite->add_component (collect_container_info (*node.tcp_listener, "tcp_listener")); + composite->add_component (node.tcp_listener->collect_container_info ("tcp_listener")); composite->add_component (collect_container_info (node.network, "network")); composite->add_component (node.telemetry.collect_container_info ("telemetry")); composite->add_component (collect_container_info (node.workers, "workers")); diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index a10d85dd..b6930d52 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -42,10 +42,15 @@ nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_ { } +nano::transport::tcp_listener::~tcp_listener () +{ + debug_assert (stopped); +} + void nano::transport::tcp_listener::start (std::function const &, boost::system::error_code const &)> callback_a) { nano::lock_guard lock{ mutex }; - on = true; + acceptor.open (local.protocol ()); acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true)); boost::system::error_code ec; @@ -68,12 +73,14 @@ void nano::transport::tcp_listener::stop () decltype (connections) connections_l; { nano::lock_guard lock{ mutex }; - on = false; + stopped = true; connections_l.swap (connections); } + nano::lock_guard lock{ mutex }; - boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l = shared_from_this ()] () { + boost::asio::dispatch (strand, [this_l = shared_from_this ()] () { this_l->acceptor.close (); + for (auto & address_connection_pair : this_l->connections_per_address) { if (auto connection_l = address_connection_pair.second.lock ()) @@ -82,15 +89,25 @@ void nano::transport::tcp_listener::stop () } } this_l->connections_per_address.clear (); - })); + }); } std::size_t nano::transport::tcp_listener::connection_count () { nano::lock_guard lock{ mutex }; + cleanup (); return connections.size (); } +void nano::transport::tcp_listener::cleanup () +{ + debug_assert (!mutex.try_lock ()); + + erase_if (connections, [] (auto const & connection) { + return connection.second.expired (); + }); +} + bool nano::transport::tcp_listener::limit_reached_for_incoming_subnetwork_connections (std::shared_ptr const & new_connection) { debug_assert (strand.running_in_this_thread ()); @@ -253,10 +270,10 @@ void nano::transport::tcp_listener::accept_action (boost::system::error_code con } } -boost::asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () +boost::asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () const { nano::lock_guard lock{ mutex }; - if (on) + if (!stopped) { return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), acceptor.local_endpoint ().port ()); } @@ -266,11 +283,10 @@ boost::asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () } } -std::unique_ptr nano::transport::collect_container_info (nano::transport::tcp_listener & bootstrap_listener, std::string const & name) +std::unique_ptr nano::transport::tcp_listener::collect_container_info (std::string const & name) { - auto sizeof_element = sizeof (decltype (bootstrap_listener.connections)::value_type); auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "connections", bootstrap_listener.connection_count (), sizeof_element })); + composite->add_component (std::make_unique (container_info{ "connections", connection_count (), sizeof (decltype (connections)::value_type) })); return composite; } @@ -321,9 +337,6 @@ nano::transport::tcp_server::~tcp_server () } stop (); - - nano::lock_guard lock{ node->tcp_listener->mutex }; - node->tcp_listener->connections.erase (this); } void nano::transport::tcp_server::start () @@ -840,10 +853,6 @@ void nano::transport::tcp_server::timeout () { node->logger.debug (nano::log::type::tcp_server, "Closing TCP server due to timeout ({})", fmt::streamed (remote_endpoint)); - { - nano::lock_guard lock{ node->tcp_listener->mutex }; - node->tcp_listener->connections.erase (this); - } socket->close (); } } diff --git a/nano/node/transport/tcp_server.hpp b/nano/node/transport/tcp_server.hpp index e8a220bc..8743772c 100644 --- a/nano/node/transport/tcp_server.hpp +++ b/nano/node/transport/tcp_server.hpp @@ -19,38 +19,50 @@ class tcp_server; /** * Server side portion of bootstrap sessions. Listens for new socket connections and spawns tcp_server objects when connected. */ -class tcp_listener final : public std::enable_shared_from_this +class tcp_listener final : public std::enable_shared_from_this { public: - tcp_listener (uint16_t, nano::node &, std::size_t); - void start (std::function const &, boost::system::error_code const &)> callback_a); - void stop (); - void accept_action (boost::system::error_code const &, std::shared_ptr const &); - std::size_t connection_count (); + tcp_listener (uint16_t port, nano::node &, std::size_t max_inbound_connections); + ~tcp_listener (); - nano::mutex mutex; - std::unordered_map> connections; - nano::tcp_endpoint endpoint (); + void start (std::function const &, boost::system::error_code const &)> callback); + void stop (); + + void accept_action (boost::system::error_code const &, std::shared_ptr const &); + + std::size_t connection_count (); + nano::tcp_endpoint endpoint () const; + + std::unique_ptr collect_container_info (std::string const & name); + +private: // Dependencies nano::node & node; - bool on{ false }; - std::atomic bootstrap_count{ 0 }; - std::atomic realtime_count{ 0 }; private: - boost::asio::strand strand; - nano::transport::address_socket_mmap connections_per_address; - boost::asio::ip::tcp::acceptor acceptor; - boost::asio::ip::tcp::endpoint local; - std::size_t max_inbound_connections; void on_connection (std::function const &, boost::system::error_code const &)> callback_a); void evict_dead_connections (); void on_connection_requeue_delayed (std::function const & new_connection, boost::system::error_code const &)>); /** Checks whether the maximum number of connections per IP was reached. If so, it returns true. */ bool limit_reached_for_incoming_ip_connections (std::shared_ptr const & new_connection); bool limit_reached_for_incoming_subnetwork_connections (std::shared_ptr const & new_connection); -}; + void cleanup (); -std::unique_ptr collect_container_info (tcp_listener & bootstrap_listener, std::string const & name); +public: + std::atomic bootstrap_count{ 0 }; + std::atomic realtime_count{ 0 }; + +private: + std::unordered_map> connections; + std::multimap> connections_per_address; + + boost::asio::strand strand; + boost::asio::ip::tcp::acceptor acceptor; + boost::asio::ip::tcp::endpoint local; + std::size_t const max_inbound_connections; + + std::atomic stopped; + mutable nano::mutex mutex; +}; class tcp_server final : public std::enable_shared_from_this { From a47da679a9de942ae77cfdac61c12a1c83cf313d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 21 Mar 2024 21:20:10 +0100 Subject: [PATCH 3/4] Use stop guards --- nano/core_test/socket.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/nano/core_test/socket.cpp b/nano/core_test/socket.cpp index 11a0862b..711d2041 100644 --- a/nano/core_test/socket.cpp +++ b/nano/core_test/socket.cpp @@ -29,6 +29,7 @@ TEST (socket, max_connections) // start a server socket that allows max 2 live connections auto listener = std::make_shared (server_port, *node, 2); + nano::test::stop_guard stop_guard{ *listener }; listener->start ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { server_sockets.push_back (new_connection); return true; @@ -123,6 +124,7 @@ TEST (socket, max_connections_per_ip) std::vector> server_sockets; auto listener = std::make_shared (server_port, *node, max_global_connections); + nano::test::stop_guard stop_guard{ *listener }; listener->start ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { server_sockets.push_back (new_connection); return true; @@ -243,6 +245,7 @@ TEST (socket, max_connections_per_subnetwork) std::vector> server_sockets; auto listener = std::make_shared (server_port, *node, max_global_connections); + nano::test::stop_guard stop_guard{ *listener }; listener->start ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { server_sockets.push_back (new_connection); return true; @@ -303,6 +306,7 @@ TEST (socket, disabled_max_peers_per_ip) std::vector> server_sockets; auto server_socket = std::make_shared (server_port, *node, max_global_connections); + nano::test::stop_guard stop_guard{ *server_socket }; server_socket->start ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { server_sockets.push_back (new_connection); return true; @@ -363,6 +367,7 @@ TEST (socket, disconnection_of_silent_connections) // start a server listening socket auto listener = std::make_shared (server_port, *node, 1); + nano::test::stop_guard stop_guard{ *listener }; listener->start ([&server_data_socket] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { server_data_socket = new_connection; return true; @@ -414,6 +419,7 @@ TEST (socket, drop_policy) auto server_port (system.get_available_port ()); auto listener = std::make_shared (server_port, *node, 1); + nano::test::stop_guard stop_guard{ *listener }; listener->start ([&connections] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { connections.push_back (new_connection); return true; @@ -502,6 +508,7 @@ TEST (socket, concurrent_writes) std::vector> connections; auto listener = std::make_shared (server_port, *node, max_connections); + nano::test::stop_guard stop_guard{ *listener }; listener->start ([&connections, &reader] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { if (ec_a) { From 6768e7dc6e9a30172309d4d46d555a14c5d217ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 21 Mar 2024 21:34:57 +0100 Subject: [PATCH 4/4] Move `tcp_listener` to a dedicated file and reduce header coupling --- nano/core_test/network.cpp | 1 + nano/core_test/node.cpp | 1 + nano/core_test/socket.cpp | 1 + nano/node/CMakeLists.txt | 2 + nano/node/node.cpp | 1 + nano/node/node.hpp | 13 +- nano/node/transport/tcp_listener.cpp | 287 +++++++++++++++++++++++++++ nano/node/transport/tcp_listener.hpp | 60 ++++++ nano/node/transport/tcp_server.cpp | 283 +------------------------- nano/node/transport/tcp_server.hpp | 48 ----- nano/test_common/system.cpp | 1 + 11 files changed, 365 insertions(+), 333 deletions(-) create mode 100644 nano/node/transport/tcp_listener.cpp create mode 100644 nano/node/transport/tcp_listener.hpp diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 086491d0..d5a97da7 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 77029005..91924b5b 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include diff --git a/nano/core_test/socket.cpp b/nano/core_test/socket.cpp index 711d2041..bbbf9d88 100644 --- a/nano/core_test/socket.cpp +++ b/nano/core_test/socket.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 3e3dd1dd..0b8ad15e 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -158,6 +158,8 @@ add_library( transport/socket.cpp transport/tcp.hpp transport/tcp.cpp + transport/tcp_listener.hpp + transport/tcp_listener.cpp transport/tcp_server.hpp transport/tcp_server.cpp transport/transport.hpp diff --git a/nano/node/node.cpp b/nano/node/node.cpp index da454247..589112e7 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 7bace7a7..c59a09ba 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -47,9 +47,6 @@ namespace nano { class active_transactions; -namespace rocksdb -{ -} // Declare a namespace rocksdb inside nano so all references to the rocksdb library need to be globally scoped e.g. ::rocksdb::Slice class node; class work_pool; @@ -57,7 +54,17 @@ namespace scheduler { class component; } +namespace transport +{ + class tcp_listener; +} +namespace rocksdb +{ +} // Declare a namespace rocksdb inside nano so all references to the rocksdb library need to be globally scoped e.g. ::rocksdb::Slice +} +namespace nano +{ // Configs backlog_population::config backlog_population_config (node_config const &); outbound_bandwidth_limiter::config outbound_bandwidth_limiter_config (node_config const &); diff --git a/nano/node/transport/tcp_listener.cpp b/nano/node/transport/tcp_listener.cpp new file mode 100644 index 00000000..07f3fc8d --- /dev/null +++ b/nano/node/transport/tcp_listener.cpp @@ -0,0 +1,287 @@ +#include +#include +#include +#include +#include + +#include + +namespace +{ +bool is_temporary_error (boost::system::error_code const & ec_a) +{ + switch (ec_a.value ()) + { +#if EAGAIN != EWOULDBLOCK + case EAGAIN: +#endif + + case EWOULDBLOCK: + case EINTR: + return true; + default: + return false; + } +} +} + +/* + * tcp_listener + */ + +nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_a, std::size_t max_inbound_connections) : + node (node_a), + strand{ node_a.io_ctx.get_executor () }, + acceptor{ node_a.io_ctx }, + local{ boost::asio::ip::tcp::endpoint{ boost::asio::ip::address_v6::any (), port_a } }, + max_inbound_connections{ max_inbound_connections } +{ +} + +nano::transport::tcp_listener::~tcp_listener () +{ + debug_assert (stopped); +} + +void nano::transport::tcp_listener::start (std::function const &, boost::system::error_code const &)> callback_a) +{ + nano::lock_guard lock{ mutex }; + + acceptor.open (local.protocol ()); + acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true)); + boost::system::error_code ec; + acceptor.bind (local, ec); + if (!ec) + { + acceptor.listen (boost::asio::socket_base::max_listen_connections, ec); + } + if (ec) + { + node.logger.critical (nano::log::type::tcp_listener, "Error while binding for incoming TCP: {} (port: {})", ec.message (), acceptor.local_endpoint ().port ()); + throw std::runtime_error (ec.message ()); + } + + on_connection (callback_a); +} + +void nano::transport::tcp_listener::stop () +{ + decltype (connections) connections_l; + { + nano::lock_guard lock{ mutex }; + stopped = true; + connections_l.swap (connections); + } + + nano::lock_guard lock{ mutex }; + boost::asio::dispatch (strand, [this_l = shared_from_this ()] () { + this_l->acceptor.close (); + + for (auto & address_connection_pair : this_l->connections_per_address) + { + if (auto connection_l = address_connection_pair.second.lock ()) + { + connection_l->close (); + } + } + this_l->connections_per_address.clear (); + }); +} + +std::size_t nano::transport::tcp_listener::connection_count () +{ + nano::lock_guard lock{ mutex }; + cleanup (); + return connections.size (); +} + +void nano::transport::tcp_listener::cleanup () +{ + debug_assert (!mutex.try_lock ()); + + erase_if (connections, [] (auto const & connection) { + return connection.second.expired (); + }); +} + +bool nano::transport::tcp_listener::limit_reached_for_incoming_subnetwork_connections (std::shared_ptr const & new_connection) +{ + debug_assert (strand.running_in_this_thread ()); + if (node.flags.disable_max_peers_per_subnetwork || nano::transport::is_ipv4_or_v4_mapped_address (new_connection->remote.address ())) + { + // If the limit is disabled, then it is unreachable. + // If the address is IPv4 we don't check for a network limit, since its address space isn't big as IPv6 /64. + return false; + } + auto const counted_connections = socket_functions::count_subnetwork_connections ( + connections_per_address, + new_connection->remote.address ().to_v6 (), + node.network_params.network.ipv6_subnetwork_prefix_for_limiting); + return counted_connections >= node.network_params.network.max_peers_per_subnetwork; +} + +bool nano::transport::tcp_listener::limit_reached_for_incoming_ip_connections (std::shared_ptr const & new_connection) +{ + debug_assert (strand.running_in_this_thread ()); + if (node.flags.disable_max_peers_per_ip) + { + // If the limit is disabled, then it is unreachable. + return false; + } + auto const address_connections_range = connections_per_address.equal_range (new_connection->remote.address ()); + auto const counted_connections = static_cast (std::abs (std::distance (address_connections_range.first, address_connections_range.second))); + return counted_connections >= node.network_params.network.max_peers_per_ip; +} + +void nano::transport::tcp_listener::on_connection (std::function const &, boost::system::error_code const &)> callback_a) +{ + boost::asio::post (strand, boost::asio::bind_executor (strand, [this_l = shared_from_this (), callback = std::move (callback_a)] () mutable { + if (!this_l->acceptor.is_open ()) + { + this_l->node.logger.error (nano::log::type::tcp_listener, "Acceptor is not open"); + return; + } + + // Prepare new connection + auto new_connection = std::make_shared (this_l->node, socket::endpoint_type_t::server); + this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote, + boost::asio::bind_executor (this_l->strand, + [this_l, new_connection, cbk = std::move (callback)] (boost::system::error_code const & ec_a) mutable { + this_l->evict_dead_connections (); + + if (this_l->connections_per_address.size () >= this_l->max_inbound_connections) + { + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in); + this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections reached ({}), unable to open new connection", this_l->connections_per_address.size ()); + + this_l->on_connection_requeue_delayed (std::move (cbk)); + return; + } + + if (this_l->limit_reached_for_incoming_ip_connections (new_connection)) + { + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::in); + this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections per IP reached ({}), unable to open new connection", new_connection->remote_endpoint ().address ().to_string ()); + + this_l->on_connection_requeue_delayed (std::move (cbk)); + return; + } + + if (this_l->limit_reached_for_incoming_subnetwork_connections (new_connection)) + { + auto const remote_ip_address = new_connection->remote_endpoint ().address (); + debug_assert (remote_ip_address.is_v6 ()); + auto const remote_subnet = socket_functions::get_ipv6_subnet_address (remote_ip_address.to_v6 (), this_l->node.network_params.network.max_peers_per_subnetwork); + + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::in); + this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections per subnetwork reached (subnetwork: {}, ip: {}), unable to open new connection", + remote_subnet.canonical ().to_string (), + remote_ip_address.to_string ()); + + this_l->on_connection_requeue_delayed (std::move (cbk)); + return; + } + + if (!ec_a) + { + { + // Best effort attempt to get endpoint addresses + boost::system::error_code ec; + new_connection->local = new_connection->tcp_socket.local_endpoint (ec); + } + + // Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start + // an IO operation immediately, which will start a timer. + new_connection->start (); + new_connection->set_timeout (this_l->node.network_params.network.idle_timeout); + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in); + this_l->connections_per_address.emplace (new_connection->remote.address (), new_connection); + this_l->node.observers.socket_accepted.notify (*new_connection); + if (cbk (new_connection, ec_a)) + { + this_l->on_connection (std::move (cbk)); + return; + } + this_l->node.logger.warn (nano::log::type::tcp_listener, "Stopping to accept new connections"); + return; + } + + // accept error + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in); + this_l->node.logger.error (nano::log::type::tcp_listener, "Unable to accept connection: {} ({})", ec_a.message (), new_connection->remote_endpoint ().address ().to_string ()); + + if (is_temporary_error (ec_a)) + { + // if it is a temporary error, just retry it + this_l->on_connection_requeue_delayed (std::move (cbk)); + return; + } + + // if it is not a temporary error, check how the listener wants to handle this error + if (cbk (new_connection, ec_a)) + { + this_l->on_connection_requeue_delayed (std::move (cbk)); + return; + } + + // No requeue if we reach here, no incoming socket connections will be handled + this_l->node.logger.warn (nano::log::type::tcp_listener, "Stopping to accept new connections"); + })); + })); +} + +// If we are unable to accept a socket, for any reason, we wait just a little (1ms) before rescheduling the next connection accept. +// The intention is to throttle back the connection requests and break up any busy loops that could possibly form and +// give the rest of the system a chance to recover. +void nano::transport::tcp_listener::on_connection_requeue_delayed (std::function const &, boost::system::error_code const &)> callback_a) +{ + node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (1), [this_l = shared_from_this (), callback = std::move (callback_a)] () mutable { + this_l->on_connection (std::move (callback)); + }); +} + +// This must be called from a strand +void nano::transport::tcp_listener::evict_dead_connections () +{ + debug_assert (strand.running_in_this_thread ()); + + erase_if (connections_per_address, [] (auto const & entry) { + return entry.second.expired (); + }); +} + +void nano::transport::tcp_listener::accept_action (boost::system::error_code const & ec, std::shared_ptr const & socket_a) +{ + if (!node.network.excluded_peers.check (socket_a->remote_endpoint ())) + { + auto server = std::make_shared (socket_a, node.shared (), true); + nano::lock_guard lock{ mutex }; + connections[server.get ()] = server; + server->start (); + } + else + { + node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_excluded); + node.logger.debug (nano::log::type::tcp_server, "Rejected connection from excluded peer: {}", nano::util::to_str (socket_a->remote_endpoint ())); + } +} + +boost::asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () const +{ + nano::lock_guard lock{ mutex }; + if (!stopped) + { + return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), acceptor.local_endpoint ().port ()); + } + else + { + return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), 0); + } +} + +std::unique_ptr nano::transport::tcp_listener::collect_container_info (std::string const & name) +{ + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "connections", connection_count (), sizeof (decltype (connections)::value_type) })); + return composite; +} \ No newline at end of file diff --git a/nano/node/transport/tcp_listener.hpp b/nano/node/transport/tcp_listener.hpp new file mode 100644 index 00000000..ae049d97 --- /dev/null +++ b/nano/node/transport/tcp_listener.hpp @@ -0,0 +1,60 @@ +#pragma once + +#include +#include + +#include + +namespace nano::transport +{ +class socket; +class tcp_server; + +/** + * Server side portion of bootstrap sessions. Listens for new socket connections and spawns tcp_server objects when connected. + */ +class tcp_listener final : public std::enable_shared_from_this +{ +public: + tcp_listener (uint16_t port, nano::node &, std::size_t max_inbound_connections); + ~tcp_listener (); + + void start (std::function const &, boost::system::error_code const &)> callback); + void stop (); + + void accept_action (boost::system::error_code const &, std::shared_ptr const &); + + std::size_t connection_count (); + nano::tcp_endpoint endpoint () const; + + std::unique_ptr collect_container_info (std::string const & name); + +private: // Dependencies + nano::node & node; + +private: + void on_connection (std::function const &, boost::system::error_code const &)> callback_a); + void evict_dead_connections (); + void on_connection_requeue_delayed (std::function const & new_connection, boost::system::error_code const &)>); + /** Checks whether the maximum number of connections per IP was reached. If so, it returns true. */ + bool limit_reached_for_incoming_ip_connections (std::shared_ptr const & new_connection); + bool limit_reached_for_incoming_subnetwork_connections (std::shared_ptr const & new_connection); + void cleanup (); + +public: + std::atomic bootstrap_count{ 0 }; + std::atomic realtime_count{ 0 }; + +private: + std::unordered_map> connections; + std::multimap> connections_per_address; + + boost::asio::strand strand; + boost::asio::ip::tcp::acceptor acceptor; + boost::asio::ip::tcp::endpoint local; + std::size_t const max_inbound_connections; + + std::atomic stopped; + mutable nano::mutex mutex; +}; +} \ No newline at end of file diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index b6930d52..e4dc93a4 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -4,292 +4,11 @@ #include #include #include +#include #include -#include - #include -namespace -{ -bool is_temporary_error (boost::system::error_code const & ec_a) -{ - switch (ec_a.value ()) - { -#if EAGAIN != EWOULDBLOCK - case EAGAIN: -#endif - - case EWOULDBLOCK: - case EINTR: - return true; - default: - return false; - } -} -} - -/* - * tcp_listener - */ - -nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_a, std::size_t max_inbound_connections) : - node (node_a), - strand{ node_a.io_ctx.get_executor () }, - acceptor{ node_a.io_ctx }, - local{ boost::asio::ip::tcp::endpoint{ boost::asio::ip::address_v6::any (), port_a } }, - max_inbound_connections{ max_inbound_connections } -{ -} - -nano::transport::tcp_listener::~tcp_listener () -{ - debug_assert (stopped); -} - -void nano::transport::tcp_listener::start (std::function const &, boost::system::error_code const &)> callback_a) -{ - nano::lock_guard lock{ mutex }; - - acceptor.open (local.protocol ()); - acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true)); - boost::system::error_code ec; - acceptor.bind (local, ec); - if (!ec) - { - acceptor.listen (boost::asio::socket_base::max_listen_connections, ec); - } - if (ec) - { - node.logger.critical (nano::log::type::tcp_listener, "Error while binding for incoming TCP: {} (port: {})", ec.message (), acceptor.local_endpoint ().port ()); - throw std::runtime_error (ec.message ()); - } - - on_connection (callback_a); -} - -void nano::transport::tcp_listener::stop () -{ - decltype (connections) connections_l; - { - nano::lock_guard lock{ mutex }; - stopped = true; - connections_l.swap (connections); - } - - nano::lock_guard lock{ mutex }; - boost::asio::dispatch (strand, [this_l = shared_from_this ()] () { - this_l->acceptor.close (); - - for (auto & address_connection_pair : this_l->connections_per_address) - { - if (auto connection_l = address_connection_pair.second.lock ()) - { - connection_l->close (); - } - } - this_l->connections_per_address.clear (); - }); -} - -std::size_t nano::transport::tcp_listener::connection_count () -{ - nano::lock_guard lock{ mutex }; - cleanup (); - return connections.size (); -} - -void nano::transport::tcp_listener::cleanup () -{ - debug_assert (!mutex.try_lock ()); - - erase_if (connections, [] (auto const & connection) { - return connection.second.expired (); - }); -} - -bool nano::transport::tcp_listener::limit_reached_for_incoming_subnetwork_connections (std::shared_ptr const & new_connection) -{ - debug_assert (strand.running_in_this_thread ()); - if (node.flags.disable_max_peers_per_subnetwork || nano::transport::is_ipv4_or_v4_mapped_address (new_connection->remote.address ())) - { - // If the limit is disabled, then it is unreachable. - // If the address is IPv4 we don't check for a network limit, since its address space isn't big as IPv6 /64. - return false; - } - auto const counted_connections = socket_functions::count_subnetwork_connections ( - connections_per_address, - new_connection->remote.address ().to_v6 (), - node.network_params.network.ipv6_subnetwork_prefix_for_limiting); - return counted_connections >= node.network_params.network.max_peers_per_subnetwork; -} - -bool nano::transport::tcp_listener::limit_reached_for_incoming_ip_connections (std::shared_ptr const & new_connection) -{ - debug_assert (strand.running_in_this_thread ()); - if (node.flags.disable_max_peers_per_ip) - { - // If the limit is disabled, then it is unreachable. - return false; - } - auto const address_connections_range = connections_per_address.equal_range (new_connection->remote.address ()); - auto const counted_connections = static_cast (std::abs (std::distance (address_connections_range.first, address_connections_range.second))); - return counted_connections >= node.network_params.network.max_peers_per_ip; -} - -void nano::transport::tcp_listener::on_connection (std::function const &, boost::system::error_code const &)> callback_a) -{ - boost::asio::post (strand, boost::asio::bind_executor (strand, [this_l = shared_from_this (), callback = std::move (callback_a)] () mutable { - if (!this_l->acceptor.is_open ()) - { - this_l->node.logger.error (nano::log::type::tcp_listener, "Acceptor is not open"); - return; - } - - // Prepare new connection - auto new_connection = std::make_shared (this_l->node, socket::endpoint_type_t::server); - this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote, - boost::asio::bind_executor (this_l->strand, - [this_l, new_connection, cbk = std::move (callback)] (boost::system::error_code const & ec_a) mutable { - this_l->evict_dead_connections (); - - if (this_l->connections_per_address.size () >= this_l->max_inbound_connections) - { - this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in); - this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections reached ({}), unable to open new connection", this_l->connections_per_address.size ()); - - this_l->on_connection_requeue_delayed (std::move (cbk)); - return; - } - - if (this_l->limit_reached_for_incoming_ip_connections (new_connection)) - { - this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::in); - this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections per IP reached ({}), unable to open new connection", new_connection->remote_endpoint ().address ().to_string ()); - - this_l->on_connection_requeue_delayed (std::move (cbk)); - return; - } - - if (this_l->limit_reached_for_incoming_subnetwork_connections (new_connection)) - { - auto const remote_ip_address = new_connection->remote_endpoint ().address (); - debug_assert (remote_ip_address.is_v6 ()); - auto const remote_subnet = socket_functions::get_ipv6_subnet_address (remote_ip_address.to_v6 (), this_l->node.network_params.network.max_peers_per_subnetwork); - - this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::in); - this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections per subnetwork reached (subnetwork: {}, ip: {}), unable to open new connection", - remote_subnet.canonical ().to_string (), - remote_ip_address.to_string ()); - - this_l->on_connection_requeue_delayed (std::move (cbk)); - return; - } - - if (!ec_a) - { - { - // Best effort attempt to get endpoint addresses - boost::system::error_code ec; - new_connection->local = new_connection->tcp_socket.local_endpoint (ec); - } - - // Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start - // an IO operation immediately, which will start a timer. - new_connection->start (); - new_connection->set_timeout (this_l->node.network_params.network.idle_timeout); - this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in); - this_l->connections_per_address.emplace (new_connection->remote.address (), new_connection); - this_l->node.observers.socket_accepted.notify (*new_connection); - if (cbk (new_connection, ec_a)) - { - this_l->on_connection (std::move (cbk)); - return; - } - this_l->node.logger.warn (nano::log::type::tcp_listener, "Stopping to accept new connections"); - return; - } - - // accept error - this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in); - this_l->node.logger.error (nano::log::type::tcp_listener, "Unable to accept connection: {} ({})", ec_a.message (), new_connection->remote_endpoint ().address ().to_string ()); - - if (is_temporary_error (ec_a)) - { - // if it is a temporary error, just retry it - this_l->on_connection_requeue_delayed (std::move (cbk)); - return; - } - - // if it is not a temporary error, check how the listener wants to handle this error - if (cbk (new_connection, ec_a)) - { - this_l->on_connection_requeue_delayed (std::move (cbk)); - return; - } - - // No requeue if we reach here, no incoming socket connections will be handled - this_l->node.logger.warn (nano::log::type::tcp_listener, "Stopping to accept new connections"); - })); - })); -} - -// If we are unable to accept a socket, for any reason, we wait just a little (1ms) before rescheduling the next connection accept. -// The intention is to throttle back the connection requests and break up any busy loops that could possibly form and -// give the rest of the system a chance to recover. -void nano::transport::tcp_listener::on_connection_requeue_delayed (std::function const &, boost::system::error_code const &)> callback_a) -{ - node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (1), [this_l = shared_from_this (), callback = std::move (callback_a)] () mutable { - this_l->on_connection (std::move (callback)); - }); -} - -// This must be called from a strand -void nano::transport::tcp_listener::evict_dead_connections () -{ - debug_assert (strand.running_in_this_thread ()); - - erase_if (connections_per_address, [] (auto const & entry) { - return entry.second.expired (); - }); -} - -void nano::transport::tcp_listener::accept_action (boost::system::error_code const & ec, std::shared_ptr const & socket_a) -{ - if (!node.network.excluded_peers.check (socket_a->remote_endpoint ())) - { - auto server = std::make_shared (socket_a, node.shared (), true); - nano::lock_guard lock{ mutex }; - connections[server.get ()] = server; - server->start (); - } - else - { - node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_excluded); - node.logger.debug (nano::log::type::tcp_server, "Rejected connection from excluded peer: {}", nano::util::to_str (socket_a->remote_endpoint ())); - } -} - -boost::asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () const -{ - nano::lock_guard lock{ mutex }; - if (!stopped) - { - return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), acceptor.local_endpoint ().port ()); - } - else - { - return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), 0); - } -} - -std::unique_ptr nano::transport::tcp_listener::collect_container_info (std::string const & name) -{ - auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "connections", connection_count (), sizeof (decltype (connections)::value_type) })); - return composite; -} - /* * tcp_server */ diff --git a/nano/node/transport/tcp_server.hpp b/nano/node/transport/tcp_server.hpp index 8743772c..fdd28f18 100644 --- a/nano/node/transport/tcp_server.hpp +++ b/nano/node/transport/tcp_server.hpp @@ -16,54 +16,6 @@ namespace nano::transport class message_deserializer; class tcp_server; -/** - * Server side portion of bootstrap sessions. Listens for new socket connections and spawns tcp_server objects when connected. - */ -class tcp_listener final : public std::enable_shared_from_this -{ -public: - tcp_listener (uint16_t port, nano::node &, std::size_t max_inbound_connections); - ~tcp_listener (); - - void start (std::function const &, boost::system::error_code const &)> callback); - void stop (); - - void accept_action (boost::system::error_code const &, std::shared_ptr const &); - - std::size_t connection_count (); - nano::tcp_endpoint endpoint () const; - - std::unique_ptr collect_container_info (std::string const & name); - -private: // Dependencies - nano::node & node; - -private: - void on_connection (std::function const &, boost::system::error_code const &)> callback_a); - void evict_dead_connections (); - void on_connection_requeue_delayed (std::function const & new_connection, boost::system::error_code const &)>); - /** Checks whether the maximum number of connections per IP was reached. If so, it returns true. */ - bool limit_reached_for_incoming_ip_connections (std::shared_ptr const & new_connection); - bool limit_reached_for_incoming_subnetwork_connections (std::shared_ptr const & new_connection); - void cleanup (); - -public: - std::atomic bootstrap_count{ 0 }; - std::atomic realtime_count{ 0 }; - -private: - std::unordered_map> connections; - std::multimap> connections_per_address; - - boost::asio::strand strand; - boost::asio::ip::tcp::acceptor acceptor; - boost::asio::ip::tcp::endpoint local; - std::size_t const max_inbound_connections; - - std::atomic stopped; - mutable nano::mutex mutex; -}; - class tcp_server final : public std::enable_shared_from_this { public: diff --git a/nano/test_common/system.cpp b/nano/test_common/system.cpp index 914d472e..4c9b2579 100644 --- a/nano/test_common/system.cpp +++ b/nano/test_common/system.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include