Merge pull request #4919 from pwojcikdev/tcp-listener-handshake-timeout

Node handshake timeout
This commit is contained in:
Piotr Wójcik 2025-07-12 11:25:14 +02:00 committed by GitHub
commit a7610bd084
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 162 additions and 145 deletions

View file

@ -1083,8 +1083,8 @@ TEST (network, purge_dead_channel)
auto channel = channels.front ();
ASSERT_TRUE (channel);
auto sockets = node1.tcp_listener.sockets ();
ASSERT_EQ (sockets.size (), 1);
std::deque<std::shared_ptr<nano::transport::tcp_socket>> sockets;
ASSERT_TIMELY_EQ (5s, (sockets = node1.tcp_listener.all_sockets ()).size (), 1);
auto socket = sockets.front ();
ASSERT_TRUE (socket);
@ -1133,8 +1133,8 @@ TEST (network, purge_dead_channel_remote)
auto channel = channels.front ();
ASSERT_TRUE (channel);
auto sockets = node1.tcp_listener.sockets ();
ASSERT_EQ (sockets.size (), 1);
std::deque<std::shared_ptr<nano::transport::tcp_socket>> sockets;
ASSERT_TIMELY_EQ (5s, (sockets = node1.tcp_listener.all_sockets ()).size (), 1);
auto socket = sockets.front ();
ASSERT_TRUE (socket);

View file

@ -25,14 +25,13 @@ TEST (tcp_listener, max_connections)
node_config.tcp.max_inbound_connections = 2;
auto node = system.add_node (node_config, node_flags);
// client side connection tracking
std::atomic<size_t> connection_attempts = 0;
auto connect_handler = [&connection_attempts] (boost::system::error_code const & ec_a) {
ASSERT_EQ (ec_a.value (), 0);
++connection_attempts;
};
// start 3 clients, 2 will persist but 1 will be dropped
// Start 3 clients, 2 should connect successfully
auto client1 = std::make_shared<nano::transport::tcp_socket> (*node);
client1->async_connect (node->network.endpoint (), connect_handler);
@ -45,46 +44,8 @@ TEST (tcp_listener, max_connections)
ASSERT_TIMELY_EQ (5s, node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success), 2);
ASSERT_ALWAYS_EQ (1s, node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success), 2);
ASSERT_TIMELY_EQ (5s, connection_attempts, 3);
// create space for one socket and fill the connections table again
{
auto sockets1 = node->tcp_listener.sockets ();
ASSERT_EQ (sockets1.size (), 2);
sockets1[0]->close ();
}
ASSERT_TIMELY_EQ (10s, node->tcp_listener.sockets ().size (), 1);
auto client4 = std::make_shared<nano::transport::tcp_socket> (*node);
client4->async_connect (node->network.endpoint (), connect_handler);
auto client5 = std::make_shared<nano::transport::tcp_socket> (*node);
client5->async_connect (node->network.endpoint (), connect_handler);
ASSERT_TIMELY_EQ (5s, node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success), 3);
ASSERT_ALWAYS_EQ (1s, node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success), 3);
ASSERT_TIMELY_EQ (5s, connection_attempts, 5);
// close all existing sockets and fill the connections table again
{
auto sockets2 = node->tcp_listener.sockets ();
ASSERT_EQ (sockets2.size (), 2);
sockets2[0]->close ();
sockets2[1]->close ();
}
ASSERT_TIMELY_EQ (10s, node->tcp_listener.sockets ().size (), 0);
auto client6 = std::make_shared<nano::transport::tcp_socket> (*node);
client6->async_connect (node->network.endpoint (), connect_handler);
auto client7 = std::make_shared<nano::transport::tcp_socket> (*node);
client7->async_connect (node->network.endpoint (), connect_handler);
auto client8 = std::make_shared<nano::transport::tcp_socket> (*node);
client8->async_connect (node->network.endpoint (), connect_handler);
ASSERT_TIMELY_EQ (5s, node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success), 5);
ASSERT_ALWAYS_EQ (1s, node->stats.count (nano::stat::type::tcp_listener, nano::stat::detail::accept_success), 5);
ASSERT_TIMELY_EQ (5s, connection_attempts, 8); // connections initiated by the client
ASSERT_TIMELY_EQ (5s, node->tcp_listener.all_sockets ().size (), 2);
ASSERT_ALWAYS_EQ (1s, node->tcp_listener.all_sockets ().size (), 2);
}
TEST (tcp_listener, max_connections_per_ip)
@ -206,7 +167,7 @@ TEST (tcp_listener, max_peers_per_ip)
ASSERT_TIMELY_EQ (5s, connection_attempts, max_ip_connections + 1);
}
TEST (tcp_listener, tcp_node_id_handshake)
TEST (tcp_listener, node_id_handshake)
{
nano::test::system system (1);
auto socket (std::make_shared<nano::transport::tcp_socket> (*system.nodes[0]));
@ -240,33 +201,26 @@ TEST (tcp_listener, tcp_node_id_handshake)
ASSERT_TIMELY (5s, done);
}
// Test disabled because it's failing intermittently.
// PR in which it got disabled: https://github.com/nanocurrency/nano-node/pull/3611
// Issue for investigating it: https://github.com/nanocurrency/nano-node/issues/3615
TEST (tcp_listener, DISABLED_tcp_listener_timeout_empty)
TEST (tcp_listener, timeout_empty)
{
nano::test::system system (1);
auto node0 (system.nodes[0]);
nano::test::system system;
nano::node_config config;
config.tcp.handshake_timeout = 2s;
auto node0 = system.add_node (config);
auto socket (std::make_shared<nano::transport::tcp_socket> (*node0));
std::atomic<bool> connected (false);
socket->async_connect (node0->tcp_listener.endpoint (), [&connected] (boost::system::error_code const & ec) {
socket->async_connect (node0->tcp_listener.endpoint (), [] (boost::system::error_code const & ec) {
ASSERT_FALSE (ec);
connected = true;
});
ASSERT_TIMELY (5s, connected);
bool disconnected (false);
system.deadline_set (std::chrono::seconds (6));
while (!disconnected)
{
disconnected = node0->tcp_listener.connection_count () == 0;
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TIMELY_EQ (5s, node0->tcp_listener.connection_count (), 1);
ASSERT_TIMELY_EQ (10s, node0->tcp_listener.connection_count (), 0);
}
TEST (tcp_listener, tcp_listener_timeout_node_id_handshake)
TEST (tcp_listener, timeout_node_id_handshake)
{
nano::test::system system (1);
auto node0 (system.nodes[0]);
nano::test::system system;
nano::node_config config;
config.tcp.handshake_timeout = 2s;
auto node0 = system.add_node (config);
auto socket (std::make_shared<nano::transport::tcp_socket> (*node0));
auto cookie (node0->network.syn_cookies.assign (nano::transport::map_tcp_to_endpoint (node0->tcp_listener.endpoint ())));
ASSERT_TRUE (cookie);
@ -280,12 +234,6 @@ TEST (tcp_listener, tcp_listener_timeout_node_id_handshake)
});
});
ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake) != 0);
ASSERT_EQ (node0->tcp_listener.connection_count (), 1);
bool disconnected (false);
system.deadline_set (std::chrono::seconds (20));
while (!disconnected)
{
disconnected = node0->tcp_listener.connection_count () == 0;
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TIMELY_EQ (5s, node0->tcp_listener.connection_count (), 1);
ASSERT_TIMELY_EQ (10s, node0->tcp_listener.connection_count (), 0);
}

View file

@ -383,6 +383,7 @@ enum class detail
connect_rejected,
connect_success,
attempt_timeout,
handshake_timeout,
not_a_peer,
// tcp_channel

View file

@ -6,6 +6,7 @@
#include <boost/lexical_cast.hpp>
#include <deque>
#include <functional>
#include <sstream>
#include <utility>
#include <vector>
@ -65,6 +66,28 @@ size_t erase_if (Container & container, Pred pred)
return result;
}
/**
 * Erase elements from container when predicate returns true and return erased elements as a std::deque
 *
 * Elements are handed to the predicate in iteration order and the returned deque preserves that order.
 * Each matching element is moved into the result right before it is erased, so containers of move-only
 * types are supported; containers that only expose const elements (sets, multi-index containers)
 * transparently fall back to copying.
 *
 * @param container Container to filter in place; `container.erase (iterator)` must return the iterator following the erased element
 * @param pred Unary predicate invoked with a reference to each element, returns true for elements that should be removed
 * @return Removed elements, empty if nothing matched
 */
template <class Container, class Pred>
std::deque<typename Container::value_type> erase_if_and_collect (Container & container, Pred pred)
{
	std::deque<typename Container::value_type> removed_elements;
	for (auto it = container.begin (); it != container.end ();)
	{
		if (pred (*it))
		{
			// The element is erased on the next line, so its contents can be safely stolen instead of copied
			removed_elements.push_back (std::move (*it));
			it = container.erase (it);
		}
		else
		{
			++it;
		}
	}
	return removed_elements;
}
/** Safe narrowing cast which silences warnings and asserts on data loss in debug builds. This is optimized away. */
template <typename TARGET_TYPE, typename SOURCE_TYPE>
constexpr TARGET_TYPE narrow_cast (SOURCE_TYPE const & val)

View file

@ -1,6 +1,8 @@
#include <nano/node/node.hpp>
#include <nano/node/transport/tcp_channels.hpp>
#include <ranges>
/*
* tcp_channels
*/
@ -308,7 +310,7 @@ bool nano::transport::tcp_channels::track_reachout (nano::endpoint const & endpo
void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point cutoff_deadline)
{
nano::lock_guard<nano::mutex> lock{ mutex };
auto channels_l = all_channels ();
auto should_close = [this, cutoff_deadline] (auto const & channel) {
// Remove channels that haven't successfully sent a message within the cutoff time
@ -332,27 +334,35 @@ void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point
return false;
};
for (auto const & entry : channels)
// Close stale channels without holding the mutex
for (auto const & channel : channels_l)
{
if (should_close (entry.channel))
if (should_close (channel))
{
entry.channel->close ();
channel->close ();
}
}
erase_if (channels, [this] (auto const & entry) {
if (!entry.channel->alive ())
{
node.logger.debug (nano::log::type::tcp_channels, "Removing dead channel: {}", entry.channel->to_string ());
entry.channel->close ();
return true; // Erase
}
return false;
});
nano::unique_lock<nano::mutex> lock{ mutex };
// Remove keepalive attempt tracking for attempts older than cutoff
auto attempts_cutoff (attempts.get<last_attempt_tag> ().lower_bound (cutoff_deadline));
attempts.get<last_attempt_tag> ().erase (attempts.get<last_attempt_tag> ().begin (), attempts_cutoff);
// Erase dead channels from list, but close them outside of the lock
auto erased_connections = erase_if_and_collect (channels, [this] (auto const & entry) {
return !entry.channel->alive ();
});
lock.unlock ();
for (auto const & connection : erased_connections)
{
node.stats.inc (nano::stat::type::tcp_channels, nano::stat::detail::erase_dead);
node.logger.debug (nano::log::type::tcp_channels, "Removing dead channel: {}", connection.channel->to_string ());
connection.channel->close ();
}
}
void nano::transport::tcp_channels::keepalive ()
@ -437,6 +447,27 @@ bool nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint)
return node.tcp_listener.connect (endpoint.address (), endpoint.port ());
}
/** Returns a snapshot of the socket pointer stored in every tracked channel entry, taken while holding the mutex */
auto nano::transport::tcp_channels::all_sockets () const -> std::deque<std::shared_ptr<tcp_socket>>
{
	nano::lock_guard<nano::mutex> guard{ mutex };
	std::deque<std::shared_ptr<tcp_socket>> snapshot;
	for (auto const & entry : channels)
	{
		snapshot.push_back (entry.socket);
	}
	return snapshot;
}
/** Returns a snapshot of the server pointer stored in every tracked channel entry, taken while holding the mutex */
auto nano::transport::tcp_channels::all_servers () const -> std::deque<std::shared_ptr<tcp_server>>
{
	nano::lock_guard<nano::mutex> guard{ mutex };
	std::deque<std::shared_ptr<tcp_server>> snapshot;
	for (auto const & entry : channels)
	{
		snapshot.push_back (entry.server);
	}
	return snapshot;
}
/** Returns a snapshot of the channel pointer stored in every tracked channel entry, taken while holding the mutex */
auto nano::transport::tcp_channels::all_channels () const -> std::deque<std::shared_ptr<tcp_channel>>
{
	nano::lock_guard<nano::mutex> guard{ mutex };
	std::deque<std::shared_ptr<tcp_channel>> snapshot;
	for (auto const & entry : channels)
	{
		snapshot.push_back (entry.channel);
	}
	return snapshot;
}
nano::container_info nano::transport::tcp_channels::container_info () const
{
nano::lock_guard<nano::mutex> guard{ mutex };

View file

@ -62,6 +62,10 @@ public:
// Connection start
bool start_tcp (nano::endpoint const &);
std::deque<std::shared_ptr<tcp_socket>> all_sockets () const;
std::deque<std::shared_ptr<tcp_server>> all_servers () const;
std::deque<std::shared_ptr<tcp_channel>> all_channels () const;
nano::container_info container_info () const;
private: // Dependencies

View file

@ -153,14 +153,8 @@ void nano::transport::tcp_listener::stop ()
for (auto & connection : connections_l)
{
if (auto socket = connection.socket.lock ())
{
socket->close ();
}
if (auto server = connection.server.lock ())
{
server->stop ();
}
connection.socket->close ();
connection.server->stop ();
}
logger.debug (nano::log::type::tcp_listener, "Stopped");
@ -173,53 +167,75 @@ void nano::transport::tcp_listener::run_cleanup ()
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::cleanup);
cleanup ();
timeout ();
purge (lock);
debug_assert (!lock.owns_lock ());
lock.lock ();
condition.wait_for (lock, 1s, [this] () { return stopped.load (); });
}
}
void nano::transport::tcp_listener::cleanup ()
void nano::transport::tcp_listener::purge (nano::unique_lock<nano::mutex> & lock)
{
debug_assert (lock.owns_lock ());
debug_assert (!mutex.try_lock ());
// Erase dead connections
erase_if (connections, [this] (auto const & connection) {
if (connection.socket.expired () && connection.server.expired ())
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::erase_dead);
logger.debug (nano::log::type::tcp_listener, "Evicting dead connection: {}", connection.endpoint);
return true;
}
else
{
return false;
}
});
// Erase completed attempts
erase_if (attempts, [this] (auto const & attempt) {
return attempt.task.ready ();
});
// Erase dead connections
auto erased_connections = erase_if_and_collect (connections, [this] (auto const & connection) {
return !connection.socket->alive ();
});
lock.unlock ();
for (auto const & connection : erased_connections)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::erase_dead);
logger.debug (nano::log::type::tcp_listener, "Evicting dead connection: {}", connection.endpoint);
connection.socket->close ();
connection.server->stop ();
}
}
void nano::transport::tcp_listener::timeout ()
{
debug_assert (!mutex.try_lock ());
auto const cutoff = std::chrono::steady_clock::now () - config.connect_timeout;
auto const now = std::chrono::steady_clock::now ();
auto const connect_cutoff = now - config.connect_timeout;
auto const handshake_cutoff = now - config.handshake_timeout;
// Cancel timed out attempts
for (auto & attempt : attempts)
{
if (!attempt.task.ready () && attempt.start < cutoff)
if (!attempt.task.ready () && attempt.start < connect_cutoff)
{
attempt.task.cancel ();
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::attempt_timeout);
logger.debug (nano::log::type::tcp_listener, "Connection attempt timed out: {} (started {}s ago)",
attempt.endpoint, nano::log::seconds_delta (attempt.start));
attempt.endpoint,
nano::log::seconds_delta (attempt.start));
attempt.task.cancel (); // Cancel is non-blocking and will return immediately, safe to call under lock
}
}
// Cancel too long handshakes
for (auto & connection : connections)
{
if (connection.socket->type () == nano::transport::socket_type::undefined && connection.socket->get_time_connected () < handshake_cutoff)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::handshake_timeout);
logger.debug (nano::log::type::tcp_listener, "Handshake timed out: {} (connected {}s ago)",
connection.endpoint,
nano::log::seconds_delta (connection.socket->get_time_connected ()));
connection.socket->close (); // Schedule socket close, this is non-blocking, safe to call under lock
}
}
}
@ -390,7 +406,7 @@ auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket
if (auto result = check_limits (remote_endpoint.address (), type); result != accept_result::accepted)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_rejected, to_stat_dir (type));
logger.debug (nano::log::type::tcp_listener, "Rejected connection from: {} reason: {} ({})",
logger.debug (nano::log::type::tcp_listener, "Rejected connection: {} reason: {} ({})",
remote_endpoint,
to_string (result),
to_string (type));
@ -431,7 +447,7 @@ auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket
return { accept_result::accepted, socket, server };
}
auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip, connection_type type) -> accept_result
auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip, connection_type type) const -> accept_result
{
debug_assert (!mutex.try_lock ());
@ -440,8 +456,6 @@ auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip,
return accept_result::rejected;
}
cleanup ();
if (node.network.excluded_peers.check (ip)) // true => error
{
stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::excluded, to_stat_dir (type));
@ -526,11 +540,7 @@ size_t nano::transport::tcp_listener::realtime_count () const
nano::lock_guard<nano::mutex> lock{ mutex };
return std::count_if (connections.begin (), connections.end (), [] (auto const & connection) {
if (auto socket = connection.socket.lock ())
{
return socket->is_realtime_connection ();
}
return false;
return connection.socket->type () == nano::transport::socket_type::realtime;
});
}
@ -539,11 +549,7 @@ size_t nano::transport::tcp_listener::bootstrap_count () const
nano::lock_guard<nano::mutex> lock{ mutex };
return std::count_if (connections.begin (), connections.end (), [] (auto const & connection) {
if (auto socket = connection.socket.lock ())
{
return socket->is_bootstrap_connection ();
}
return false;
return connection.socket->type () == nano::transport::socket_type::bootstrap;
});
}
@ -585,21 +591,17 @@ asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () const
return { asio::ip::address_v6::loopback (), local.port () };
}
auto nano::transport::tcp_listener::sockets () const -> std::vector<std::shared_ptr<tcp_socket>>
auto nano::transport::tcp_listener::all_sockets () const -> std::deque<std::shared_ptr<tcp_socket>>
{
nano::lock_guard<nano::mutex> lock{ mutex };
auto r = connections
| std::views::transform ([] (auto const & connection) { return connection.socket.lock (); })
| std::views::filter ([] (auto const & socket) { return socket != nullptr; });
auto r = connections | std::views::transform ([] (auto const & connection) { return connection.socket; });
return { r.begin (), r.end () };
}
auto nano::transport::tcp_listener::servers () const -> std::vector<std::shared_ptr<tcp_server>>
auto nano::transport::tcp_listener::all_servers () const -> std::deque<std::shared_ptr<tcp_server>>
{
nano::lock_guard<nano::mutex> lock{ mutex };
auto r = connections
| std::views::transform ([] (auto const & connection) { return connection.server.lock (); })
| std::views::filter ([] (auto const & server) { return server != nullptr; });
auto r = connections | std::views::transform ([] (auto const & connection) { return connection.server; });
return { r.begin (), r.end () };
}

View file

@ -71,8 +71,8 @@ public:
size_t realtime_count () const;
size_t bootstrap_count () const;
std::vector<std::shared_ptr<tcp_socket>> sockets () const;
std::vector<std::shared_ptr<tcp_server>> servers () const;
std::deque<std::shared_ptr<tcp_socket>> all_sockets () const;
std::deque<std::shared_ptr<tcp_server>> all_servers () const;
nano::container_info container_info () const;
@ -92,7 +92,7 @@ private:
asio::awaitable<void> wait_available_slots () const;
void run_cleanup ();
void cleanup ();
void purge (nano::unique_lock<nano::mutex> &);
void timeout ();
asio::awaitable<void> connect_impl (asio::ip::tcp::endpoint);
@ -106,7 +106,7 @@ private:
};
accept_return accept_one (asio::ip::tcp::socket, connection_type);
accept_result check_limits (asio::ip::address const & ip, connection_type);
accept_result check_limits (asio::ip::address const & ip, connection_type) const;
asio::awaitable<asio::ip::tcp::socket> accept_socket ();
size_t count_per_type (connection_type) const;
@ -119,8 +119,8 @@ private:
{
connection_type type;
asio::ip::tcp::endpoint endpoint;
std::weak_ptr<tcp_socket> socket;
std::weak_ptr<tcp_server> server;
std::shared_ptr<tcp_socket> socket;
std::shared_ptr<tcp_server> server;
asio::ip::address address () const
{

View file

@ -36,6 +36,7 @@ nano::transport::tcp_socket::tcp_socket (nano::node & node_a, boost::asio::ip::t
default_timeout{ node_a.config.tcp_io_timeout },
silent_connection_tolerance_time{ node_a.network_params.network.silent_connection_tolerance_time }
{
time_connected = std::chrono::steady_clock::now ();
}
nano::transport::tcp_socket::~tcp_socket ()
@ -77,6 +78,7 @@ void nano::transport::tcp_socket::async_connect (nano::tcp_endpoint const & endp
}
else
{
this_l->time_connected = std::chrono::steady_clock::now ();
this_l->set_last_completion ();
{
// Best effort attempt to get endpoint address

View file

@ -140,6 +140,10 @@ public:
{
return !is_closed ();
}
/** Returns the time point currently stored in `time_connected`, read with an atomic load */
std::chrono::steady_clock::time_point get_time_connected () const
{
	auto const connected_at = time_connected.load ();
	return connected_at;
}
private:
size_t const queue_size;
@ -188,6 +192,8 @@ protected:
/** Updated only from strand, but stored as atomic so it can be read from outside */
std::atomic<bool> write_in_progress{ false };
std::atomic<std::chrono::steady_clock::time_point> time_connected;
void close_internal ();
void write_queued_messages ();
void set_default_timeout ();