From 0ce830f5d6f4496aeb3a5823b5fc9e12eaf04dd9 Mon Sep 17 00:00:00 2001 From: Dimitrios Siganos Date: Mon, 4 Mar 2024 11:31:56 +0000 Subject: [PATCH] Process connection attempts in a round-robin fashion in order to load-balance connection attempts. # Conflicts: # nano/node/network.cpp --- nano/lib/config.hpp | 4 ++ nano/node/network.cpp | 2 - nano/node/transport/channel.hpp | 2 +- nano/node/transport/tcp.cpp | 72 ++++++++++++++++++++++++++++++ nano/node/transport/tcp.hpp | 2 + nano/node/transport/tcp_server.cpp | 10 +++++ nano/node/transport/tcp_server.hpp | 2 + 7 files changed, 91 insertions(+), 3 deletions(-) diff --git a/nano/lib/config.hpp b/nano/lib/config.hpp index a68b02c1..bddfa29c 100644 --- a/nano/lib/config.hpp +++ b/nano/lib/config.hpp @@ -202,6 +202,7 @@ public: default_websocket_port (47000), aec_loop_interval_ms (300), // Update AEC ~3 times per second cleanup_period (default_cleanup_period), + merge_period (std::chrono::milliseconds (250)), keepalive_period (std::chrono::seconds (15)), idle_timeout (default_cleanup_period * 2), silent_connection_tolerance_time (std::chrono::seconds (120)), @@ -239,6 +240,7 @@ public: { aec_loop_interval_ms = 20; cleanup_period = std::chrono::seconds (1); + merge_period = std::chrono::milliseconds (10); keepalive_period = std::chrono::seconds (1); idle_timeout = cleanup_period * 15; max_peers_per_ip = 20; @@ -277,6 +279,8 @@ public: { return cleanup_period * 5; } + /** How often to connect to other peers */ + std::chrono::milliseconds merge_period; /** How often to send keepalive messages */ std::chrono::seconds keepalive_period; /** Default maximum idle time for a socket before it's automatically closed */ diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 4d47533b..0c9cf226 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -239,8 +239,6 @@ public: void keepalive (nano::keepalive const & message_a) override { - node.network.merge_peers (message_a.peers); - // Check for special node port data auto peer0 (message_a.peers[0]); if (peer0.address () == boost::asio::ip::address_v6{} && peer0.port () != 0) diff --git a/nano/node/transport/channel.hpp b/nano/node/transport/channel.hpp index a16ee0e2..bede756c 100644 --- a/nano/node/transport/channel.hpp +++ b/nano/node/transport/channel.hpp @@ -187,4 +187,4 @@ struct hash> return hash (channel_a.get ()); } }; -} \ No newline at end of file +} diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index a085dbfc..e6ddca61 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -357,6 +357,7 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa void nano::transport::tcp_channels::start () { ongoing_keepalive (); + ongoing_merge (0); } void nano::transport::tcp_channels::stop () @@ -509,6 +510,77 @@ void nano::transport::tcp_channels::ongoing_keepalive () }); } +void nano::transport::tcp_channels::ongoing_merge (size_t channel_index) +{ + nano::unique_lock lock{ mutex }; + std::optional keepalive; + size_t count = 0; + while (!keepalive && channels.size () > 0 && count++ < channels.size ()) + { + ++channel_index; + if (channels.size () <= channel_index) + { + channel_index = 0; + } + auto server = channels.get ()[channel_index].response_server; + if (server && server->last_keepalive) + { + keepalive = std::move (server->last_keepalive); + server->last_keepalive = std::nullopt; + } + } + lock.unlock (); + if (keepalive) + { + ongoing_merge (channel_index, *keepalive, 1); + } + else + { + std::weak_ptr node_w = node.shared (); + node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.merge_period, [node_w, channel_index] () { + if (auto node_l = node_w.lock ()) + { + if (!node_l->network.tcp_channels.stopped) + { + node_l->network.tcp_channels.ongoing_merge (channel_index); + } + } + }); + } +} + +void nano::transport::tcp_channels::ongoing_merge (size_t channel_index, nano::keepalive keepalive, size_t peer_index) +{ + debug_assert (peer_index < keepalive.peers.size ()); + node.network.merge_peer (keepalive.peers[peer_index++]); + if (peer_index < keepalive.peers.size ()) + { + std::weak_ptr node_w = node.shared (); + node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.merge_period, [node_w, channel_index, keepalive, peer_index] () { + if (auto node_l = node_w.lock ()) + { + if (!node_l->network.tcp_channels.stopped) + { + node_l->network.tcp_channels.ongoing_merge (channel_index, keepalive, peer_index); + } + } + }); + } + else + { + std::weak_ptr node_w = node.shared (); + node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.merge_period, [node_w, channel_index] () { + if (auto node_l = node_w.lock ()) + { + if (!node_l->network.tcp_channels.stopped) + { + node_l->network.tcp_channels.ongoing_merge (channel_index); + } + } + }); + } +} + void nano::transport::tcp_channels::list (std::deque> & deque_a, uint8_t minimum_version_a, bool include_temporary_channels_a) { nano::lock_guard lock{ mutex }; diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index 7b6efd94..0f65b3bd 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -128,6 +128,8 @@ namespace transport std::unique_ptr collect_container_info (std::string const &); void purge (std::chrono::steady_clock::time_point const &); void ongoing_keepalive (); + void ongoing_merge (size_t channel_index); + void ongoing_merge (size_t channel_index, nano::keepalive keepalive, size_t peer_index); void list (std::deque> &, uint8_t = 0, bool = true); void modify (std::shared_ptr const &, std::function const &)>); void update (nano::tcp_endpoint const &); diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index 7cfee602..7e37ef97 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -623,6 +623,7 @@ nano::transport::tcp_server::realtime_message_visitor::realtime_message_visitor void nano::transport::tcp_server::realtime_message_visitor::keepalive (const nano::keepalive & message) { process = true; + server.set_last_keepalive (message); } void nano::transport::tcp_server::realtime_message_visitor::publish (const nano::publish & message) @@ -786,6 +787,15 @@ void nano::transport::tcp_server::timeout () } } +void nano::transport::tcp_server::set_last_keepalive (nano::keepalive const & message) +{ + std::lock_guard lock{ mutex }; + if (!last_keepalive) + { + last_keepalive = message; + } +} + bool nano::transport::tcp_server::to_bootstrap_connection () { auto node = this->node.lock (); diff --git a/nano/node/transport/tcp_server.hpp b/nano/node/transport/tcp_server.hpp index e011c506..53692585 100644 --- a/nano/node/transport/tcp_server.hpp +++ b/nano/node/transport/tcp_server.hpp @@ -62,6 +62,7 @@ public: void stop (); void timeout (); + void set_last_keepalive (nano::keepalive const & message); std::shared_ptr const socket; std::weak_ptr const node; @@ -72,6 +73,7 @@ public: nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 }; nano::account remote_node_id{}; std::chrono::steady_clock::time_point last_telemetry_req{}; + std::optional last_keepalive; private: void send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2);