Move tcp_listener to a dedicated file and reduce header coupling

Piotr Wójcik 2024-03-21 21:34:57 +01:00
commit 6768e7dc6e
11 changed files with 365 additions and 333 deletions


@@ -5,6 +5,7 @@
#include <nano/node/scheduler/priority.hpp>
#include <nano/node/transport/inproc.hpp>
#include <nano/node/transport/socket.hpp>
#include <nano/node/transport/tcp_listener.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/test_common/network.hpp>
#include <nano/test_common/system.hpp>


@@ -10,6 +10,7 @@
#include <nano/node/scheduler/priority.hpp>
#include <nano/node/transport/fake.hpp>
#include <nano/node/transport/inproc.hpp>
#include <nano/node/transport/tcp_listener.hpp>
#include <nano/node/vote_generator.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/test_common/network.hpp>


@@ -2,6 +2,7 @@
#include <nano/boost/asio/ip/network_v6.hpp>
#include <nano/lib/thread_runner.hpp>
#include <nano/node/transport/socket.hpp>
#include <nano/node/transport/tcp_listener.hpp>
#include <nano/test_common/system.hpp>
#include <nano/test_common/testutil.hpp>


@@ -158,6 +158,8 @@ add_library(
transport/socket.cpp
transport/tcp.hpp
transport/tcp.cpp
transport/tcp_listener.hpp
transport/tcp_listener.cpp
transport/tcp_server.hpp
transport/tcp_server.cpp
transport/transport.hpp


@@ -15,6 +15,7 @@
#include <nano/node/scheduler/optimistic.hpp>
#include <nano/node/scheduler/priority.hpp>
#include <nano/node/telemetry.hpp>
#include <nano/node/transport/tcp_listener.hpp>
#include <nano/node/vote_generator.hpp>
#include <nano/node/websocket.hpp>
#include <nano/secure/ledger.hpp>


@@ -47,9 +47,6 @@
namespace nano
{
class active_transactions;
namespace rocksdb
{
} // Declare a namespace rocksdb inside nano so all references to the rocksdb library need to be globally scoped e.g. ::rocksdb::Slice
class node;
class work_pool;
@@ -57,7 +54,17 @@ namespace scheduler
{
class component;
}
namespace transport
{
class tcp_listener;
}
namespace rocksdb
{
} // Declare a namespace rocksdb inside nano so all references to the rocksdb library need to be globally scoped e.g. ::rocksdb::Slice
}
namespace nano
{
// Configs
backlog_population::config backlog_population_config (node_config const &);
outbound_bandwidth_limiter::config outbound_bandwidth_limiter_config (node_config const &);
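
The hunk above is the coupling-reduction half of this commit: the enclosing header now only forward-declares nano::transport::tcp_listener, while the translation units that actually use the listener include the new tcp_listener.hpp directly. A minimal sketch of the pattern follows, using a hypothetical header and member name (not part of this commit); holding the listener through a smart pointer or reference only needs the forward declaration, so the header stays lightweight.

// hypothetical_component.hpp -- illustrative sketch only, not part of this commit
#pragma once

#include <memory>

namespace nano::transport
{
class tcp_listener; // forward declaration is enough here, no include required
}

namespace nano
{
class hypothetical_component
{
public:
	// std::shared_ptr works with an incomplete type for declaration and
	// destruction, so the full tcp_listener definition is only needed in
	// the .cpp that actually constructs or calls into the listener.
	std::shared_ptr<nano::transport::tcp_listener> listener;
};
}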


@@ -0,0 +1,287 @@
#include <nano/node/messages.hpp>
#include <nano/node/node.hpp>
#include <nano/node/transport/tcp.hpp>
#include <nano/node/transport/tcp_listener.hpp>
#include <nano/node/transport/tcp_server.hpp>
#include <memory>
namespace
{
bool is_temporary_error (boost::system::error_code const & ec_a)
{
switch (ec_a.value ())
{
#if EAGAIN != EWOULDBLOCK
case EAGAIN:
#endif
case EWOULDBLOCK:
case EINTR:
return true;
default:
return false;
}
}
}
/*
* tcp_listener
*/
nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_a, std::size_t max_inbound_connections) :
node (node_a),
strand{ node_a.io_ctx.get_executor () },
acceptor{ node_a.io_ctx },
local{ boost::asio::ip::tcp::endpoint{ boost::asio::ip::address_v6::any (), port_a } },
max_inbound_connections{ max_inbound_connections }
{
}
nano::transport::tcp_listener::~tcp_listener ()
{
debug_assert (stopped);
}
void nano::transport::tcp_listener::start (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a)
{
nano::lock_guard<nano::mutex> lock{ mutex };
acceptor.open (local.protocol ());
acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true));
boost::system::error_code ec;
acceptor.bind (local, ec);
if (!ec)
{
acceptor.listen (boost::asio::socket_base::max_listen_connections, ec);
}
if (ec)
{
node.logger.critical (nano::log::type::tcp_listener, "Error while binding for incoming TCP: {} (port: {})", ec.message (), acceptor.local_endpoint ().port ());
throw std::runtime_error (ec.message ());
}
on_connection (callback_a);
}
void nano::transport::tcp_listener::stop ()
{
decltype (connections) connections_l;
{
nano::lock_guard<nano::mutex> lock{ mutex };
stopped = true;
connections_l.swap (connections);
}
nano::lock_guard<nano::mutex> lock{ mutex };
boost::asio::dispatch (strand, [this_l = shared_from_this ()] () {
this_l->acceptor.close ();
for (auto & address_connection_pair : this_l->connections_per_address)
{
if (auto connection_l = address_connection_pair.second.lock ())
{
connection_l->close ();
}
}
this_l->connections_per_address.clear ();
});
}
std::size_t nano::transport::tcp_listener::connection_count ()
{
nano::lock_guard<nano::mutex> lock{ mutex };
cleanup ();
return connections.size ();
}
void nano::transport::tcp_listener::cleanup ()
{
debug_assert (!mutex.try_lock ());
erase_if (connections, [] (auto const & connection) {
return connection.second.expired ();
});
}
bool nano::transport::tcp_listener::limit_reached_for_incoming_subnetwork_connections (std::shared_ptr<nano::transport::socket> const & new_connection)
{
debug_assert (strand.running_in_this_thread ());
if (node.flags.disable_max_peers_per_subnetwork || nano::transport::is_ipv4_or_v4_mapped_address (new_connection->remote.address ()))
{
// If the limit is disabled, then it is unreachable.
// If the address is IPv4 we don't check for a network limit, since its address space isn't as big as an IPv6 /64.
return false;
}
auto const counted_connections = socket_functions::count_subnetwork_connections (
connections_per_address,
new_connection->remote.address ().to_v6 (),
node.network_params.network.ipv6_subnetwork_prefix_for_limiting);
return counted_connections >= node.network_params.network.max_peers_per_subnetwork;
}
bool nano::transport::tcp_listener::limit_reached_for_incoming_ip_connections (std::shared_ptr<nano::transport::socket> const & new_connection)
{
debug_assert (strand.running_in_this_thread ());
if (node.flags.disable_max_peers_per_ip)
{
// If the limit is disabled, then it is unreachable.
return false;
}
auto const address_connections_range = connections_per_address.equal_range (new_connection->remote.address ());
auto const counted_connections = static_cast<std::size_t> (std::abs (std::distance (address_connections_range.first, address_connections_range.second)));
return counted_connections >= node.network_params.network.max_peers_per_ip;
}
void nano::transport::tcp_listener::on_connection (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a)
{
boost::asio::post (strand, boost::asio::bind_executor (strand, [this_l = shared_from_this (), callback = std::move (callback_a)] () mutable {
if (!this_l->acceptor.is_open ())
{
this_l->node.logger.error (nano::log::type::tcp_listener, "Acceptor is not open");
return;
}
// Prepare new connection
auto new_connection = std::make_shared<nano::transport::socket> (this_l->node, socket::endpoint_type_t::server);
this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote,
boost::asio::bind_executor (this_l->strand,
[this_l, new_connection, cbk = std::move (callback)] (boost::system::error_code const & ec_a) mutable {
this_l->evict_dead_connections ();
if (this_l->connections_per_address.size () >= this_l->max_inbound_connections)
{
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in);
this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections reached ({}), unable to open new connection", this_l->connections_per_address.size ());
this_l->on_connection_requeue_delayed (std::move (cbk));
return;
}
if (this_l->limit_reached_for_incoming_ip_connections (new_connection))
{
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::in);
this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections per IP reached ({}), unable to open new connection", new_connection->remote_endpoint ().address ().to_string ());
this_l->on_connection_requeue_delayed (std::move (cbk));
return;
}
if (this_l->limit_reached_for_incoming_subnetwork_connections (new_connection))
{
auto const remote_ip_address = new_connection->remote_endpoint ().address ();
debug_assert (remote_ip_address.is_v6 ());
auto const remote_subnet = socket_functions::get_ipv6_subnet_address (remote_ip_address.to_v6 (), this_l->node.network_params.network.max_peers_per_subnetwork);
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::in);
this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections per subnetwork reached (subnetwork: {}, ip: {}), unable to open new connection",
remote_subnet.canonical ().to_string (),
remote_ip_address.to_string ());
this_l->on_connection_requeue_delayed (std::move (cbk));
return;
}
if (!ec_a)
{
{
// Best effort attempt to get endpoint addresses
boost::system::error_code ec;
new_connection->local = new_connection->tcp_socket.local_endpoint (ec);
}
// Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start
// an IO operation immediately, which will start a timer.
new_connection->start ();
new_connection->set_timeout (this_l->node.network_params.network.idle_timeout);
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
this_l->connections_per_address.emplace (new_connection->remote.address (), new_connection);
this_l->node.observers.socket_accepted.notify (*new_connection);
if (cbk (new_connection, ec_a))
{
this_l->on_connection (std::move (cbk));
return;
}
this_l->node.logger.warn (nano::log::type::tcp_listener, "Stopping to accept new connections");
return;
}
// accept error
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in);
this_l->node.logger.error (nano::log::type::tcp_listener, "Unable to accept connection: {} ({})", ec_a.message (), new_connection->remote_endpoint ().address ().to_string ());
if (is_temporary_error (ec_a))
{
// if it is a temporary error, just retry it
this_l->on_connection_requeue_delayed (std::move (cbk));
return;
}
// if it is not a temporary error, check how the listener wants to handle this error
if (cbk (new_connection, ec_a))
{
this_l->on_connection_requeue_delayed (std::move (cbk));
return;
}
// No requeue if we reach here, no incoming socket connections will be handled
this_l->node.logger.warn (nano::log::type::tcp_listener, "Stopping to accept new connections");
}));
}));
}
// If we are unable to accept a socket, for any reason, we wait just a little (1ms) before rescheduling the next connection accept.
// The intention is to throttle back the connection requests and break up any busy loops that could possibly form and
// give the rest of the system a chance to recover.
void nano::transport::tcp_listener::on_connection_requeue_delayed (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a)
{
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (1), [this_l = shared_from_this (), callback = std::move (callback_a)] () mutable {
this_l->on_connection (std::move (callback));
});
}
// This must be called from a strand
void nano::transport::tcp_listener::evict_dead_connections ()
{
debug_assert (strand.running_in_this_thread ());
erase_if (connections_per_address, [] (auto const & entry) {
return entry.second.expired ();
});
}
void nano::transport::tcp_listener::accept_action (boost::system::error_code const & ec, std::shared_ptr<nano::transport::socket> const & socket_a)
{
if (!node.network.excluded_peers.check (socket_a->remote_endpoint ()))
{
auto server = std::make_shared<nano::transport::tcp_server> (socket_a, node.shared (), true);
nano::lock_guard<nano::mutex> lock{ mutex };
connections[server.get ()] = server;
server->start ();
}
else
{
node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_excluded);
node.logger.debug (nano::log::type::tcp_server, "Rejected connection from excluded peer: {}", nano::util::to_str (socket_a->remote_endpoint ()));
}
}
boost::asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
if (!stopped)
{
return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), acceptor.local_endpoint ().port ());
}
else
{
return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), 0);
}
}
std::unique_ptr<nano::container_info_component> nano::transport::tcp_listener::collect_container_info (std::string const & name)
{
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "connections", connection_count (), sizeof (decltype (connections)::value_type) }));
return composite;
}
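
The accept callback passed to start () controls the accept loop: when a connection is accepted, the callback's boolean return decides whether on_connection () re-arms the acceptor (true) or the listener logs "Stopping to accept new connections" and stops (false). Below is a hedged usage sketch of that contract, assuming an already constructed `listener` shared_ptr; it illustrates the callback semantics, not the node's actual start-up wiring.

// Illustration only: driving the accept loop of an existing listener.
// `listener` is assumed to be a std::shared_ptr<nano::transport::tcp_listener>.
listener->start ([listener] (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const & ec) {
	if (!ec)
	{
		// Hand the accepted socket to a tcp_server via accept_action
		listener->accept_action (ec, new_connection);
	}
	// true: keep accepting further connections; false: stop the accept loop
	return true;
});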


@@ -0,0 +1,60 @@
#pragma once
#include <nano/boost/asio/strand.hpp>
#include <nano/node/common.hpp>
#include <atomic>
namespace nano::transport
{
class socket;
class tcp_server;
/**
* Server side portion of bootstrap sessions. Listens for new socket connections and spawns tcp_server objects when connected.
*/
class tcp_listener final : public std::enable_shared_from_this<tcp_listener>
{
public:
tcp_listener (uint16_t port, nano::node &, std::size_t max_inbound_connections);
~tcp_listener ();
void start (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback);
void stop ();
void accept_action (boost::system::error_code const &, std::shared_ptr<nano::transport::socket> const &);
std::size_t connection_count ();
nano::tcp_endpoint endpoint () const;
std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name);
private: // Dependencies
nano::node & node;
private:
void on_connection (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a);
void evict_dead_connections ();
void on_connection_requeue_delayed (std::function<bool (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const &)>);
/** Checks whether the maximum number of connections per IP was reached. If so, it returns true. */
bool limit_reached_for_incoming_ip_connections (std::shared_ptr<nano::transport::socket> const & new_connection);
bool limit_reached_for_incoming_subnetwork_connections (std::shared_ptr<nano::transport::socket> const & new_connection);
void cleanup ();
public:
std::atomic<std::size_t> bootstrap_count{ 0 };
std::atomic<std::size_t> realtime_count{ 0 };
private:
std::unordered_map<nano::transport::tcp_server *, std::weak_ptr<nano::transport::tcp_server>> connections;
std::multimap<boost::asio::ip::address, std::weak_ptr<socket>> connections_per_address;
boost::asio::strand<boost::asio::io_context::executor_type> strand;
boost::asio::ip::tcp::acceptor acceptor;
boost::asio::ip::tcp::endpoint local;
std::size_t const max_inbound_connections;
std::atomic<bool> stopped;
mutable nano::mutex mutex;
};
}
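
The header above keeps two containers: connections tracks the spawned tcp_server instances, while connections_per_address keys weak_ptr<socket> entries by remote IP so the per-IP check can count entries for one address with equal_range and the per-subnetwork check can count a whole prefix; evict_dead_connections () drops entries whose sockets have already been destroyed. The following is a simplified, self-contained sketch of that bookkeeping, with hypothetical string keys, integer payloads, and limit value standing in for addresses, sockets, and the configured maximum.

#include <iostream>
#include <iterator>
#include <map>
#include <memory>
#include <string>

int main ()
{
	// Stand-in for connections_per_address (the real code keys boost IP addresses to weak_ptr<socket>)
	std::multimap<std::string, std::weak_ptr<int>> connections_per_address;

	auto first = std::make_shared<int> (1);
	auto second = std::make_shared<int> (2);
	connections_per_address.emplace ("::ffff:192.0.2.1", first);
	connections_per_address.emplace ("::ffff:192.0.2.1", second);

	// Per-IP check: count entries for one address and compare against a (hypothetical) limit
	std::size_t const max_peers_per_ip = 2;
	auto range = connections_per_address.equal_range ("::ffff:192.0.2.1");
	auto count = static_cast<std::size_t> (std::distance (range.first, range.second));
	std::cout << (count >= max_peers_per_ip ? "limit reached" : "below limit") << std::endl;

	// evict_dead_connections analogue: erase entries whose tracked object is gone
	second.reset ();
	for (auto it = connections_per_address.begin (); it != connections_per_address.end ();)
	{
		it = it->second.expired () ? connections_per_address.erase (it) : std::next (it);
	}
	std::cout << "live entries: " << connections_per_address.size () << std::endl; // prints 1
}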


@@ -4,292 +4,11 @@
#include <nano/node/node.hpp>
#include <nano/node/transport/message_deserializer.hpp>
#include <nano/node/transport/tcp.hpp>
#include <nano/node/transport/tcp_listener.hpp>
#include <nano/node/transport/tcp_server.hpp>
#include <boost/format.hpp>
#include <memory>
namespace
{
bool is_temporary_error (boost::system::error_code const & ec_a)
{
switch (ec_a.value ())
{
#if EAGAIN != EWOULDBLOCK
case EAGAIN:
#endif
case EWOULDBLOCK:
case EINTR:
return true;
default:
return false;
}
}
}
/*
* tcp_listener
*/
nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_a, std::size_t max_inbound_connections) :
node (node_a),
strand{ node_a.io_ctx.get_executor () },
acceptor{ node_a.io_ctx },
local{ boost::asio::ip::tcp::endpoint{ boost::asio::ip::address_v6::any (), port_a } },
max_inbound_connections{ max_inbound_connections }
{
}
nano::transport::tcp_listener::~tcp_listener ()
{
debug_assert (stopped);
}
void nano::transport::tcp_listener::start (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a)
{
nano::lock_guard<nano::mutex> lock{ mutex };
acceptor.open (local.protocol ());
acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true));
boost::system::error_code ec;
acceptor.bind (local, ec);
if (!ec)
{
acceptor.listen (boost::asio::socket_base::max_listen_connections, ec);
}
if (ec)
{
node.logger.critical (nano::log::type::tcp_listener, "Error while binding for incoming TCP: {} (port: {})", ec.message (), acceptor.local_endpoint ().port ());
throw std::runtime_error (ec.message ());
}
on_connection (callback_a);
}
void nano::transport::tcp_listener::stop ()
{
decltype (connections) connections_l;
{
nano::lock_guard<nano::mutex> lock{ mutex };
stopped = true;
connections_l.swap (connections);
}
nano::lock_guard<nano::mutex> lock{ mutex };
boost::asio::dispatch (strand, [this_l = shared_from_this ()] () {
this_l->acceptor.close ();
for (auto & address_connection_pair : this_l->connections_per_address)
{
if (auto connection_l = address_connection_pair.second.lock ())
{
connection_l->close ();
}
}
this_l->connections_per_address.clear ();
});
}
std::size_t nano::transport::tcp_listener::connection_count ()
{
nano::lock_guard<nano::mutex> lock{ mutex };
cleanup ();
return connections.size ();
}
void nano::transport::tcp_listener::cleanup ()
{
debug_assert (!mutex.try_lock ());
erase_if (connections, [] (auto const & connection) {
return connection.second.expired ();
});
}
bool nano::transport::tcp_listener::limit_reached_for_incoming_subnetwork_connections (std::shared_ptr<nano::transport::socket> const & new_connection)
{
debug_assert (strand.running_in_this_thread ());
if (node.flags.disable_max_peers_per_subnetwork || nano::transport::is_ipv4_or_v4_mapped_address (new_connection->remote.address ()))
{
// If the limit is disabled, then it is unreachable.
// If the address is IPv4 we don't check for a network limit, since its address space isn't as big as an IPv6 /64.
return false;
}
auto const counted_connections = socket_functions::count_subnetwork_connections (
connections_per_address,
new_connection->remote.address ().to_v6 (),
node.network_params.network.ipv6_subnetwork_prefix_for_limiting);
return counted_connections >= node.network_params.network.max_peers_per_subnetwork;
}
bool nano::transport::tcp_listener::limit_reached_for_incoming_ip_connections (std::shared_ptr<nano::transport::socket> const & new_connection)
{
debug_assert (strand.running_in_this_thread ());
if (node.flags.disable_max_peers_per_ip)
{
// If the limit is disabled, then it is unreachable.
return false;
}
auto const address_connections_range = connections_per_address.equal_range (new_connection->remote.address ());
auto const counted_connections = static_cast<std::size_t> (std::abs (std::distance (address_connections_range.first, address_connections_range.second)));
return counted_connections >= node.network_params.network.max_peers_per_ip;
}
void nano::transport::tcp_listener::on_connection (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a)
{
boost::asio::post (strand, boost::asio::bind_executor (strand, [this_l = shared_from_this (), callback = std::move (callback_a)] () mutable {
if (!this_l->acceptor.is_open ())
{
this_l->node.logger.error (nano::log::type::tcp_listener, "Acceptor is not open");
return;
}
// Prepare new connection
auto new_connection = std::make_shared<nano::transport::socket> (this_l->node, socket::endpoint_type_t::server);
this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote,
boost::asio::bind_executor (this_l->strand,
[this_l, new_connection, cbk = std::move (callback)] (boost::system::error_code const & ec_a) mutable {
this_l->evict_dead_connections ();
if (this_l->connections_per_address.size () >= this_l->max_inbound_connections)
{
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in);
this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections reached ({}), unable to open new connection", this_l->connections_per_address.size ());
this_l->on_connection_requeue_delayed (std::move (cbk));
return;
}
if (this_l->limit_reached_for_incoming_ip_connections (new_connection))
{
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::in);
this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections per IP reached ({}), unable to open new connection", new_connection->remote_endpoint ().address ().to_string ());
this_l->on_connection_requeue_delayed (std::move (cbk));
return;
}
if (this_l->limit_reached_for_incoming_subnetwork_connections (new_connection))
{
auto const remote_ip_address = new_connection->remote_endpoint ().address ();
debug_assert (remote_ip_address.is_v6 ());
auto const remote_subnet = socket_functions::get_ipv6_subnet_address (remote_ip_address.to_v6 (), this_l->node.network_params.network.max_peers_per_subnetwork);
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::in);
this_l->node.logger.debug (nano::log::type::tcp_listener, "Max connections per subnetwork reached (subnetwork: {}, ip: {}), unable to open new connection",
remote_subnet.canonical ().to_string (),
remote_ip_address.to_string ());
this_l->on_connection_requeue_delayed (std::move (cbk));
return;
}
if (!ec_a)
{
{
// Best effort attempt to get endpoint addresses
boost::system::error_code ec;
new_connection->local = new_connection->tcp_socket.local_endpoint (ec);
}
// Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start
// an IO operation immediately, which will start a timer.
new_connection->start ();
new_connection->set_timeout (this_l->node.network_params.network.idle_timeout);
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
this_l->connections_per_address.emplace (new_connection->remote.address (), new_connection);
this_l->node.observers.socket_accepted.notify (*new_connection);
if (cbk (new_connection, ec_a))
{
this_l->on_connection (std::move (cbk));
return;
}
this_l->node.logger.warn (nano::log::type::tcp_listener, "Stopping to accept new connections");
return;
}
// accept error
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in);
this_l->node.logger.error (nano::log::type::tcp_listener, "Unable to accept connection: {} ({})", ec_a.message (), new_connection->remote_endpoint ().address ().to_string ());
if (is_temporary_error (ec_a))
{
// if it is a temporary error, just retry it
this_l->on_connection_requeue_delayed (std::move (cbk));
return;
}
// if it is not a temporary error, check how the listener wants to handle this error
if (cbk (new_connection, ec_a))
{
this_l->on_connection_requeue_delayed (std::move (cbk));
return;
}
// No requeue if we reach here, no incoming socket connections will be handled
this_l->node.logger.warn (nano::log::type::tcp_listener, "Stopping to accept new connections");
}));
}));
}
// If we are unable to accept a socket, for any reason, we wait just a little (1ms) before rescheduling the next connection accept.
// The intention is to throttle back the connection requests and break up any busy loops that could possibly form and
// give the rest of the system a chance to recover.
void nano::transport::tcp_listener::on_connection_requeue_delayed (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a)
{
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (1), [this_l = shared_from_this (), callback = std::move (callback_a)] () mutable {
this_l->on_connection (std::move (callback));
});
}
// This must be called from a strand
void nano::transport::tcp_listener::evict_dead_connections ()
{
debug_assert (strand.running_in_this_thread ());
erase_if (connections_per_address, [] (auto const & entry) {
return entry.second.expired ();
});
}
void nano::transport::tcp_listener::accept_action (boost::system::error_code const & ec, std::shared_ptr<nano::transport::socket> const & socket_a)
{
if (!node.network.excluded_peers.check (socket_a->remote_endpoint ()))
{
auto server = std::make_shared<nano::transport::tcp_server> (socket_a, node.shared (), true);
nano::lock_guard<nano::mutex> lock{ mutex };
connections[server.get ()] = server;
server->start ();
}
else
{
node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_excluded);
node.logger.debug (nano::log::type::tcp_server, "Rejected connection from excluded peer: {}", nano::util::to_str (socket_a->remote_endpoint ()));
}
}
boost::asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
if (!stopped)
{
return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), acceptor.local_endpoint ().port ());
}
else
{
return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), 0);
}
}
std::unique_ptr<nano::container_info_component> nano::transport::tcp_listener::collect_container_info (std::string const & name)
{
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "connections", connection_count (), sizeof (decltype (connections)::value_type) }));
return composite;
}
/*
* tcp_server
*/


@@ -16,54 +16,6 @@ namespace nano::transport
class message_deserializer;
class tcp_server;
/**
* Server side portion of bootstrap sessions. Listens for new socket connections and spawns tcp_server objects when connected.
*/
class tcp_listener final : public std::enable_shared_from_this<tcp_listener>
{
public:
tcp_listener (uint16_t port, nano::node &, std::size_t max_inbound_connections);
~tcp_listener ();
void start (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback);
void stop ();
void accept_action (boost::system::error_code const &, std::shared_ptr<nano::transport::socket> const &);
std::size_t connection_count ();
nano::tcp_endpoint endpoint () const;
std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name);
private: // Dependencies
nano::node & node;
private:
void on_connection (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a);
void evict_dead_connections ();
void on_connection_requeue_delayed (std::function<bool (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const &)>);
/** Checks whether the maximum number of connections per IP was reached. If so, it returns true. */
bool limit_reached_for_incoming_ip_connections (std::shared_ptr<nano::transport::socket> const & new_connection);
bool limit_reached_for_incoming_subnetwork_connections (std::shared_ptr<nano::transport::socket> const & new_connection);
void cleanup ();
public:
std::atomic<std::size_t> bootstrap_count{ 0 };
std::atomic<std::size_t> realtime_count{ 0 };
private:
std::unordered_map<nano::transport::tcp_server *, std::weak_ptr<nano::transport::tcp_server>> connections;
std::multimap<boost::asio::ip::address, std::weak_ptr<socket>> connections_per_address;
boost::asio::strand<boost::asio::io_context::executor_type> strand;
boost::asio::ip::tcp::acceptor acceptor;
boost::asio::ip::tcp::endpoint local;
std::size_t const max_inbound_connections;
std::atomic<bool> stopped;
mutable nano::mutex mutex;
};
class tcp_server final : public std::enable_shared_from_this<tcp_server>
{
public:


@@ -2,6 +2,7 @@
#include <nano/lib/blocks.hpp>
#include <nano/node/active_transactions.hpp>
#include <nano/node/common.hpp>
#include <nano/node/transport/tcp_listener.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/test_common/system.hpp>
#include <nano/test_common/testutil.hpp>