From 17e7687033f0c57bb25e2dc6c6808dbba2945250 Mon Sep 17 00:00:00 2001 From: dsiganos Date: Wed, 2 Jun 2021 14:45:20 +0100 Subject: [PATCH] Fix TCP accept handling when max incoming connection limit is reached (#3315) This fixes the problem where we stop accepting TCP sockets permanently due to reaching the max incoming sockets limit. The symptom of this problem is a large number of CLOSE_WAIT TCP sockets. The approach of this solution is to never refuse to accept. We accept all connections but if the limit is reached, the connection is dropped immediately after being accepted. --- nano/node/bootstrap/bootstrap_server.cpp | 12 +-- nano/node/socket.cpp | 124 +++++++++++++++-------- nano/node/socket.hpp | 2 + 3 files changed, 85 insertions(+), 53 deletions(-) diff --git a/nano/node/bootstrap/bootstrap_server.cpp b/nano/node/bootstrap/bootstrap_server.cpp index 9e244a69..4af45810 100644 --- a/nano/node/bootstrap/bootstrap_server.cpp +++ b/nano/node/bootstrap/bootstrap_server.cpp @@ -22,22 +22,16 @@ void nano::bootstrap_listener::start () listening_socket->start (ec); if (ec) { - node.logger.try_log (boost::str (boost::format ("Error while binding for incoming TCP/bootstrap on port %1%: %2%") % listening_socket->listening_port () % ec.message ())); + node.logger.always_log (boost::str (boost::format ("Network: Error while binding for incoming TCP/bootstrap on port %1%: %2%") % listening_socket->listening_port () % ec.message ())); throw std::runtime_error (ec.message ()); } debug_assert (node.network.endpoint ().port () == listening_socket->listening_port ()); listening_socket->on_connection ([this] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { - bool keep_accepting = true; - if (ec_a) - { - keep_accepting = false; - this->node.logger.try_log (boost::str (boost::format ("Error while accepting incoming TCP/bootstrap connections: %1%") % ec_a.message ())); - } - else + if (!ec_a) { accept_action (ec_a, new_connection); } - return keep_accepting; + return true; }); } diff --git a/nano/node/socket.cpp b/nano/node/socket.cpp index ec43f000..2b7da426 100644 --- a/nano/node/socket.cpp +++ b/nano/node/socket.cpp @@ -243,55 +243,91 @@ void nano::server_socket::on_connection (std::function (shared_from_this ())); boost::asio::post (strand, boost::asio::bind_executor (strand, [this_l, callback_a] () { - if (this_l->acceptor.is_open ()) + if (!this_l->acceptor.is_open ()) { - if (this_l->connections.size () < this_l->max_inbound_connections) - { - // Prepare new connection - auto new_connection = std::make_shared (this_l->node, boost::none); - this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote, - boost::asio::bind_executor (this_l->strand, - [this_l, new_connection, callback_a] (boost::system::error_code const & ec_a) { - this_l->evict_dead_connections (); - if (this_l->connections.size () < this_l->max_inbound_connections) - { - if (!ec_a) - { - // Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start - // an IO operation immediately, which will start a timer. - new_connection->checkup (); - new_connection->start_timer (this_l->node.network_params.network.is_dev_network () ? std::chrono::seconds (2) : this_l->node.network_params.node.idle_timeout); - this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in); - this_l->connections.push_back (new_connection); - } - else - { - this_l->node.logger.try_log ("Unable to accept connection: ", ec_a.message ()); - } - - // If the callback returns true, keep accepting new connections - if (callback_a (new_connection, ec_a)) - { - this_l->on_connection (callback_a); - } - else - { - this_l->node.logger.try_log ("Stopping to accept connections"); - } - } - else - { - this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in); - boost::asio::post (this_l->strand, boost::asio::bind_executor (this_l->strand, [this_l, callback_a] () { - this_l->on_connection (callback_a); - })); - } - })); - } + this_l->node.logger.always_log ("Network: Acceptor is not open"); + return; } + + // Prepare new connection + auto new_connection = std::make_shared (this_l->node, boost::none); + this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote, + boost::asio::bind_executor (this_l->strand, + [this_l, new_connection, callback_a] (boost::system::error_code const & ec_a) { + this_l->evict_dead_connections (); + + if (this_l->connections.size () >= this_l->max_inbound_connections) + { + this_l->node.logger.always_log ("Network: max_inbound_connections reached, unable to open new connection"); + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in); + this_l->on_connection_requeue_delayed (callback_a); + return; + } + + if (!ec_a) + { + // Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start + // an IO operation immediately, which will start a timer. + new_connection->checkup (); + new_connection->start_timer (this_l->node.network_params.network.is_dev_network () ? std::chrono::seconds (2) : this_l->node.network_params.node.idle_timeout); + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in); + this_l->connections.push_back (new_connection); + if (callback_a (new_connection, ec_a)) + { + this_l->on_connection (callback_a); + return; + } + this_l->node.logger.always_log ("Network: Stopping to accept connections"); + return; + } + + // accept error + this_l->node.logger.try_log ("Network: Unable to accept connection: ", ec_a.message ()); + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in); + + if (this_l->is_temporary_error (ec_a)) + { + // if it is a temporary error, just retry it + this_l->on_connection_requeue_delayed (callback_a); + return; + } + + // if it is not a temporary error, check how the listener wants to handle this error + if (callback_a (new_connection, ec_a)) + { + this_l->on_connection_requeue_delayed (callback_a); + return; + } + + // No requeue if we reach here, no incoming socket connections will be handled + this_l->node.logger.always_log ("Network: Stopping to accept connections"); + })); })); } +void nano::server_socket::on_connection_requeue_delayed (std::function const &, boost::system::error_code const &)> callback_a) +{ + auto this_l (std::static_pointer_cast (shared_from_this ())); + node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (1), [this_l, callback_a] () { + this_l->on_connection (callback_a); + }); +} + +bool nano::server_socket::is_temporary_error (boost::system::error_code const ec_a) +{ + switch (ec_a.value ()) + { +#if EAGAIN != EWOULDBLOCK + case EAGAIN: +#endif + case EWOULDBLOCK: + case EINTR: + return true; + default: + return false; + } +} + // This must be called from a strand void nano::server_socket::evict_dead_connections () { diff --git a/nano/node/socket.hpp b/nano/node/socket.hpp index 70c002b1..95fd68b5 100644 --- a/nano/node/socket.hpp +++ b/nano/node/socket.hpp @@ -125,5 +125,7 @@ private: boost::asio::ip::tcp::endpoint local; size_t max_inbound_connections; void evict_dead_connections (); + bool is_temporary_error (boost::system::error_code const ec_a); + void on_connection_requeue_delayed (std::function const & new_connection, boost::system::error_code const &)>); }; }