Merge pull request #4515 from pwojcikdev/networking-fixes/tcp-listener-reorg

Reorganize `tcp_listener`
This commit is contained in:
Piotr Wójcik 2024-03-22 09:42:55 +01:00 committed by GitHub
commit 8281cdbbaa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 417 additions and 348 deletions

View file

@ -5,6 +5,7 @@
#include <nano/node/scheduler/priority.hpp>
#include <nano/node/transport/inproc.hpp>
#include <nano/node/transport/socket.hpp>
#include <nano/node/transport/tcp_listener.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/test_common/network.hpp>
#include <nano/test_common/system.hpp>
@ -698,10 +699,7 @@ TEST (tcp_listener, DISABLED_tcp_listener_timeout_empty)
system.deadline_set (std::chrono::seconds (6));
while (!disconnected)
{
{
nano::lock_guard<nano::mutex> guard (node0->tcp_listener->mutex);
disconnected = node0->tcp_listener->connections.empty ();
}
disconnected = node0->tcp_listener->connection_count () == 0;
ASSERT_NO_ERROR (system.poll ());
}
}
@ -723,18 +721,12 @@ TEST (tcp_listener, tcp_listener_timeout_node_id_handshake)
});
});
ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake) != 0);
{
nano::lock_guard<nano::mutex> guard (node0->tcp_listener->mutex);
ASSERT_EQ (node0->tcp_listener->connections.size (), 1);
}
ASSERT_EQ (node0->tcp_listener->connection_count (), 1);
bool disconnected (false);
system.deadline_set (std::chrono::seconds (20));
while (!disconnected)
{
{
nano::lock_guard<nano::mutex> guard (node0->tcp_listener->mutex);
disconnected = node0->tcp_listener->connections.empty ();
}
disconnected = node0->tcp_listener->connection_count () == 0;
ASSERT_NO_ERROR (system.poll ());
}
}

View file

@ -10,6 +10,7 @@
#include <nano/node/scheduler/priority.hpp>
#include <nano/node/transport/fake.hpp>
#include <nano/node/transport/inproc.hpp>
#include <nano/node/transport/tcp_listener.hpp>
#include <nano/node/vote_generator.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/test_common/network.hpp>

View file

@ -2,6 +2,7 @@
#include <nano/boost/asio/ip/network_v6.hpp>
#include <nano/lib/thread_runner.hpp>
#include <nano/node/transport/socket.hpp>
#include <nano/node/transport/tcp_listener.hpp>
#include <nano/test_common/system.hpp>
#include <nano/test_common/testutil.hpp>
@ -29,6 +30,7 @@ TEST (socket, max_connections)
// start a server socket that allows max 2 live connections
auto listener = std::make_shared<nano::transport::tcp_listener> (server_port, *node, 2);
nano::test::stop_guard stop_guard{ *listener };
listener->start ([&server_sockets] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec_a) {
server_sockets.push_back (new_connection);
return true;
@ -123,6 +125,7 @@ TEST (socket, max_connections_per_ip)
std::vector<std::shared_ptr<nano::transport::socket>> server_sockets;
auto listener = std::make_shared<nano::transport::tcp_listener> (server_port, *node, max_global_connections);
nano::test::stop_guard stop_guard{ *listener };
listener->start ([&server_sockets] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec_a) {
server_sockets.push_back (new_connection);
return true;
@ -243,6 +246,7 @@ TEST (socket, max_connections_per_subnetwork)
std::vector<std::shared_ptr<nano::transport::socket>> server_sockets;
auto listener = std::make_shared<nano::transport::tcp_listener> (server_port, *node, max_global_connections);
nano::test::stop_guard stop_guard{ *listener };
listener->start ([&server_sockets] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec_a) {
server_sockets.push_back (new_connection);
return true;
@ -303,6 +307,7 @@ TEST (socket, disabled_max_peers_per_ip)
std::vector<std::shared_ptr<nano::transport::socket>> server_sockets;
auto server_socket = std::make_shared<nano::transport::tcp_listener> (server_port, *node, max_global_connections);
nano::test::stop_guard stop_guard{ *server_socket };
server_socket->start ([&server_sockets] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec_a) {
server_sockets.push_back (new_connection);
return true;
@ -363,6 +368,7 @@ TEST (socket, disconnection_of_silent_connections)
// start a server listening socket
auto listener = std::make_shared<nano::transport::tcp_listener> (server_port, *node, 1);
nano::test::stop_guard stop_guard{ *listener };
listener->start ([&server_data_socket] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec_a) {
server_data_socket = new_connection;
return true;
@ -414,6 +420,7 @@ TEST (socket, drop_policy)
auto server_port (system.get_available_port ());
auto listener = std::make_shared<nano::transport::tcp_listener> (server_port, *node, 1);
nano::test::stop_guard stop_guard{ *listener };
listener->start ([&connections] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec_a) {
connections.push_back (new_connection);
return true;
@ -502,6 +509,7 @@ TEST (socket, concurrent_writes)
std::vector<std::shared_ptr<nano::transport::socket>> connections;
auto listener = std::make_shared<nano::transport::tcp_listener> (server_port, *node, max_connections);
nano::test::stop_guard stop_guard{ *listener };
listener->start ([&connections, &reader] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec_a) {
if (ec_a)
{

View file

@ -158,6 +158,8 @@ add_library(
transport/socket.cpp
transport/tcp.hpp
transport/tcp.cpp
transport/tcp_listener.hpp
transport/tcp_listener.cpp
transport/tcp_server.hpp
transport/tcp_server.cpp
transport/transport.hpp

View file

@ -15,6 +15,7 @@
#include <nano/node/scheduler/optimistic.hpp>
#include <nano/node/scheduler/priority.hpp>
#include <nano/node/telemetry.hpp>
#include <nano/node/transport/tcp_listener.hpp>
#include <nano/node/vote_generator.hpp>
#include <nano/node/websocket.hpp>
#include <nano/secure/ledger.hpp>
@ -535,7 +536,7 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
composite->add_component (collect_container_info (node.ledger, "ledger"));
composite->add_component (collect_container_info (node.active, "active"));
composite->add_component (collect_container_info (node.bootstrap_initiator, "bootstrap_initiator"));
composite->add_component (collect_container_info (*node.tcp_listener, "tcp_listener"));
composite->add_component (node.tcp_listener->collect_container_info ("tcp_listener"));
composite->add_component (collect_container_info (node.network, "network"));
composite->add_component (node.telemetry.collect_container_info ("telemetry"));
composite->add_component (collect_container_info (node.workers, "workers"));

View file

@ -47,9 +47,6 @@
namespace nano
{
class active_transactions;
namespace rocksdb
{
} // Declare a namespace rocksdb inside nano so all references to the rocksdb library need to be globally scoped e.g. ::rocksdb::Slice
class node;
class work_pool;
@ -57,7 +54,17 @@ namespace scheduler
{
class component;
}
namespace transport
{
class tcp_listener;
}
namespace rocksdb
{
} // Declare a namespace rocksdb inside nano so all references to the rocksdb library need to be globally scoped e.g. ::rocksdb::Slice
}
namespace nano
{
// Configs
backlog_population::config backlog_population_config (node_config const &);
outbound_bandwidth_limiter::config outbound_bandwidth_limiter_config (node_config const &);

View file

@ -0,0 +1,287 @@
#include <nano/node/messages.hpp>
#include <nano/node/node.hpp>
#include <nano/node/transport/tcp.hpp>
#include <nano/node/transport/tcp_listener.hpp>
#include <nano/node/transport/tcp_server.hpp>
#include <memory>
namespace
{
bool is_temporary_error (boost::system::error_code const & ec_a)
{
switch (ec_a.value ())
{
#if EAGAIN != EWOULDBLOCK
case EAGAIN:
#endif
case EWOULDBLOCK:
case EINTR:
return true;
default:
return false;
}
}
}
/*
* tcp_listener
*/
nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_a, std::size_t max_inbound_connections) :
node (node_a),
strand{ node_a.io_ctx.get_executor () },
acceptor{ node_a.io_ctx },
local{ boost::asio::ip::tcp::endpoint{ boost::asio::ip::address_v6::any (), port_a } },
max_inbound_connections{ max_inbound_connections }
{
}
nano::transport::tcp_listener::~tcp_listener ()
{
debug_assert (stopped);
}
void nano::transport::tcp_listener::start (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a)
{
nano::lock_guard<nano::mutex> lock{ mutex };
acceptor.open (local.protocol ());
acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true));
boost::system::error_code ec;
acceptor.bind (local, ec);
if (!ec)
{
acceptor.listen (boost::asio::socket_base::max_listen_connections, ec);
}
if (ec)
{
node.logger.critical (nano::log::type::tcp_listener, "Error while binding for incoming TCP: {} (port: {})", ec.message (), acceptor.local_endpoint ().port ());
throw std::runtime_error (ec.message ());
}
on_connection (callback_a);
}
void nano::transport::tcp_listener::stop ()
{
decltype (connections) connections_l;
{
nano::lock_guard<nano::mutex> lock{ mutex };
stopped = true;
connections_l.swap (connections);
}
nano::lock_guard<nano::mutex> lock{ mutex };
boost::asio::dispatch (strand, [this_l = shared_from_this ()] () {
this_l->acceptor.close ();
for (auto & address_connection_pair : this_l->connections_per_address)
{
if (auto connection_l = address_connection_pair.second.lock ())
{
connection_l->close ();
}
}
this_l->connections_per_address.clear ();
});
}
std::size_t nano::transport::tcp_listener::connection_count ()
{
nano::lock_guard<nano::mutex> lock{ mutex };
cleanup ();
return connections.size ();
}
void nano::transport::tcp_listener::cleanup ()
{
debug_assert (!mutex.try_lock ());
erase_if (connections, [] (auto const & connection) {
return connection.second.expired ();
});
}
bool nano::transport::tcp_listener::limit_reached_for_incoming_subnetwork_connections (std::shared_ptr<nano::transport::socket> const & new_connection)
{
debug_assert (strand.running_in_this_thread ());
if (node.flags.disable_max_peers_per_subnetwork || nano::transport::is_ipv4_or_v4_mapped_address (new_connection->remote.address ()))
{
// If the limit is disabled, then it is unreachable.
// If the address is IPv4 we don't check for a network limit, since its address space isn't big as IPv6 /64.
return false;
}
auto const counted_connections = socket_functions::count_subnetwork_connections (
connections_per_address,
new_connection->remote.address ().to_v6 (),
node.network_params.network.ipv6_subnetwork_prefix_for_limiting);
return counted_connections >= node.network_params.network.max_peers_per_subnetwork;
}
bool nano::transport::tcp_listener::limit_reached_for_incoming_ip_connections (std::shared_ptr<nano::transport::socket> const & new_connection)
{
debug_assert (strand.running_in_this_thread ());
if (node.flags.disable_max_peers_per_ip)
{
// If the limit is disabled, then it is unreachable.
return false;
}
auto const address_connections_range = connections_per_address.equal_range (new_connection->remote.address ());
auto const counted_connections = static_cast<std::size_t> (std::abs (std::distance (address_connections_range.first, address_connections_range.second)));
return counted_connections >= node.network_params.network.max_peers_per_ip;
}
void nano::transport::tcp_listener::on_connection (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a)
{
boost::asio::post (strand, boost::asio::bind_executor (strand, [this_l = shared_from_this (), callback = std::move (callback_a)] () mutable {
if (!this_l->acceptor.is_open ())
{
this_l->node.logger.error (nano::log::type::tcp_listener, "Acceptor is not open");
return;
}
// Prepare new connection
auto new_connection = std::make_shared<nano::transport::socket> (this_l->node, socket::endpoint_type_t::server);
this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote,
boost::asio::bind_executor (this_l->strand,
[this_l, new_connection, cbk = std::move (callback)] (boost::system::error_code const & ec_a) mutable {
this_l->evict_dead_connections ();
if (this_l->connections_per_address.size () >= this_l->max_inbound_connections)
{
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in);
this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections reached ({}), unable to open new connection", this_l->connections_per_address.size ());
this_l->on_connection_requeue_delayed (std::move (cbk));
return;
}
if (this_l->limit_reached_for_incoming_ip_connections (new_connection))
{
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::in);
this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections per IP reached ({}), unable to open new connection", new_connection->remote_endpoint ().address ().to_string ());
this_l->on_connection_requeue_delayed (std::move (cbk));
return;
}
if (this_l->limit_reached_for_incoming_subnetwork_connections (new_connection))
{
auto const remote_ip_address = new_connection->remote_endpoint ().address ();
debug_assert (remote_ip_address.is_v6 ());
auto const remote_subnet = socket_functions::get_ipv6_subnet_address (remote_ip_address.to_v6 (), this_l->node.network_params.network.max_peers_per_subnetwork);
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::in);
this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections per subnetwork reached (subnetwork: {}, ip: {}), unable to open new connection",
remote_subnet.canonical ().to_string (),
remote_ip_address.to_string ());
this_l->on_connection_requeue_delayed (std::move (cbk));
return;
}
if (!ec_a)
{
{
// Best effort attempt to get endpoint addresses
boost::system::error_code ec;
new_connection->local = new_connection->tcp_socket.local_endpoint (ec);
}
// Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start
// an IO operation immediately, which will start a timer.
new_connection->start ();
new_connection->set_timeout (this_l->node.network_params.network.idle_timeout);
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
this_l->connections_per_address.emplace (new_connection->remote.address (), new_connection);
this_l->node.observers.socket_accepted.notify (*new_connection);
if (cbk (new_connection, ec_a))
{
this_l->on_connection (std::move (cbk));
return;
}
this_l->node.logger.warn (nano::log::type::tcp_listener, "Stopping to accept new connections");
return;
}
// accept error
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in);
this_l->node.logger.error (nano::log::type::tcp_listener, "Unable to accept connection: {} ({})", ec_a.message (), new_connection->remote_endpoint ().address ().to_string ());
if (is_temporary_error (ec_a))
{
// if it is a temporary error, just retry it
this_l->on_connection_requeue_delayed (std::move (cbk));
return;
}
// if it is not a temporary error, check how the listener wants to handle this error
if (cbk (new_connection, ec_a))
{
this_l->on_connection_requeue_delayed (std::move (cbk));
return;
}
// No requeue if we reach here, no incoming socket connections will be handled
this_l->node.logger.warn (nano::log::type::tcp_listener, "Stopping to accept new connections");
}));
}));
}
// If we are unable to accept a socket, for any reason, we wait just a little (1ms) before rescheduling the next connection accept.
// The intention is to throttle back the connection requests and break up any busy loops that could possibly form and
// give the rest of the system a chance to recover.
void nano::transport::tcp_listener::on_connection_requeue_delayed (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a)
{
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (1), [this_l = shared_from_this (), callback = std::move (callback_a)] () mutable {
this_l->on_connection (std::move (callback));
});
}
// This must be called from a strand
void nano::transport::tcp_listener::evict_dead_connections ()
{
debug_assert (strand.running_in_this_thread ());
erase_if (connections_per_address, [] (auto const & entry) {
return entry.second.expired ();
});
}
void nano::transport::tcp_listener::accept_action (boost::system::error_code const & ec, std::shared_ptr<nano::transport::socket> const & socket_a)
{
if (!node.network.excluded_peers.check (socket_a->remote_endpoint ()))
{
auto server = std::make_shared<nano::transport::tcp_server> (socket_a, node.shared (), true);
nano::lock_guard<nano::mutex> lock{ mutex };
connections[server.get ()] = server;
server->start ();
}
else
{
node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_excluded);
node.logger.debug (nano::log::type::tcp_server, "Rejected connection from excluded peer: {}", nano::util::to_str (socket_a->remote_endpoint ()));
}
}
boost::asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
if (!stopped)
{
return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), acceptor.local_endpoint ().port ());
}
else
{
return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), 0);
}
}
std::unique_ptr<nano::container_info_component> nano::transport::tcp_listener::collect_container_info (std::string const & name)
{
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "connections", connection_count (), sizeof (decltype (connections)::value_type) }));
return composite;
}

View file

@ -0,0 +1,60 @@
#pragma once
#include <nano/boost/asio/strand.hpp>
#include <nano/node/common.hpp>
#include <atomic>
namespace nano::transport
{
class socket;
class tcp_server;
/**
* Server side portion of bootstrap sessions. Listens for new socket connections and spawns tcp_server objects when connected.
*/
class tcp_listener final : public std::enable_shared_from_this<tcp_listener>
{
public:
tcp_listener (uint16_t port, nano::node &, std::size_t max_inbound_connections);
~tcp_listener ();
void start (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback);
void stop ();
void accept_action (boost::system::error_code const &, std::shared_ptr<nano::transport::socket> const &);
std::size_t connection_count ();
nano::tcp_endpoint endpoint () const;
std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name);
private: // Dependencies
nano::node & node;
private:
void on_connection (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a);
void evict_dead_connections ();
void on_connection_requeue_delayed (std::function<bool (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const &)>);
/** Checks whether the maximum number of connections per IP was reached. If so, it returns true. */
bool limit_reached_for_incoming_ip_connections (std::shared_ptr<nano::transport::socket> const & new_connection);
bool limit_reached_for_incoming_subnetwork_connections (std::shared_ptr<nano::transport::socket> const & new_connection);
void cleanup ();
public:
std::atomic<std::size_t> bootstrap_count{ 0 };
std::atomic<std::size_t> realtime_count{ 0 };
private:
std::unordered_map<nano::transport::tcp_server *, std::weak_ptr<nano::transport::tcp_server>> connections;
std::multimap<boost::asio::ip::address, std::weak_ptr<socket>> connections_per_address;
boost::asio::strand<boost::asio::io_context::executor_type> strand;
boost::asio::ip::tcp::acceptor acceptor;
boost::asio::ip::tcp::endpoint local;
std::size_t const max_inbound_connections;
std::atomic<bool> stopped;
mutable nano::mutex mutex;
};
}

View file

@ -4,276 +4,11 @@
#include <nano/node/node.hpp>
#include <nano/node/transport/message_deserializer.hpp>
#include <nano/node/transport/tcp.hpp>
#include <nano/node/transport/tcp_listener.hpp>
#include <nano/node/transport/tcp_server.hpp>
#include <boost/format.hpp>
#include <memory>
namespace
{
bool is_temporary_error (boost::system::error_code const & ec_a)
{
switch (ec_a.value ())
{
#if EAGAIN != EWOULDBLOCK
case EAGAIN:
#endif
case EWOULDBLOCK:
case EINTR:
return true;
default:
return false;
}
}
}
/*
* tcp_listener
*/
nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_a, std::size_t max_inbound_connections) :
node (node_a),
strand{ node_a.io_ctx.get_executor () },
acceptor{ node_a.io_ctx },
local{ boost::asio::ip::tcp::endpoint{ boost::asio::ip::address_v6::any (), port_a } },
max_inbound_connections{ max_inbound_connections }
{
}
void nano::transport::tcp_listener::start (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a)
{
nano::lock_guard<nano::mutex> lock{ mutex };
on = true;
acceptor.open (local.protocol ());
acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true));
boost::system::error_code ec;
acceptor.bind (local, ec);
if (!ec)
{
acceptor.listen (boost::asio::socket_base::max_listen_connections, ec);
}
if (ec)
{
node.logger.critical (nano::log::type::tcp_listener, "Error while binding for incoming TCP: {} (port: {})", ec.message (), acceptor.local_endpoint ().port ());
throw std::runtime_error (ec.message ());
}
on_connection (callback_a);
}
void nano::transport::tcp_listener::stop ()
{
decltype (connections) connections_l;
{
nano::lock_guard<nano::mutex> lock{ mutex };
on = false;
connections_l.swap (connections);
}
nano::lock_guard<nano::mutex> lock{ mutex };
boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l = shared_from_this ()] () {
this_l->acceptor.close ();
for (auto & address_connection_pair : this_l->connections_per_address)
{
if (auto connection_l = address_connection_pair.second.lock ())
{
connection_l->close ();
}
}
this_l->connections_per_address.clear ();
}));
}
std::size_t nano::transport::tcp_listener::connection_count ()
{
nano::lock_guard<nano::mutex> lock{ mutex };
return connections.size ();
}
bool nano::transport::tcp_listener::limit_reached_for_incoming_subnetwork_connections (std::shared_ptr<nano::transport::socket> const & new_connection)
{
debug_assert (strand.running_in_this_thread ());
if (node.flags.disable_max_peers_per_subnetwork || nano::transport::is_ipv4_or_v4_mapped_address (new_connection->remote.address ()))
{
// If the limit is disabled, then it is unreachable.
// If the address is IPv4 we don't check for a network limit, since its address space isn't big as IPv6 /64.
return false;
}
auto const counted_connections = socket_functions::count_subnetwork_connections (
connections_per_address,
new_connection->remote.address ().to_v6 (),
node.network_params.network.ipv6_subnetwork_prefix_for_limiting);
return counted_connections >= node.network_params.network.max_peers_per_subnetwork;
}
bool nano::transport::tcp_listener::limit_reached_for_incoming_ip_connections (std::shared_ptr<nano::transport::socket> const & new_connection)
{
debug_assert (strand.running_in_this_thread ());
if (node.flags.disable_max_peers_per_ip)
{
// If the limit is disabled, then it is unreachable.
return false;
}
auto const address_connections_range = connections_per_address.equal_range (new_connection->remote.address ());
auto const counted_connections = static_cast<std::size_t> (std::abs (std::distance (address_connections_range.first, address_connections_range.second)));
return counted_connections >= node.network_params.network.max_peers_per_ip;
}
void nano::transport::tcp_listener::on_connection (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a)
{
boost::asio::post (strand, boost::asio::bind_executor (strand, [this_l = shared_from_this (), callback = std::move (callback_a)] () mutable {
if (!this_l->acceptor.is_open ())
{
this_l->node.logger.error (nano::log::type::tcp_listener, "Acceptor is not open");
return;
}
// Prepare new connection
auto new_connection = std::make_shared<nano::transport::socket> (this_l->node, socket::endpoint_type_t::server);
this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote,
boost::asio::bind_executor (this_l->strand,
[this_l, new_connection, cbk = std::move (callback)] (boost::system::error_code const & ec_a) mutable {
this_l->evict_dead_connections ();
if (this_l->connections_per_address.size () >= this_l->max_inbound_connections)
{
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in);
this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections reached ({}), unable to open new connection", this_l->connections_per_address.size ());
this_l->on_connection_requeue_delayed (std::move (cbk));
return;
}
if (this_l->limit_reached_for_incoming_ip_connections (new_connection))
{
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::in);
this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections per IP reached ({}), unable to open new connection", new_connection->remote_endpoint ().address ().to_string ());
this_l->on_connection_requeue_delayed (std::move (cbk));
return;
}
if (this_l->limit_reached_for_incoming_subnetwork_connections (new_connection))
{
auto const remote_ip_address = new_connection->remote_endpoint ().address ();
debug_assert (remote_ip_address.is_v6 ());
auto const remote_subnet = socket_functions::get_ipv6_subnet_address (remote_ip_address.to_v6 (), this_l->node.network_params.network.max_peers_per_subnetwork);
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::in);
this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections per subnetwork reached (subnetwork: {}, ip: {}), unable to open new connection",
remote_subnet.canonical ().to_string (),
remote_ip_address.to_string ());
this_l->on_connection_requeue_delayed (std::move (cbk));
return;
}
if (!ec_a)
{
{
// Best effort attempt to get endpoint addresses
boost::system::error_code ec;
new_connection->local = new_connection->tcp_socket.local_endpoint (ec);
}
// Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start
// an IO operation immediately, which will start a timer.
new_connection->start ();
new_connection->set_timeout (this_l->node.network_params.network.idle_timeout);
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
this_l->connections_per_address.emplace (new_connection->remote.address (), new_connection);
this_l->node.observers.socket_accepted.notify (*new_connection);
if (cbk (new_connection, ec_a))
{
this_l->on_connection (std::move (cbk));
return;
}
this_l->node.logger.warn (nano::log::type::tcp_listener, "Stopping to accept new connections");
return;
}
// accept error
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in);
this_l->node.logger.error (nano::log::type::tcp_listener, "Unable to accept connection: {} ({})", ec_a.message (), new_connection->remote_endpoint ().address ().to_string ());
if (is_temporary_error (ec_a))
{
// if it is a temporary error, just retry it
this_l->on_connection_requeue_delayed (std::move (cbk));
return;
}
// if it is not a temporary error, check how the listener wants to handle this error
if (cbk (new_connection, ec_a))
{
this_l->on_connection_requeue_delayed (std::move (cbk));
return;
}
// No requeue if we reach here, no incoming socket connections will be handled
this_l->node.logger.warn (nano::log::type::tcp_listener, "Stopping to accept new connections");
}));
}));
}
// If we are unable to accept a socket, for any reason, we wait just a little (1ms) before rescheduling the next connection accept.
// The intention is to throttle back the connection requests and break up any busy loops that could possibly form and
// give the rest of the system a chance to recover.
void nano::transport::tcp_listener::on_connection_requeue_delayed (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a)
{
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (1), [this_l = shared_from_this (), callback = std::move (callback_a)] () mutable {
this_l->on_connection (std::move (callback));
});
}
// This must be called from a strand
void nano::transport::tcp_listener::evict_dead_connections ()
{
debug_assert (strand.running_in_this_thread ());
erase_if (connections_per_address, [] (auto const & entry) {
return entry.second.expired ();
});
}
void nano::transport::tcp_listener::accept_action (boost::system::error_code const & ec, std::shared_ptr<nano::transport::socket> const & socket_a)
{
if (!node.network.excluded_peers.check (socket_a->remote_endpoint ()))
{
auto server = std::make_shared<nano::transport::tcp_server> (socket_a, node.shared (), true);
nano::lock_guard<nano::mutex> lock{ mutex };
connections[server.get ()] = server;
server->start ();
}
else
{
node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_excluded);
node.logger.debug (nano::log::type::tcp_server, "Rejected connection from excluded peer: {}", nano::util::to_str (socket_a->remote_endpoint ()));
}
}
boost::asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint ()
{
nano::lock_guard<nano::mutex> lock{ mutex };
if (on)
{
return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), acceptor.local_endpoint ().port ());
}
else
{
return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), 0);
}
}
std::unique_ptr<nano::container_info_component> nano::transport::collect_container_info (nano::transport::tcp_listener & bootstrap_listener, std::string const & name)
{
auto sizeof_element = sizeof (decltype (bootstrap_listener.connections)::value_type);
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "connections", bootstrap_listener.connection_count (), sizeof_element }));
return composite;
}
/*
* tcp_server
*/
@ -321,9 +56,6 @@ nano::transport::tcp_server::~tcp_server ()
}
stop ();
nano::lock_guard<nano::mutex> lock{ node->tcp_listener->mutex };
node->tcp_listener->connections.erase (this);
}
void nano::transport::tcp_server::start ()
@ -840,10 +572,6 @@ void nano::transport::tcp_server::timeout ()
{
node->logger.debug (nano::log::type::tcp_server, "Closing TCP server due to timeout ({})", fmt::streamed (remote_endpoint));
{
nano::lock_guard<nano::mutex> lock{ node->tcp_listener->mutex };
node->tcp_listener->connections.erase (this);
}
socket->close ();
}
}

View file

@ -16,42 +16,6 @@ namespace nano::transport
class message_deserializer;
class tcp_server;
/**
* Server side portion of bootstrap sessions. Listens for new socket connections and spawns tcp_server objects when connected.
*/
class tcp_listener final : public std::enable_shared_from_this<nano::transport::tcp_listener>
{
public:
tcp_listener (uint16_t, nano::node &, std::size_t);
void start (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a);
void stop ();
void accept_action (boost::system::error_code const &, std::shared_ptr<nano::transport::socket> const &);
std::size_t connection_count ();
nano::mutex mutex;
std::unordered_map<nano::transport::tcp_server *, std::weak_ptr<nano::transport::tcp_server>> connections;
nano::tcp_endpoint endpoint ();
nano::node & node;
bool on{ false };
std::atomic<std::size_t> bootstrap_count{ 0 };
std::atomic<std::size_t> realtime_count{ 0 };
private:
boost::asio::strand<boost::asio::io_context::executor_type> strand;
nano::transport::address_socket_mmap connections_per_address;
boost::asio::ip::tcp::acceptor acceptor;
boost::asio::ip::tcp::endpoint local;
std::size_t max_inbound_connections;
void on_connection (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a);
void evict_dead_connections ();
void on_connection_requeue_delayed (std::function<bool (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const &)>);
/** Checks whether the maximum number of connections per IP was reached. If so, it returns true. */
bool limit_reached_for_incoming_ip_connections (std::shared_ptr<nano::transport::socket> const & new_connection);
bool limit_reached_for_incoming_subnetwork_connections (std::shared_ptr<nano::transport::socket> const & new_connection);
};
std::unique_ptr<container_info_component> collect_container_info (tcp_listener & bootstrap_listener, std::string const & name);
class tcp_server final : public std::enable_shared_from_this<tcp_server>
{
public:

View file

@ -2,6 +2,7 @@
#include <nano/lib/blocks.hpp>
#include <nano/node/active_transactions.hpp>
#include <nano/node/common.hpp>
#include <nano/node/transport/tcp_listener.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/test_common/system.hpp>
#include <nano/test_common/testutil.hpp>

View file

@ -115,6 +115,46 @@
ASSERT_FALSE (condition); \
}
namespace nano::test
{
template <class... Ts>
class start_stop_guard
{
public:
explicit start_stop_guard (Ts &... refs_a) :
refs{ std::forward<Ts &> (refs_a)... }
{
std::apply ([] (Ts &... refs) { (refs.start (), ...); }, refs);
}
~start_stop_guard ()
{
std::apply ([] (Ts &... refs) { (refs.stop (), ...); }, refs);
}
private:
std::tuple<Ts &...> refs;
};
template <class... Ts>
class stop_guard
{
public:
explicit stop_guard (Ts &... refs_a) :
refs{ std::forward<Ts &> (refs_a)... }
{
}
~stop_guard ()
{
std::apply ([] (Ts &... refs) { (refs.stop (), ...); }, refs);
}
private:
std::tuple<Ts &...> refs;
};
}
/* Convenience globals for gtest projects */
namespace nano
{
@ -233,28 +273,6 @@ namespace test
std::atomic<unsigned> required_count;
};
/**
* A helper that calls `start` from constructor and `stop` from destructor
*/
template <class T>
class start_stop_guard
{
public:
explicit start_stop_guard (T & ref_a) :
ref{ ref_a }
{
ref.start ();
}
~start_stop_guard ()
{
ref.stop ();
}
private:
T & ref;
};
void wait_peer_connections (nano::test::system &);
/**
@ -408,4 +426,4 @@ namespace test
*/
void print_all_blocks (nano::node & node);
}
}
}