From 2bcd2936f1c75d91a7778b750504bce4e3128f11 Mon Sep 17 00:00:00 2001 From: clemahieu Date: Mon, 1 Jan 2018 20:07:23 -0600 Subject: [PATCH] Track endpoint keepalive attempts so we don't repeatedly send keepalives to IPs if they don't respond or are offline. --- rai/core_test/peer_container.cpp | 144 ++++++++++++++++++------------- rai/node/node.cpp | 87 +++++++++++-------- rai/node/node.hpp | 17 ++++ 3 files changed, 152 insertions(+), 96 deletions(-) diff --git a/rai/core_test/peer_container.cpp b/rai/core_test/peer_container.cpp index da0c8bb8..a202172f 100644 --- a/rai/core_test/peer_container.cpp +++ b/rai/core_test/peer_container.cpp @@ -3,23 +3,23 @@ TEST (peer_container, empty_peers) { - rai::peer_container peers (rai::endpoint {}); - auto list (peers.purge_list (std::chrono::system_clock::now ())); - ASSERT_EQ (0, list.size ()); + rai::peer_container peers (rai::endpoint {}); + auto list (peers.purge_list (std::chrono::system_clock::now ())); + ASSERT_EQ (0, list.size ()); } TEST (peer_container, no_recontact) { - rai::peer_container peers (rai::endpoint {}); + rai::peer_container peers (rai::endpoint {}); auto observed_peer (0); auto observed_disconnect (false); - rai::endpoint endpoint1 (boost::asio::ip::address_v6::loopback (), 10000); - ASSERT_EQ (0, peers.size ()); + rai::endpoint endpoint1 (boost::asio::ip::address_v6::loopback (), 10000); + ASSERT_EQ (0, peers.size ()); peers.peer_observer = [&observed_peer] (rai::endpoint const &) {++observed_peer;}; peers.disconnect_observer = [&observed_disconnect] () {observed_disconnect = true;}; - ASSERT_FALSE (peers.insert (endpoint1, 0)); - ASSERT_EQ (1, peers.size ()); - ASSERT_TRUE (peers.insert (endpoint1, 0)); + ASSERT_FALSE (peers.insert (endpoint1, 0)); + ASSERT_EQ (1, peers.size ()); + ASSERT_TRUE (peers.insert (endpoint1, 0)); auto remaining (peers.purge_list (std::chrono::system_clock::now () + std::chrono::seconds (5))); ASSERT_TRUE (remaining.empty ()); ASSERT_EQ (1, observed_peer); @@ -28,89 +28,89 @@ TEST (peer_container, no_recontact) TEST (peer_container, no_self_incoming) { - rai::endpoint self (boost::asio::ip::address_v6::loopback (), 10000); - rai::peer_container peers (self); - peers.insert (self, 0); - ASSERT_TRUE (peers.peers.empty ()); + rai::endpoint self (boost::asio::ip::address_v6::loopback (), 10000); + rai::peer_container peers (self); + peers.insert (self, 0); + ASSERT_TRUE (peers.peers.empty ()); } TEST (peer_container, no_self_contacting) { - rai::endpoint self (boost::asio::ip::address_v6::loopback (), 10000); - rai::peer_container peers (self); - peers.insert (self, 0); - ASSERT_TRUE (peers.peers.empty ()); + rai::endpoint self (boost::asio::ip::address_v6::loopback (), 10000); + rai::peer_container peers (self); + peers.insert (self, 0); + ASSERT_TRUE (peers.peers.empty ()); } TEST (peer_container, reserved_peers_no_contact) { - rai::peer_container peers (rai::endpoint {}); - ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0x00000001)), 10000), 0)); - ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xc0000201)), 10000), 0)); - ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xc6336401)), 10000), 0)); - ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xcb007101)), 10000), 0)); - ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xe9fc0001)), 10000), 0)); - ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xf0000001)), 10000), 0)); - ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xffffffff)), 10000), 0)); - ASSERT_EQ (0, peers.size ()); + rai::peer_container peers (rai::endpoint {}); + ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0x00000001)), 10000), 0)); + ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xc0000201)), 10000), 0)); + ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xc6336401)), 10000), 0)); + ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xcb007101)), 10000), 0)); + ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xe9fc0001)), 10000), 0)); + ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xf0000001)), 10000), 0)); + ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xffffffff)), 10000), 0)); + ASSERT_EQ (0, peers.size ()); } TEST (peer_container, split) { - rai::peer_container peers (rai::endpoint {}); - auto now (std::chrono::system_clock::now ()); - rai::endpoint endpoint1 (boost::asio::ip::address_v6::any (), 100); - rai::endpoint endpoint2 (boost::asio::ip::address_v6::any (), 101); + rai::peer_container peers (rai::endpoint {}); + auto now (std::chrono::system_clock::now ()); + rai::endpoint endpoint1 (boost::asio::ip::address_v6::any (), 100); + rai::endpoint endpoint2 (boost::asio::ip::address_v6::any (), 101); peers.peers.insert (rai::peer_information (endpoint1, now - std::chrono::seconds (1), now)); - peers.peers.insert (rai::peer_information (endpoint2, now + std::chrono::seconds (1), now)); + peers.peers.insert (rai::peer_information (endpoint2, now + std::chrono::seconds (1), now)); ASSERT_EQ (2, peers.peers.size ()); - auto list (peers.purge_list (now)); + auto list (peers.purge_list (now)); ASSERT_EQ (1, peers.peers.size ()); - ASSERT_EQ (1, list.size ()); - ASSERT_EQ (endpoint2, list [0].endpoint); + ASSERT_EQ (1, list.size ()); + ASSERT_EQ (endpoint2, list [0].endpoint); } TEST (peer_container, fill_random_clear) { - rai::peer_container peers (rai::endpoint {}); - std::array target; - std::fill (target.begin (), target.end (), rai::endpoint (boost::asio::ip::address_v6::loopback (), 10000)); - peers.random_fill (target); - ASSERT_TRUE (std::all_of (target.begin (), target.end (), [] (rai::endpoint const & endpoint_a) {return endpoint_a == rai::endpoint (boost::asio::ip::address_v6::any (), 0); })); + rai::peer_container peers (rai::endpoint {}); + std::array target; + std::fill (target.begin (), target.end (), rai::endpoint (boost::asio::ip::address_v6::loopback (), 10000)); + peers.random_fill (target); + ASSERT_TRUE (std::all_of (target.begin (), target.end (), [] (rai::endpoint const & endpoint_a) {return endpoint_a == rai::endpoint (boost::asio::ip::address_v6::any (), 0); })); } TEST (peer_container, fill_random_full) { - rai::peer_container peers (rai::endpoint {}); - for (auto i (0); i < 100; ++i) - { - peers.insert (rai::endpoint (boost::asio::ip::address_v6::loopback (), i), 0); - } - std::array target; - std::fill (target.begin (), target.end (), rai::endpoint (boost::asio::ip::address_v6::loopback (), 10000)); - peers.random_fill (target); - ASSERT_TRUE (std::none_of (target.begin (), target.end (), [] (rai::endpoint const & endpoint_a) {return endpoint_a == rai::endpoint (boost::asio::ip::address_v6::loopback (), 10000); })); + rai::peer_container peers (rai::endpoint {}); + for (auto i (0); i < 100; ++i) + { + peers.insert (rai::endpoint (boost::asio::ip::address_v6::loopback (), i), 0); + } + std::array target; + std::fill (target.begin (), target.end (), rai::endpoint (boost::asio::ip::address_v6::loopback (), 10000)); + peers.random_fill (target); + ASSERT_TRUE (std::none_of (target.begin (), target.end (), [] (rai::endpoint const & endpoint_a) {return endpoint_a == rai::endpoint (boost::asio::ip::address_v6::loopback (), 10000); })); } TEST (peer_container, fill_random_part) { - rai::peer_container peers (rai::endpoint {}); - std::array target; - auto half (target.size () / 2); - for (auto i (0); i < half; ++i) - { - peers.insert (rai::endpoint (boost::asio::ip::address_v6::loopback (), i + 1), 0); - } - std::fill (target.begin (), target.end (), rai::endpoint (boost::asio::ip::address_v6::loopback (), 10000)); - peers.random_fill (target); - ASSERT_TRUE (std::none_of (target.begin (), target.begin () + half, [] (rai::endpoint const & endpoint_a) {return endpoint_a == rai::endpoint (boost::asio::ip::address_v6::loopback (), 10000); })); - ASSERT_TRUE (std::none_of (target.begin (), target.begin () + half, [] (rai::endpoint const & endpoint_a) {return endpoint_a == rai::endpoint (boost::asio::ip::address_v6::loopback (), 0); })); - ASSERT_TRUE (std::all_of (target.begin () + half, target.end (), [] (rai::endpoint const & endpoint_a) {return endpoint_a == rai::endpoint (boost::asio::ip::address_v6::any (), 0); })); + rai::peer_container peers (rai::endpoint {}); + std::array target; + auto half (target.size () / 2); + for (auto i (0); i < half; ++i) + { + peers.insert (rai::endpoint (boost::asio::ip::address_v6::loopback (), i + 1), 0); + } + std::fill (target.begin (), target.end (), rai::endpoint (boost::asio::ip::address_v6::loopback (), 10000)); + peers.random_fill (target); + ASSERT_TRUE (std::none_of (target.begin (), target.begin () + half, [] (rai::endpoint const & endpoint_a) {return endpoint_a == rai::endpoint (boost::asio::ip::address_v6::loopback (), 10000); })); + ASSERT_TRUE (std::none_of (target.begin (), target.begin () + half, [] (rai::endpoint const & endpoint_a) {return endpoint_a == rai::endpoint (boost::asio::ip::address_v6::loopback (), 0); })); + ASSERT_TRUE (std::all_of (target.begin () + half, target.end (), [] (rai::endpoint const & endpoint_a) {return endpoint_a == rai::endpoint (boost::asio::ip::address_v6::any (), 0); })); } TEST (peer_container, list_sqrt) { - rai::peer_container peers (rai::endpoint {}); + rai::peer_container peers (rai::endpoint {}); auto list1 (peers.list_sqrt ()); ASSERT_TRUE (list1.empty ()); for (auto i (0); i < 1000; ++i) @@ -123,7 +123,7 @@ TEST (peer_container, list_sqrt) TEST (peer_container, rep_weight) { - rai::peer_container peers (rai::endpoint {}); + rai::peer_container peers (rai::endpoint {}); peers.insert (rai::endpoint (boost::asio::ip::address_v6::loopback (), 24001), 0); ASSERT_TRUE (peers.representatives (1).empty ()); rai::endpoint endpoint0 (boost::asio::ip::address_v6::loopback (), 24000); @@ -139,3 +139,23 @@ TEST (peer_container, rep_weight) ASSERT_EQ (100, reps [0].rep_weight.number ()); ASSERT_EQ (endpoint0, reps [0].endpoint); } + +// Test to make sure we don't repeatedly send keepalive messages to nodes that aren't responding +TEST (peer_container, reachout) +{ + rai::peer_container peers (rai::endpoint {}); + rai::endpoint endpoint0 (boost::asio::ip::address_v6::loopback (), 24000); + // Make sure having been contacted by them already indicates we shouldn't reach out + peers.contacted (endpoint0, 0); + ASSERT_TRUE (peers.reachout (endpoint0)); + rai::endpoint endpoint1 (boost::asio::ip::address_v6::loopback (), 24001); + ASSERT_FALSE (peers.reachout (endpoint1)); + // Reaching out to them once should signal we shouldn't reach out again. + ASSERT_TRUE (peers.reachout (endpoint1)); + // Make sure we don't purge new items + peers.purge_list (std::chrono::system_clock::now () - std::chrono::seconds (10)); + ASSERT_TRUE (peers.reachout (endpoint1)); + // Make sure we purge old items + peers.purge_list (std::chrono::system_clock::now () + std::chrono::seconds (10)); + ASSERT_FALSE (peers.reachout (endpoint1)); +} diff --git a/rai/node/node.cpp b/rai/node/node.cpp index e7adf45b..dc2a5138 100755 --- a/rai/node/node.cpp +++ b/rai/node/node.cpp @@ -73,21 +73,21 @@ void rai::network::stop () void rai::network::send_keepalive (rai::endpoint const & endpoint_a) { - assert (endpoint_a.address ().is_v6 ()); - rai::keepalive message; - node.peers.random_fill (message.peers); - std::shared_ptr > bytes (new std::vector ); - { - rai::vectorstream stream (*bytes); - message.serialize (stream); - } - if (node.config.logging.network_keepalive_logging ()) - { - BOOST_LOG (node.log) << boost::str (boost::format ("Keepalive req sent to %1%") % endpoint_a); - } - ++outgoing.keepalive; + assert (endpoint_a.address ().is_v6 ()); + rai::keepalive message; + node.peers.random_fill (message.peers); + std::shared_ptr > bytes (new std::vector ); + { + rai::vectorstream stream (*bytes); + message.serialize (stream); + } + if (node.config.logging.network_keepalive_logging ()) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Keepalive req sent to %1%") % endpoint_a); + } + ++outgoing.keepalive; std::weak_ptr node_w (node.shared ()); - send_buffer (bytes->data (), bytes->size (), endpoint_a, [bytes, node_w, endpoint_a] (boost::system::error_code const & ec, size_t) + send_buffer (bytes->data (), bytes->size (), endpoint_a, [bytes, node_w, endpoint_a] (boost::system::error_code const & ec, size_t) { if (auto node_l = node_w.lock ()) { @@ -111,11 +111,11 @@ void rai::node::keepalive (std::string const & address_a, uint16_t port_a) { for (auto i (i_a), n (boost::asio::ip::udp::resolver::iterator {}); i != n; ++i) { - auto endpoint (i->endpoint ()); - if (endpoint.address ().is_v4 ()) - { + auto endpoint (i->endpoint ()); + if (endpoint.address ().is_v4 ()) + { endpoint = rai::endpoint (boost::asio::ip::address_v6::v4_mapped (endpoint.address ().to_v4 ()), endpoint.port ()); - } + } node_l->send_keepalive (endpoint); } } @@ -133,7 +133,7 @@ void rai::network::republish (rai::block_hash const & hash_a, std::shared_ptr node_w (node.shared ()); + std::weak_ptr node_w (node.shared ()); send_buffer (buffer_a->data (), buffer_a->size (), endpoint_a, [buffer_a, node_w, endpoint_a] (boost::system::error_code const & ec, size_t size) { if (auto node_l = node_w.lock ()) @@ -473,13 +473,13 @@ void rai::network::receive_action (boost::system::error_code const & error, size // Send keepalives to all the peers we've been notified of void rai::network::merge_peers (std::array const & peers_a) { - for (auto i (peers_a.begin ()), j (peers_a.end ()); i != j; ++i) - { - if (!node.peers.not_a_peer (*i) && !node.peers.known_peer (*i)) - { - send_keepalive (*i); - } - } + for (auto i (peers_a.begin ()), j (peers_a.end ()); i != j; ++i) + { + if (!node.peers.reachout (*i)) + { + send_keepalive (*i); + } + } } bool rai::operation::operator > (rai::operation const & other_a) const @@ -1881,14 +1881,14 @@ rai::account rai::node::representative (rai::account const & account_a) void rai::node::ongoing_keepalive () { - keepalive_preconfigured (config.preconfigured_peers); - auto peers_l (peers.purge_list (std::chrono::system_clock::now () - cutoff)); - for (auto i (peers_l.begin ()), j (peers_l.end ()); i != j && std::chrono::system_clock::now () - i->last_attempt > period; ++i) - { - network.send_keepalive (i->endpoint); - } + keepalive_preconfigured (config.preconfigured_peers); + auto peers_l (peers.purge_list (std::chrono::system_clock::now () - cutoff)); + for (auto i (peers_l.begin ()), j (peers_l.end ()); i != j && std::chrono::system_clock::now () - i->last_attempt > period; ++i) + { + network.send_keepalive (i->endpoint); + } std::weak_ptr node_w (shared_from_this ()); - alarm.add (std::chrono::system_clock::now () + period, [node_w] () + alarm.add (std::chrono::system_clock::now () + period, [node_w] () { if (auto node_l = node_w.lock ()) { @@ -2388,11 +2388,16 @@ std::vector rai::peer_container::purge_list (std::chrono std::lock_guard lock (mutex); auto pivot (peers.get <1> ().lower_bound (cutoff)); result.assign (pivot, peers.get <1> ().end ()); + // Remove peers that haven't been heard from past the cutoff peers.get <1> ().erase (peers.get <1> ().begin (), pivot); for (auto i (peers.begin ()), n (peers.end ()); i != n; ++i) { peers.modify (i, [] (rai::peer_information & info) {info.last_attempt = std::chrono::system_clock::now ();}); } + + // Remove keepalive attempt tracking for attempts older than cutoff + auto attempts_pivot (attempts.get <1> ().lower_bound (cutoff)); + attempts.get <1> ().erase (attempts.get <1> ().begin (), attempts_pivot); } if (result.empty ()) { @@ -2482,6 +2487,20 @@ void rai::peer_container::rep_request (rai::endpoint const & endpoint_a) } } +bool rai::peer_container::reachout (rai::endpoint const & endpoint_a) +{ + auto result (false); + // Don't contact invalid IPs + result |= not_a_peer (endpoint_a); + // Don't keepalive to nodes that already sent us something + result |= known_peer (endpoint_a); + std::lock_guard lock (mutex); + auto existing (attempts.find (endpoint_a)); + result |= existing != attempts.end (); + attempts.insert ({endpoint_a, std::chrono::system_clock::now ()}); + return result; +} + bool rai::peer_container::insert (rai::endpoint const & endpoint_a, unsigned version_a) { auto unknown (false); @@ -2669,7 +2688,7 @@ bool rai::peer_container::known_peer (rai::endpoint const & endpoint_a) { std::lock_guard lock (mutex); auto existing (peers.find (endpoint_a)); - return existing != peers.end () && existing->last_contact > std::chrono::system_clock::now () - rai::node::cutoff; + return existing != peers.end (); } std::shared_ptr rai::node::shared () diff --git a/rai/node/node.hpp b/rai/node/node.hpp index a3d346ed..35642a23 100644 --- a/rai/node/node.hpp +++ b/rai/node/node.hpp @@ -163,6 +163,12 @@ public: rai::amount rep_weight; unsigned network_version; }; +class peer_attempt +{ +public: + rai::endpoint endpoint; + std::chrono::system_clock::time_point last_attempt; +}; class peer_container { public: @@ -191,6 +197,8 @@ public: std::vector rep_crawl (); bool rep_response (rai::endpoint const &, rai::amount const &); void rep_request (rai::endpoint const &); + // Should we reach out to this endpoint with a keepalive message + bool reachout (rai::endpoint const &); size_t size (); size_t size_sqrt (); bool empty (); @@ -210,6 +218,15 @@ public: boost::multi_index::ordered_non_unique , std::greater > > > peers; + boost::multi_index_container + < + peer_attempt, + boost::multi_index::indexed_by + < + boost::multi_index::hashed_unique >, + boost::multi_index::ordered_non_unique > + > + > attempts; // Called when a new peer is observed std::function peer_observer; std::function disconnect_observer;