Reorganize tcp_listener

This commit is contained in:
Piotr Wójcik 2024-03-19 11:00:11 +01:00
commit 970b048052
4 changed files with 60 additions and 48 deletions

View file

@@ -698,10 +698,7 @@ TEST (tcp_listener, DISABLED_tcp_listener_timeout_empty)
system.deadline_set (std::chrono::seconds (6));
while (!disconnected)
{
{
nano::lock_guard<nano::mutex> guard (node0->tcp_listener->mutex);
disconnected = node0->tcp_listener->connections.empty ();
}
disconnected = node0->tcp_listener->connection_count () == 0;
ASSERT_NO_ERROR (system.poll ());
}
}
@@ -723,18 +720,12 @@ TEST (tcp_listener, tcp_listener_timeout_node_id_handshake)
});
});
ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake) != 0);
{
nano::lock_guard<nano::mutex> guard (node0->tcp_listener->mutex);
ASSERT_EQ (node0->tcp_listener->connections.size (), 1);
}
ASSERT_EQ (node0->tcp_listener->connection_count (), 1);
bool disconnected (false);
system.deadline_set (std::chrono::seconds (20));
while (!disconnected)
{
{
nano::lock_guard<nano::mutex> guard (node0->tcp_listener->mutex);
disconnected = node0->tcp_listener->connections.empty ();
}
disconnected = node0->tcp_listener->connection_count () == 0;
ASSERT_NO_ERROR (system.poll ());
}
}

View file

@@ -535,7 +535,7 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
composite->add_component (collect_container_info (node.ledger, "ledger"));
composite->add_component (collect_container_info (node.active, "active"));
composite->add_component (collect_container_info (node.bootstrap_initiator, "bootstrap_initiator"));
composite->add_component (collect_container_info (*node.tcp_listener, "tcp_listener"));
composite->add_component (node.tcp_listener->collect_container_info ("tcp_listener"));
composite->add_component (collect_container_info (node.network, "network"));
composite->add_component (node.telemetry.collect_container_info ("telemetry"));
composite->add_component (collect_container_info (node.workers, "workers"));

View file

@@ -42,10 +42,15 @@ nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_
{
}
// Destructor: verifies (in debug builds only; debug_assert is compiled out of
// release builds) that stop () was called before the listener is destroyed —
// stop () is what sets `stopped`, so a failing assertion here points at a
// lifecycle bug in the owning node.
nano::transport::tcp_listener::~tcp_listener ()
{
debug_assert (stopped);
}
void nano::transport::tcp_listener::start (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a)
{
nano::lock_guard<nano::mutex> lock{ mutex };
on = true;
acceptor.open (local.protocol ());
acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true));
boost::system::error_code ec;
@@ -68,12 +73,14 @@ void nano::transport::tcp_listener::stop ()
decltype (connections) connections_l;
{
nano::lock_guard<nano::mutex> lock{ mutex };
on = false;
stopped = true;
connections_l.swap (connections);
}
nano::lock_guard<nano::mutex> lock{ mutex };
boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l = shared_from_this ()] () {
boost::asio::dispatch (strand, [this_l = shared_from_this ()] () {
this_l->acceptor.close ();
for (auto & address_connection_pair : this_l->connections_per_address)
{
if (auto connection_l = address_connection_pair.second.lock ())
@@ -82,15 +89,25 @@ void nano::transport::tcp_listener::stop ()
}
}
this_l->connections_per_address.clear ();
}));
});
}
// Returns the number of tracked server connections, after first pruning
// entries in `connections` whose weak_ptr has expired, so servers that have
// already been destroyed are not counted.
// Thread-safe: holds `mutex` for the whole call — cleanup () requires the
// mutex to be held by the caller.
std::size_t nano::transport::tcp_listener::connection_count ()
{
nano::lock_guard<nano::mutex> lock{ mutex };
cleanup (); // drop expired weak_ptr entries before taking the size
return connections.size ();
}
// Erases from `connections` every entry whose weak_ptr<tcp_server> has
// expired (i.e. the server object has been destroyed).
// Precondition: the caller must already hold `mutex`.
void nano::transport::tcp_listener::cleanup ()
{
// Debug-only lock-held check: try_lock () failing implies the mutex is
// currently locked. NOTE(review): this is an approximate check — it also
// passes if another thread holds the lock, and a *successful* try_lock
// here would leave the mutex locked; tolerable only because the assertion
// aborts in that case and the expression is not evaluated in release
// builds — confirm debug_assert's release-build behavior.
debug_assert (!mutex.try_lock ());
erase_if (connections, [] (auto const & connection) {
return connection.second.expired ();
});
}
bool nano::transport::tcp_listener::limit_reached_for_incoming_subnetwork_connections (std::shared_ptr<nano::transport::socket> const & new_connection)
{
debug_assert (strand.running_in_this_thread ());
@@ -253,10 +270,10 @@ void nano::transport::tcp_listener::accept_action (boost::system::error_code con
}
}
boost::asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint ()
boost::asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
if (on)
if (!stopped)
{
return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), acceptor.local_endpoint ().port ());
}
@@ -266,11 +283,10 @@ boost::asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint ()
}
}
std::unique_ptr<nano::container_info_component> nano::transport::collect_container_info (nano::transport::tcp_listener & bootstrap_listener, std::string const & name)
std::unique_ptr<nano::container_info_component> nano::transport::tcp_listener::collect_container_info (std::string const & name)
{
auto sizeof_element = sizeof (decltype (bootstrap_listener.connections)::value_type);
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "connections", bootstrap_listener.connection_count (), sizeof_element }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "connections", connection_count (), sizeof (decltype (connections)::value_type) }));
return composite;
}
@@ -321,9 +337,6 @@ nano::transport::tcp_server::~tcp_server ()
}
stop ();
nano::lock_guard<nano::mutex> lock{ node->tcp_listener->mutex };
node->tcp_listener->connections.erase (this);
}
void nano::transport::tcp_server::start ()
@@ -840,10 +853,6 @@ void nano::transport::tcp_server::timeout ()
{
node->logger.debug (nano::log::type::tcp_server, "Closing TCP server due to timeout ({})", fmt::streamed (remote_endpoint));
{
nano::lock_guard<nano::mutex> lock{ node->tcp_listener->mutex };
node->tcp_listener->connections.erase (this);
}
socket->close ();
}
}

View file

@@ -19,38 +19,50 @@ class tcp_server;
/**
* Server side portion of bootstrap sessions. Listens for new socket connections and spawns tcp_server objects when connected.
*/
class tcp_listener final : public std::enable_shared_from_this<nano::transport::tcp_listener>
class tcp_listener final : public std::enable_shared_from_this<tcp_listener>
{
public:
tcp_listener (uint16_t, nano::node &, std::size_t);
void start (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a);
void stop ();
void accept_action (boost::system::error_code const &, std::shared_ptr<nano::transport::socket> const &);
std::size_t connection_count ();
tcp_listener (uint16_t port, nano::node &, std::size_t max_inbound_connections);
~tcp_listener ();
nano::mutex mutex;
std::unordered_map<nano::transport::tcp_server *, std::weak_ptr<nano::transport::tcp_server>> connections;
nano::tcp_endpoint endpoint ();
void start (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback);
void stop ();
void accept_action (boost::system::error_code const &, std::shared_ptr<nano::transport::socket> const &);
std::size_t connection_count ();
nano::tcp_endpoint endpoint () const;
std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name);
private: // Dependencies
nano::node & node;
bool on{ false };
std::atomic<std::size_t> bootstrap_count{ 0 };
std::atomic<std::size_t> realtime_count{ 0 };
private:
boost::asio::strand<boost::asio::io_context::executor_type> strand;
nano::transport::address_socket_mmap connections_per_address;
boost::asio::ip::tcp::acceptor acceptor;
boost::asio::ip::tcp::endpoint local;
std::size_t max_inbound_connections;
void on_connection (std::function<bool (std::shared_ptr<nano::transport::socket> const &, boost::system::error_code const &)> callback_a);
void evict_dead_connections ();
void on_connection_requeue_delayed (std::function<bool (std::shared_ptr<nano::transport::socket> const & new_connection, boost::system::error_code const &)>);
/** Checks whether the maximum number of connections per IP was reached. If so, it returns true. */
bool limit_reached_for_incoming_ip_connections (std::shared_ptr<nano::transport::socket> const & new_connection);
bool limit_reached_for_incoming_subnetwork_connections (std::shared_ptr<nano::transport::socket> const & new_connection);
};
void cleanup ();
std::unique_ptr<container_info_component> collect_container_info (tcp_listener & bootstrap_listener, std::string const & name);
public:
std::atomic<std::size_t> bootstrap_count{ 0 };
std::atomic<std::size_t> realtime_count{ 0 };
private:
std::unordered_map<nano::transport::tcp_server *, std::weak_ptr<nano::transport::tcp_server>> connections;
std::multimap<boost::asio::ip::address, std::weak_ptr<socket>> connections_per_address;
boost::asio::strand<boost::asio::io_context::executor_type> strand;
boost::asio::ip::tcp::acceptor acceptor;
boost::asio::ip::tcp::endpoint local;
std::size_t const max_inbound_connections;
std::atomic<bool> stopped;
mutable nano::mutex mutex;
};
class tcp_server final : public std::enable_shared_from_this<tcp_server>
{