Close connections without holding locks

This commit is contained in:
Piotr Wójcik 2024-11-21 00:34:59 +01:00
commit 0357518b8b
5 changed files with 104 additions and 61 deletions

View file

@ -6,6 +6,7 @@
#include <boost/lexical_cast.hpp>
#include <deque>
#include <functional>
#include <sstream>
#include <vector>
@ -65,6 +66,28 @@ size_t erase_if (Container & container, Pred pred)
return result;
}
/**
 * Erase elements from container when predicate returns true and return erased elements as a std::deque
 *
 * @param container container to remove matching elements from (modified in place)
 * @param pred unary predicate invoked once per element; a true result removes the element
 * @return the removed elements, preserved in iteration order
 */
template <class Container, class Pred>
std::deque<typename Container::value_type> erase_if_and_collect (Container & container, Pred pred)
{
	std::deque<typename Container::value_type> removed_elements;
	for (auto it = container.begin (); it != container.end ();)
	{
		if (pred (*it))
		{
			// Move instead of copy where the container permits it; for containers with
			// const elements (e.g. sets, multi-index views) this falls back to a copy
			removed_elements.push_back (std::move (*it));
			it = container.erase (it);
		}
		else
		{
			++it;
		}
	}
	return removed_elements;
}
/** Safe narrowing cast which silences warnings and asserts on data loss in debug builds. This is optimized away. */
template <typename TARGET_TYPE, typename SOURCE_TYPE>
constexpr TARGET_TYPE narrow_cast (SOURCE_TYPE const & val)

View file

@ -1,6 +1,8 @@
#include <nano/node/node.hpp>
#include <nano/node/transport/tcp_channels.hpp>
#include <ranges>
/*
* tcp_channels
*/
@ -308,7 +310,7 @@ bool nano::transport::tcp_channels::track_reachout (nano::endpoint const & endpo
void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point cutoff_deadline)
{
nano::lock_guard<nano::mutex> lock{ mutex };
auto channels_l = all_channels ();
auto should_close = [this, cutoff_deadline] (auto const & channel) {
// Remove channels that haven't successfully sent a message within the cutoff time
@ -332,27 +334,35 @@ void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point
return false;
};
for (auto const & entry : channels)
// Close stale channels without holding the mutex
for (auto const & channel : channels_l)
{
if (should_close (entry.channel))
if (should_close (channel))
{
entry.channel->close ();
channel->close ();
}
}
erase_if (channels, [this] (auto const & entry) {
if (!entry.channel->alive ())
{
node.logger.debug (nano::log::type::tcp_channels, "Removing dead channel: {}", entry.channel->to_string ());
entry.channel->close ();
return true; // Erase
}
return false;
});
nano::unique_lock<nano::mutex> lock{ mutex };
// Remove keepalive attempt tracking for attempts older than cutoff
auto attempts_cutoff (attempts.get<last_attempt_tag> ().lower_bound (cutoff_deadline));
attempts.get<last_attempt_tag> ().erase (attempts.get<last_attempt_tag> ().begin (), attempts_cutoff);
// Erase dead channels from list, but close them outside of the lock
auto erased_connections = erase_if_and_collect (channels, [this] (auto const & entry) {
return !entry.channel->alive ();
});
lock.unlock ();
for (auto const & connection : erased_connections)
{
node.stats.inc (nano::stat::type::tcp_channels, nano::stat::detail::erase_dead);
node.logger.debug (nano::log::type::tcp_channels, "Removing dead channel: {}", connection.channel->to_string ());
connection.channel->close ();
}
}
void nano::transport::tcp_channels::keepalive ()
@ -437,6 +447,27 @@ bool nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint)
return node.tcp_listener.connect (endpoint.address (), endpoint.port ());
}
// Take a snapshot of the sockets of all tracked channels while holding the lock,
// so callers can operate on them without keeping the mutex held
auto nano::transport::tcp_channels::all_sockets () const -> std::deque<std::shared_ptr<tcp_socket>>
{
	nano::lock_guard<nano::mutex> lock{ mutex };
	std::deque<std::shared_ptr<tcp_socket>> result;
	for (auto const & entry : channels)
	{
		result.push_back (entry.socket);
	}
	return result;
}
// Take a snapshot of the servers of all tracked channels while holding the lock,
// so callers can operate on them without keeping the mutex held
auto nano::transport::tcp_channels::all_servers () const -> std::deque<std::shared_ptr<tcp_server>>
{
	nano::lock_guard<nano::mutex> lock{ mutex };
	std::deque<std::shared_ptr<tcp_server>> result;
	for (auto const & entry : channels)
	{
		result.push_back (entry.server);
	}
	return result;
}
// Take a snapshot of all tracked channels while holding the lock,
// so callers (e.g. purge) can close them without keeping the mutex held
auto nano::transport::tcp_channels::all_channels () const -> std::deque<std::shared_ptr<tcp_channel>>
{
	nano::lock_guard<nano::mutex> lock{ mutex };
	std::deque<std::shared_ptr<tcp_channel>> result;
	for (auto const & entry : channels)
	{
		result.push_back (entry.channel);
	}
	return result;
}
nano::container_info nano::transport::tcp_channels::container_info () const
{
nano::lock_guard<nano::mutex> guard{ mutex };

View file

@ -62,6 +62,10 @@ public:
// Connection start
bool start_tcp (nano::endpoint const &);
std::deque<std::shared_ptr<tcp_socket>> all_sockets () const;
std::deque<std::shared_ptr<tcp_server>> all_servers () const;
std::deque<std::shared_ptr<tcp_channel>> all_channels () const;
nano::container_info container_info () const;
private: // Dependencies

View file

@ -153,14 +153,8 @@ void nano::transport::tcp_listener::stop ()
for (auto & connection : connections_l)
{
if (auto socket = connection.socket.lock ())
{
socket->close ();
}
if (auto server = connection.server.lock ())
{
server->stop ();
}
connection.socket->close ();
connection.server->stop ();
}
logger.debug (nano::log::type::tcp_listener, "Stopped");
@ -173,35 +167,40 @@ void nano::transport::tcp_listener::run_cleanup ()
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::cleanup);
cleanup ();
timeout ();
purge (lock);
debug_assert (!lock.owns_lock ());
lock.lock ();
condition.wait_for (lock, 1s, [this] () { return stopped.load (); });
}
}
void nano::transport::tcp_listener::cleanup ()
void nano::transport::tcp_listener::purge (nano::unique_lock<nano::mutex> & lock)
{
debug_assert (lock.owns_lock ());
debug_assert (!mutex.try_lock ());
// Erase dead connections
erase_if (connections, [this] (auto const & connection) {
if (connection.socket.expired () && connection.server.expired ())
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::erase_dead);
logger.debug (nano::log::type::tcp_listener, "Evicting dead connection: {}", connection.endpoint);
return true;
}
else
{
return false;
}
});
// Erase completed attempts
erase_if (attempts, [this] (auto const & attempt) {
return attempt.task.ready ();
});
// Erase dead connections
auto erased_connections = erase_if_and_collect (connections, [this] (auto const & connection) {
return !connection.socket->alive ();
});
lock.unlock ();
for (auto const & connection : erased_connections)
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::erase_dead);
logger.debug (nano::log::type::tcp_listener, "Evicting dead connection: {}", connection.endpoint);
connection.socket->close ();
connection.server->stop ();
}
}
void nano::transport::tcp_listener::timeout ()
@ -215,7 +214,7 @@ void nano::transport::tcp_listener::timeout ()
{
if (!attempt.task.ready () && attempt.start < cutoff)
{
attempt.task.cancel ();
attempt.task.cancel (); // Cancel is non-blocking and will return immediately, safe to call under lock
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::attempt_timeout);
logger.debug (nano::log::type::tcp_listener, "Connection attempt timed out: {} (started {}s ago)",
@ -431,7 +430,7 @@ auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket
return { accept_result::accepted, socket, server };
}
auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip, connection_type type) -> accept_result
auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip, connection_type type) const -> accept_result
{
debug_assert (!mutex.try_lock ());
@ -440,8 +439,6 @@ auto nano::transport::tcp_listener::check_limits (asio::ip::address const & ip,
return accept_result::rejected;
}
cleanup ();
if (node.network.excluded_peers.check (ip)) // true => error
{
stats.inc (nano::stat::type::tcp_listener_rejected, nano::stat::detail::excluded, to_stat_dir (type));
@ -526,11 +523,7 @@ size_t nano::transport::tcp_listener::realtime_count () const
nano::lock_guard<nano::mutex> lock{ mutex };
return std::count_if (connections.begin (), connections.end (), [] (auto const & connection) {
if (auto socket = connection.socket.lock ())
{
return socket->is_realtime_connection ();
}
return false;
return connection.socket->type () == nano::transport::socket_type::realtime;
});
}
@ -539,11 +532,7 @@ size_t nano::transport::tcp_listener::bootstrap_count () const
nano::lock_guard<nano::mutex> lock{ mutex };
return std::count_if (connections.begin (), connections.end (), [] (auto const & connection) {
if (auto socket = connection.socket.lock ())
{
return socket->is_bootstrap_connection ();
}
return false;
return connection.socket->type () == nano::transport::socket_type::bootstrap;
});
}
@ -588,18 +577,14 @@ asio::ip::tcp::endpoint nano::transport::tcp_listener::endpoint () const
auto nano::transport::tcp_listener::all_sockets () const -> std::deque<std::shared_ptr<tcp_socket>>
{
nano::lock_guard<nano::mutex> lock{ mutex };
auto r = connections
| std::views::transform ([] (auto const & connection) { return connection.socket.lock (); })
| std::views::filter ([] (auto const & socket) { return socket != nullptr; });
auto r = connections | std::views::transform ([] (auto const & connection) { return connection.socket; });
return { r.begin (), r.end () };
}
auto nano::transport::tcp_listener::all_servers () const -> std::deque<std::shared_ptr<tcp_server>>
{
nano::lock_guard<nano::mutex> lock{ mutex };
auto r = connections
| std::views::transform ([] (auto const & connection) { return connection.server.lock (); })
| std::views::filter ([] (auto const & server) { return server != nullptr; });
auto r = connections | std::views::transform ([] (auto const & connection) { return connection.server; });
return { r.begin (), r.end () };
}

View file

@ -92,7 +92,7 @@ private:
asio::awaitable<void> wait_available_slots () const;
void run_cleanup ();
void cleanup ();
void purge (nano::unique_lock<nano::mutex> &);
void timeout ();
asio::awaitable<void> connect_impl (asio::ip::tcp::endpoint);
@ -106,7 +106,7 @@ private:
};
accept_return accept_one (asio::ip::tcp::socket, connection_type);
accept_result check_limits (asio::ip::address const & ip, connection_type);
accept_result check_limits (asio::ip::address const & ip, connection_type) const;
asio::awaitable<asio::ip::tcp::socket> accept_socket ();
size_t count_per_type (connection_type) const;
@ -119,8 +119,8 @@ private:
{
connection_type type;
asio::ip::tcp::endpoint endpoint;
std::weak_ptr<tcp_socket> socket;
std::weak_ptr<tcp_server> server;
std::shared_ptr<tcp_socket> socket;
std::shared_ptr<tcp_server> server;
asio::ip::address address () const
{