Fix TCP accept handling when max incoming connection limit is reached (#3315)
This fixes a problem where the node stops accepting TCP sockets permanently after reaching the max incoming connection limit. The symptom is a large number of TCP sockets stuck in CLOSE_WAIT. The fix is to never refuse to accept: all connections are accepted, but if the limit has been reached, the new connection is dropped immediately after being accepted.
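In isolation, the accept-then-drop pattern looks roughly like the standalone Boost.Asio sketch below. The names in it (do_accept, open_connections, the port number) are illustrative only and are not the node's actual classes; in the real change, nano::server_socket::on_connection simply does not retain the over-limit connection and re-queues the accept loop after a short delay.

// Sketch only: always accept, then drop the connection if the limit is reached.
#include <boost/asio.hpp>
#include <cstddef>
#include <memory>

namespace
{
constexpr std::size_t max_inbound_connections = 128; // illustrative limit
std::size_t open_connections = 0; // illustrative counter; the node tracks real connection objects

void do_accept (boost::asio::io_context & io, boost::asio::ip::tcp::acceptor & acceptor)
{
	auto new_connection = std::make_shared<boost::asio::ip::tcp::socket> (io);
	acceptor.async_accept (*new_connection, [&io, &acceptor, new_connection] (boost::system::error_code const & ec) {
		if (!ec)
		{
			if (open_connections >= max_inbound_connections)
			{
				// Over the limit: drop the connection immediately after accepting it.
				// Refusing to accept at all is what left sockets stuck in CLOSE_WAIT.
				boost::system::error_code ignored;
				new_connection->close (ignored);
			}
			else
			{
				++open_connections;
				// ... hand new_connection off to a session here ...
			}
		}
		// Never stop accepting; the acceptor is re-armed regardless of the outcome.
		do_accept (io, acceptor);
	});
}
}

int main ()
{
	boost::asio::io_context io;
	boost::asio::ip::tcp::acceptor acceptor (io, { boost::asio::ip::tcp::v4 (), 7075 });
	do_accept (io, acceptor);
	io.run ();
}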
parent 1a3857fd4e
commit 17e7687033
3 changed files with 85 additions and 53 deletions
@@ -22,22 +22,16 @@ void nano::bootstrap_listener::start ()
 	listening_socket->start (ec);
 	if (ec)
 	{
-		node.logger.try_log (boost::str (boost::format ("Error while binding for incoming TCP/bootstrap on port %1%: %2%") % listening_socket->listening_port () % ec.message ()));
+		node.logger.always_log (boost::str (boost::format ("Network: Error while binding for incoming TCP/bootstrap on port %1%: %2%") % listening_socket->listening_port () % ec.message ()));
 		throw std::runtime_error (ec.message ());
 	}
 	debug_assert (node.network.endpoint ().port () == listening_socket->listening_port ());
 	listening_socket->on_connection ([this] (std::shared_ptr<nano::socket> const & new_connection, boost::system::error_code const & ec_a) {
-		bool keep_accepting = true;
-		if (ec_a)
-		{
-			keep_accepting = false;
-			this->node.logger.try_log (boost::str (boost::format ("Error while accepting incoming TCP/bootstrap connections: %1%") % ec_a.message ()));
-		}
-		else
+		if (!ec_a)
 		{
 			accept_action (ec_a, new_connection);
 		}
-		return keep_accepting;
+		return true;
 	});
 }
 
@@ -243,55 +243,91 @@ void nano::server_socket::on_connection (std::function<bool (std::shared_ptr<nano::socket> const &, boost::system::error_code const &)> callback_a)
 	auto this_l (std::static_pointer_cast<nano::server_socket> (shared_from_this ()));
 
 	boost::asio::post (strand, boost::asio::bind_executor (strand, [this_l, callback_a] () {
-		if (this_l->acceptor.is_open ())
+		if (!this_l->acceptor.is_open ())
 		{
-			if (this_l->connections.size () < this_l->max_inbound_connections)
-			{
-				// Prepare new connection
-				auto new_connection = std::make_shared<nano::socket> (this_l->node, boost::none);
-				this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote,
-				boost::asio::bind_executor (this_l->strand,
-				[this_l, new_connection, callback_a] (boost::system::error_code const & ec_a) {
-					this_l->evict_dead_connections ();
-					if (this_l->connections.size () < this_l->max_inbound_connections)
-					{
-						if (!ec_a)
-						{
-							// Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start
-							// an IO operation immediately, which will start a timer.
-							new_connection->checkup ();
-							new_connection->start_timer (this_l->node.network_params.network.is_dev_network () ? std::chrono::seconds (2) : this_l->node.network_params.node.idle_timeout);
-							this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
-							this_l->connections.push_back (new_connection);
-						}
-						else
-						{
-							this_l->node.logger.try_log ("Unable to accept connection: ", ec_a.message ());
-						}
-
-						// If the callback returns true, keep accepting new connections
-						if (callback_a (new_connection, ec_a))
-						{
-							this_l->on_connection (callback_a);
-						}
-						else
-						{
-							this_l->node.logger.try_log ("Stopping to accept connections");
-						}
-					}
-					else
-					{
-						this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in);
-						boost::asio::post (this_l->strand, boost::asio::bind_executor (this_l->strand, [this_l, callback_a] () {
-							this_l->on_connection (callback_a);
-						}));
-					}
-				}));
-			}
+			this_l->node.logger.always_log ("Network: Acceptor is not open");
+			return;
 		}
+
+		// Prepare new connection
+		auto new_connection = std::make_shared<nano::socket> (this_l->node, boost::none);
+		this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote,
+		boost::asio::bind_executor (this_l->strand,
+		[this_l, new_connection, callback_a] (boost::system::error_code const & ec_a) {
+			this_l->evict_dead_connections ();
+
+			if (this_l->connections.size () >= this_l->max_inbound_connections)
+			{
+				this_l->node.logger.always_log ("Network: max_inbound_connections reached, unable to open new connection");
+				this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in);
+				this_l->on_connection_requeue_delayed (callback_a);
+				return;
+			}
+
+			if (!ec_a)
+			{
+				// Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start
+				// an IO operation immediately, which will start a timer.
+				new_connection->checkup ();
+				new_connection->start_timer (this_l->node.network_params.network.is_dev_network () ? std::chrono::seconds (2) : this_l->node.network_params.node.idle_timeout);
+				this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
+				this_l->connections.push_back (new_connection);
+				if (callback_a (new_connection, ec_a))
+				{
+					this_l->on_connection (callback_a);
+					return;
+				}
+				this_l->node.logger.always_log ("Network: Stopping to accept connections");
+				return;
+			}
+
+			// accept error
+			this_l->node.logger.try_log ("Network: Unable to accept connection: ", ec_a.message ());
+			this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in);
+
+			if (this_l->is_temporary_error (ec_a))
+			{
+				// if it is a temporary error, just retry it
+				this_l->on_connection_requeue_delayed (callback_a);
+				return;
+			}
+
+			// if it is not a temporary error, check how the listener wants to handle this error
+			if (callback_a (new_connection, ec_a))
+			{
+				this_l->on_connection_requeue_delayed (callback_a);
+				return;
+			}
+
+			// No requeue if we reach here, no incoming socket connections will be handled
+			this_l->node.logger.always_log ("Network: Stopping to accept connections");
+		}));
 	}));
 }
 
+void nano::server_socket::on_connection_requeue_delayed (std::function<bool (std::shared_ptr<nano::socket> const &, boost::system::error_code const &)> callback_a)
+{
+	auto this_l (std::static_pointer_cast<nano::server_socket> (shared_from_this ()));
+	node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (1), [this_l, callback_a] () {
+		this_l->on_connection (callback_a);
+	});
+}
+
+bool nano::server_socket::is_temporary_error (boost::system::error_code const ec_a)
+{
+	switch (ec_a.value ())
+	{
+#if EAGAIN != EWOULDBLOCK
+		case EAGAIN:
+#endif
+		case EWOULDBLOCK:
+		case EINTR:
+			return true;
+		default:
+			return false;
+	}
+}
+
 // This must be called from a strand
 void nano::server_socket::evict_dead_connections ()
 {
@@ -125,5 +125,7 @@ private:
 	boost::asio::ip::tcp::endpoint local;
 	size_t max_inbound_connections;
 	void evict_dead_connections ();
+	bool is_temporary_error (boost::system::error_code const ec_a);
+	void on_connection_requeue_delayed (std::function<bool (std::shared_ptr<nano::socket> const & new_connection, boost::system::error_code const &)>);
 };
 }