Improved removal of dead network channels (#3993)

This commit is contained in:
Piotr Wójcik 2022-11-10 18:06:37 +01:00 committed by GitHub
commit eb8c1aaff9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 257 additions and 41 deletions

View file

@ -1,4 +1,6 @@
#include <nano/node/network.hpp>
#include <nano/node/nodeconfig.hpp>
#include <nano/node/socket.hpp>
#include <nano/node/transport/inproc.hpp>
#include <nano/node/transport/udp.hpp>
#include <nano/test_common/network.hpp>
@ -1412,3 +1414,147 @@ TEST (network, fill_keepalive_self)
system.nodes[0]->network.fill_keepalive_self (target);
ASSERT_TRUE (target[2].port () == system.nodes[1]->network.port);
}
/*
* Tests that channel and channel container removes channels with dead local sockets
*/
TEST (network, purge_dead_channel_outgoing)
{
nano::test::system system{};
nano::node_flags flags;
// Disable non realtime sockets
flags.disable_bootstrap_bulk_push_client = true;
flags.disable_bootstrap_bulk_pull_server = true;
flags.disable_bootstrap_listener = true;
flags.disable_lazy_bootstrap = true;
flags.disable_legacy_bootstrap = true;
flags.disable_wallet_bootstrap = true;
auto & node1 = *system.add_node (flags);
// We expect one incoming and one outgoing connection
std::shared_ptr<nano::socket> outgoing;
std::shared_ptr<nano::socket> incoming;
std::atomic<int> connected_count{ 0 };
node1.observers.socket_connected.add ([&] (nano::socket & socket) {
connected_count++;
outgoing = socket.shared_from_this ();
std::cout << "connected: " << socket.remote_endpoint () << std::endl;
});
std::atomic<int> accepted_count{ 0 };
node1.observers.socket_accepted.add ([&] (nano::socket & socket) {
accepted_count++;
incoming = socket.shared_from_this ();
std::cout << "accepted: " << socket.remote_endpoint () << std::endl;
});
auto & node2 = *system.add_node (flags);
ASSERT_TIMELY_EQ (5s, connected_count, 1);
ASSERT_ALWAYS_EQ (1s, connected_count, 1);
ASSERT_TIMELY_EQ (5s, accepted_count, 1);
ASSERT_ALWAYS_EQ (1s, accepted_count, 1);
ASSERT_EQ (node1.network.size (), 1);
ASSERT_ALWAYS_EQ (1s, node1.network.size (), 1);
// Store reference to the only channel
auto channels = node1.network.list ();
ASSERT_EQ (channels.size (), 1);
auto channel = channels.front ();
ASSERT_TRUE (channel);
// When socket is dead ensure channel knows about that
ASSERT_TRUE (channel->alive ());
outgoing->close ();
ASSERT_TIMELY (5s, !channel->alive ());
// Shortly after that a new channel should be established
ASSERT_TIMELY_EQ (5s, connected_count, 2);
ASSERT_ALWAYS_EQ (1s, connected_count, 2);
// Check that a new channel is healthy
auto channels2 = node1.network.list ();
ASSERT_EQ (channels2.size (), 1);
auto channel2 = channels2.front ();
ASSERT_TRUE (channel2);
ASSERT_TRUE (channel2->alive ());
}
/*
* Tests that channel and channel container removes channels with dead remote sockets
*/
TEST (network, purge_dead_channel_incoming)
{
nano::test::system system{};
nano::node_flags flags;
// Disable non realtime sockets
flags.disable_bootstrap_bulk_push_client = true;
flags.disable_bootstrap_bulk_pull_server = true;
flags.disable_bootstrap_listener = true;
flags.disable_lazy_bootstrap = true;
flags.disable_legacy_bootstrap = true;
flags.disable_wallet_bootstrap = true;
auto & node1 = *system.add_node (flags);
// We expect one incoming and one outgoing connection
std::shared_ptr<nano::socket> outgoing;
std::shared_ptr<nano::socket> incoming;
std::atomic<int> connected_count{ 0 };
node1.observers.socket_connected.add ([&] (nano::socket & socket) {
connected_count++;
outgoing = socket.shared_from_this ();
std::cout << "connected: " << socket.remote_endpoint () << std::endl;
});
std::atomic<int> accepted_count{ 0 };
node1.observers.socket_accepted.add ([&] (nano::socket & socket) {
accepted_count++;
incoming = socket.shared_from_this ();
std::cout << "accepted: " << socket.remote_endpoint () << std::endl;
});
auto & node2 = *system.add_node (flags);
ASSERT_TIMELY_EQ (5s, connected_count, 1);
ASSERT_ALWAYS_EQ (1s, connected_count, 1);
ASSERT_TIMELY_EQ (5s, accepted_count, 1);
ASSERT_ALWAYS_EQ (1s, accepted_count, 1);
ASSERT_EQ (node2.network.size (), 1);
ASSERT_ALWAYS_EQ (1s, node2.network.size (), 1);
// Store reference to the only channel
auto channels = node2.network.list ();
ASSERT_EQ (channels.size (), 1);
auto channel = channels.front ();
ASSERT_TRUE (channel);
// When remote socket is dead ensure channel knows about that
ASSERT_TRUE (channel->alive ());
incoming->close ();
ASSERT_TIMELY (5s, !channel->alive ());
// Shortly after that a new channel should be established
ASSERT_TIMELY_EQ (5s, accepted_count, 2);
ASSERT_ALWAYS_EQ (1s, accepted_count, 2);
// Check that a new channel is healthy
auto channels2 = node2.network.list ();
ASSERT_EQ (channels2.size (), 1);
auto channel2 = channels2.front ();
ASSERT_TRUE (channel2);
ASSERT_TRUE (channel2->alive ());
}

View file

@ -1851,7 +1851,7 @@ TEST (node, rep_remove)
ASSERT_EQ (*channel_rep1, reps[0].channel_ref ());
// When rep1 disconnects then rep1 should not be found anymore
channel_rep1->disconnect ();
channel_rep1->close ();
ASSERT_TIMELY (5s, searching_node.rep_crawler.representative_count () == 0);
// Add working node for genesis representative
@ -1884,7 +1884,7 @@ TEST (node, rep_remove)
// Now only genesisRep should be found:
reps = searching_node.rep_crawler.representatives (1);
ASSERT_EQ (nano::dev::genesis_key.pub, reps[0].account);
ASSERT_EQ (1, searching_node.network.size ());
ASSERT_TIMELY_EQ (5s, searching_node.network.size (), 1);
auto list (searching_node.network.list (1));
ASSERT_EQ (node_genesis_rep->network.endpoint (), list[0]->get_endpoint ());
}
@ -3202,19 +3202,17 @@ TEST (node, peers)
auto list2 (node2->network.list (2));
ASSERT_EQ (node1->get_node_id (), list2[0]->get_node_id ());
ASSERT_EQ (nano::transport::transport_type::tcp, list2[0]->get_type ());
// Uncontactable peer should not be stored
ASSERT_TIMELY_EQ (5s, store.peer.count (store.tx_begin_read ()), 1);
ASSERT_TRUE (store.peer.exists (store.tx_begin_read (), endpoint_key));
// Stop the peer node and check that it is removed from the store
node1->stop ();
ASSERT_TIMELY (10s, node2->network.size () != 1);
ASSERT_TRUE (node2->network.empty ());
// Uncontactable peer should not be stored
auto transaction (store.tx_begin_read ());
ASSERT_EQ (store.peer.count (transaction), 1);
ASSERT_TRUE (store.peer.exists (transaction, endpoint_key));
node2->stop ();
// TODO: In `tcp_channels::store_all` we skip store operation when there are no peers present,
// so the best we can do here is check if network is empty
ASSERT_TIMELY (10s, node2->network.empty ());
}
TEST (node, peer_cache_restart)

View file

@ -203,6 +203,7 @@ public:
default_websocket_port (47000),
request_interval_ms (500),
cleanup_period (default_cleanup_period),
keepalive_period (std::chrono::seconds (15)),
idle_timeout (default_cleanup_period * 2),
silent_connection_tolerance_time (std::chrono::seconds (120)),
syn_cookie_cutoff (std::chrono::seconds (5)),
@ -237,6 +238,7 @@ public:
{
request_interval_ms = 20;
cleanup_period = std::chrono::seconds (1);
keepalive_period = std::chrono::seconds (1);
idle_timeout = cleanup_period * 15;
max_peers_per_ip = 20;
max_peers_per_subnetwork = max_peers_per_ip * 4;
@ -267,6 +269,8 @@ public:
{
return cleanup_period * 5;
}
/** How often to send keepalive messages */
std::chrono::seconds keepalive_period;
/** Default maximum idle time for a socket before it's automatically closed */
std::chrono::seconds idle_timeout;
std::chrono::seconds silent_connection_tolerance_time;

View file

@ -624,7 +624,7 @@ std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (std::
tcp_channels.list (result, minimum_version_a, include_tcp_temporary_channels_a);
udp_channels.list (result, minimum_version_a);
nano::random_pool_shuffle (result.begin (), result.end ());
if (result.size () > count_a)
if (count_a > 0 && result.size () > count_a)
{
result.resize (count_a, nullptr);
}
@ -769,7 +769,7 @@ void nano::network::ongoing_cleanup ()
{
cleanup (std::chrono::steady_clock::now () - node.network_params.network.cleanup_cutoff ());
std::weak_ptr<nano::node> node_w (node.shared ());
node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.cleanup_period, [node_w] () {
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (node.network_params.network.is_dev_network () ? 1 : 5), [node_w] () {
if (auto node_l = node_w.lock ())
{
node_l->network.ongoing_cleanup ();
@ -794,7 +794,7 @@ void nano::network::ongoing_keepalive ()
flood_keepalive (0.75f);
flood_keepalive_self (0.25f);
std::weak_ptr<nano::node> node_w (node.shared ());
node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.cleanup_period_half (), [node_w] () {
node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.keepalive_period, [node_w] () {
if (auto node_l = node_w.lock ())
{
node_l->network.ongoing_keepalive ();

View file

@ -22,6 +22,9 @@ public:
nano::observer_set<> disconnect;
nano::observer_set<nano::root const &> work_cancel;
nano::observer_set<nano::telemetry_data const &, nano::endpoint const &> telemetry;
nano::observer_set<nano::socket &> socket_connected;
nano::observer_set<nano::socket &> socket_accepted;
};
std::unique_ptr<container_info_component> collect_container_info (node_observers & node_observers, std::string const & name);

View file

@ -88,8 +88,6 @@ public:
/** Default maximum incoming TCP connections, including realtime network & bootstrap */
unsigned tcp_incoming_connections_max{ 2048 };
bool use_memory_pools{ true };
static std::chrono::seconds constexpr keepalive_period = std::chrono::seconds (60);
static std::chrono::seconds constexpr keepalive_cutoff = keepalive_period * 5;
static std::chrono::minutes constexpr wallet_backup_interval = std::chrono::minutes (5);
/** Default outbound traffic shaping is 10MB/s */
std::size_t bandwidth_limit{ 10 * 1024 * 1024 };

View file

@ -252,14 +252,17 @@ nano::uint128_t nano::rep_crawler::total_weight () const
nano::uint128_t result (0);
for (auto i (probable_reps.get<tag_weight> ().begin ()), n (probable_reps.get<tag_weight> ().end ()); i != n; ++i)
{
auto weight (i->weight.number ());
if (weight > 0)
if (i->channel->alive ())
{
result = result + weight;
}
else
{
break;
auto weight (i->weight.number ());
if (weight > 0)
{
result = result + weight;
}
else
{
break;
}
}
}
return result;
@ -292,7 +295,7 @@ void nano::rep_crawler::cleanup_reps ()
auto iterator (probable_reps.get<tag_last_request> ().begin ());
while (iterator != probable_reps.get<tag_last_request> ().end ())
{
if (iterator->channel->get_tcp_endpoint ().address () != boost::asio::ip::address_v6::any ())
if (iterator->channel->alive ())
{
channels.push_back (iterator->channel);
++iterator;

View file

@ -55,28 +55,35 @@ nano::socket::~socket ()
void nano::socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::function<void (boost::system::error_code const &)> callback_a)
{
debug_assert (callback_a);
debug_assert (endpoint_type () == endpoint_type_t::client);
checkup ();
auto this_l (shared_from_this ());
set_default_timeout ();
this_l->tcp_socket.async_connect (endpoint_a,
boost::asio::bind_executor (this_l->strand,
[this_l, callback = std::move (callback_a), endpoint_a] (boost::system::error_code const & ec) {
if (ec)
{
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_connect_error, nano::stat::dir::in);
this_l->close ();
}
else
{
this_l->set_last_completion ();
}
this_l->remote = endpoint_a;
this_l->node.observers.socket_connected.notify (*this_l);
callback (ec);
}));
}
void nano::socket::async_read (std::shared_ptr<std::vector<uint8_t>> const & buffer_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a)
{
debug_assert (callback_a);
if (size_a <= buffer_a->size ())
{
auto this_l (shared_from_this ());
@ -90,6 +97,7 @@ void nano::socket::async_read (std::shared_ptr<std::vector<uint8_t>> const & buf
if (ec)
{
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_read_error, nano::stat::dir::in);
this_l->close ();
}
else
{
@ -147,6 +155,7 @@ void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std:
if (ec)
{
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in);
this_l->close ();
}
else
{
@ -192,9 +201,16 @@ void nano::socket::set_last_receive_time ()
void nano::socket::checkup ()
{
std::weak_ptr<nano::socket> this_w (shared_from_this ());
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (2), [this_w] () {
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (node.network_params.network.is_dev_network () ? 1 : 5), [this_w] () {
if (auto this_l = this_w.lock ())
{
// If the socket is already dead, close just in case, and stop doing checkups
if (!this_l->alive ())
{
this_l->close ();
return;
}
uint64_t now (nano::seconds_since_epoch ());
auto condition_to_disconnect{ false };
@ -217,13 +233,7 @@ void nano::socket::checkup ()
{
if (this_l->node.config.logging.network_timeout_logging ())
{
// The remote end may have closed the connection before this side timing out, in which case the remote address is no longer available.
boost::system::error_code ec_remote_l;
boost::asio::ip::tcp::endpoint remote_endpoint_l = this_l->tcp_socket.remote_endpoint (ec_remote_l);
if (!ec_remote_l)
{
this_l->node.logger.try_log (boost::str (boost::format ("Disconnecting from %1% due to timeout") % remote_endpoint_l));
}
this_l->node.logger.try_log (boost::str (boost::format ("Disconnecting from %1% due to timeout") % this_l->remote));
}
this_l->timed_out = true;
this_l->close ();
@ -459,6 +469,7 @@ void nano::server_socket::on_connection (std::function<bool (std::shared_ptr<nan
new_connection->set_timeout (this_l->node.network_params.network.idle_timeout);
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
this_l->connections_per_address.emplace (new_connection->remote.address (), new_connection);
this_l->node.observers.socket_accepted.notify (*new_connection);
if (cbk (new_connection, ec_a))
{
this_l->on_connection (std::move (cbk));

View file

@ -94,18 +94,22 @@ public:
{
return endpoint_type_m;
}
bool is_realtime_connection ()
bool is_realtime_connection () const
{
return type () == nano::socket::type_t::realtime || type () == nano::socket::type_t::realtime_response_server;
}
bool is_bootstrap_connection ()
bool is_bootstrap_connection () const
{
return type () == nano::socket::type_t::bootstrap;
}
bool is_closed ()
bool is_closed () const
{
return closed;
}
bool alive () const
{
return !closed && tcp_socket.is_open ();
}
protected:
/** Holds the buffer and callback for queued writes */

View file

@ -8,7 +8,7 @@ namespace transport
{
/**
* Fake channel that connects to nothing and allows its attributes to be manipulated. Mostly useful for unit tests.
**/
**/
namespace fake
{
class channel final : public nano::transport::channel
@ -50,13 +50,20 @@ namespace transport
return nano::transport::transport_type::fake;
}
void disconnect ()
void close ()
{
endpoint = nano::endpoint (boost::asio::ip::address_v6::any (), 0);
closed = true;
}
bool alive () const override
{
return !closed;
}
private:
nano::endpoint endpoint;
std::atomic<bool> closed{ false };
};
} // namespace fake
} // namespace transport

View file

@ -4,6 +4,10 @@
#include <boost/format.hpp>
/*
* channel_tcp
*/
nano::transport::channel_tcp::channel_tcp (nano::node & node_a, std::weak_ptr<nano::socket> socket_a) :
channel (node_a),
socket (std::move (socket_a))
@ -105,6 +109,10 @@ void nano::transport::channel_tcp::set_endpoint ()
}
}
/*
* tcp_channels
*/
nano::transport::tcp_channels::tcp_channels (nano::node & node, std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> sink) :
node{ node },
sink{ std::move (sink) }
@ -443,8 +451,19 @@ std::unique_ptr<nano::container_info_component> nano::transport::tcp_channels::c
void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point const & cutoff_a)
{
nano::lock_guard<nano::mutex> lock (mutex);
// Remove channels with dead underlying sockets
for (auto it = channels.begin (); it != channels.end (); ++it)
{
if (!it->socket->alive ())
{
it = channels.erase (it);
}
}
auto disconnect_cutoff (channels.get<last_packet_sent_tag> ().lower_bound (cutoff_a));
channels.get<last_packet_sent_tag> ().erase (channels.get<last_packet_sent_tag> ().begin (), disconnect_cutoff);
// Remove keepalive attempt tracking for attempts older than cutoff
auto attempts_cutoff (attempts.get<last_attempt_tag> ().lower_bound (cutoff_a));
attempts.get<last_attempt_tag> ().erase (attempts.get<last_attempt_tag> ().begin (), attempts_cutoff);
@ -461,7 +480,7 @@ void nano::transport::tcp_channels::ongoing_keepalive ()
nano::unique_lock<nano::mutex> lock (mutex);
// Wake up channels
std::vector<std::shared_ptr<nano::transport::channel_tcp>> send_list;
auto keepalive_sent_cutoff (channels.get<last_packet_sent_tag> ().lower_bound (std::chrono::steady_clock::now () - node.network_params.network.cleanup_period));
auto keepalive_sent_cutoff (channels.get<last_packet_sent_tag> ().lower_bound (std::chrono::steady_clock::now () - node.network_params.network.keepalive_period));
for (auto i (channels.get<last_packet_sent_tag> ().begin ()); i != keepalive_sent_cutoff; ++i)
{
send_list.push_back (i->channel);
@ -486,7 +505,7 @@ void nano::transport::tcp_channels::ongoing_keepalive ()
}
}
std::weak_ptr<nano::node> node_w (node.shared ());
node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.cleanup_period_half (), [node_w] () {
node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.keepalive_period, [node_w] () {
if (auto node_l = node_w.lock ())
{
if (!node_l->network.tcp_channels.stopped)

View file

@ -79,6 +79,15 @@ namespace transport
return result;
}
virtual bool alive () const override
{
if (auto socket_l = socket.lock ())
{
return socket_l->alive ();
}
return false;
}
private:
nano::tcp_endpoint endpoint{ boost::asio::ip::address_v6::any (), 0 };
};

View file

@ -53,6 +53,10 @@ namespace transport
{
return false;
}
virtual bool alive () const
{
return true;
}
std::chrono::steady_clock::time_point get_last_bootstrap_attempt () const
{

View file

@ -97,6 +97,16 @@
ASSERT_TRUE (condition); \
}
/*
* Asserts that condition is always true during the specified amount of time
*/
#define ASSERT_ALWAYS_EQ(time, val1, val2) \
system.deadline_set (time); \
while (!system.poll ()) \
{ \
ASSERT_EQ (val1, val2); \
}
/*
* Asserts that condition is never true during the specified amount of time
*/