Track endpoint keepalive attempts so we don't repeatedly send keepalives to IPs if they don't respond or are offline.

This commit is contained in:
clemahieu 2018-01-01 20:07:23 -06:00
commit 2bcd2936f1
3 changed files with 152 additions and 96 deletions

View file

@ -3,23 +3,23 @@
// A container that never had a peer inserted must return an empty list from purge_list.
TEST (peer_container, empty_peers)
{
rai::peer_container peers (rai::endpoint {});
auto list (peers.purge_list (std::chrono::system_clock::now ()));
ASSERT_EQ (0, list.size ());
rai::peer_container peers (rai::endpoint {});
auto list (peers.purge_list (std::chrono::system_clock::now ()));
ASSERT_EQ (0, list.size ());
}
TEST (peer_container, no_recontact)
{
rai::peer_container peers (rai::endpoint {});
rai::peer_container peers (rai::endpoint {});
auto observed_peer (0);
auto observed_disconnect (false);
rai::endpoint endpoint1 (boost::asio::ip::address_v6::loopback (), 10000);
ASSERT_EQ (0, peers.size ());
rai::endpoint endpoint1 (boost::asio::ip::address_v6::loopback (), 10000);
ASSERT_EQ (0, peers.size ());
peers.peer_observer = [&observed_peer] (rai::endpoint const &) {++observed_peer;};
peers.disconnect_observer = [&observed_disconnect] () {observed_disconnect = true;};
ASSERT_FALSE (peers.insert (endpoint1, 0));
ASSERT_EQ (1, peers.size ());
ASSERT_TRUE (peers.insert (endpoint1, 0));
ASSERT_FALSE (peers.insert (endpoint1, 0));
ASSERT_EQ (1, peers.size ());
ASSERT_TRUE (peers.insert (endpoint1, 0));
auto remaining (peers.purge_list (std::chrono::system_clock::now () + std::chrono::seconds (5)));
ASSERT_TRUE (remaining.empty ());
ASSERT_EQ (1, observed_peer);
@ -28,89 +28,89 @@ TEST (peer_container, no_recontact)
// Inserting the same endpoint the container was constructed with must leave the peer set empty.
TEST (peer_container, no_self_incoming)
{
rai::endpoint self (boost::asio::ip::address_v6::loopback (), 10000);
rai::peer_container peers (self);
peers.insert (self, 0);
ASSERT_TRUE (peers.peers.empty ());
rai::endpoint self (boost::asio::ip::address_v6::loopback (), 10000);
rai::peer_container peers (self);
peers.insert (self, 0);
ASSERT_TRUE (peers.peers.empty ());
}
// As written this performs the same check as no_self_incoming: inserting the container's
// own endpoint must leave the peer set empty.
TEST (peer_container, no_self_contacting)
{
rai::endpoint self (boost::asio::ip::address_v6::loopback (), 10000);
rai::peer_container peers (self);
peers.insert (self, 0);
ASSERT_TRUE (peers.peers.empty ());
rai::endpoint self (boost::asio::ip::address_v6::loopback (), 10000);
rai::peer_container peers (self);
peers.insert (self, 0);
ASSERT_TRUE (peers.peers.empty ());
}
// insert must return true for every address below and the container must stay empty,
// i.e. none of these endpoints is accepted as a peer.
// The v4-mapped addresses tried are, in order: 0.0.0.1, 192.0.2.1, 198.51.100.1,
// 203.0.113.1, 233.252.0.1, 240.0.0.1 and 255.255.255.255 (the test name describes them as reserved).
TEST (peer_container, reserved_peers_no_contact)
{
rai::peer_container peers (rai::endpoint {});
ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0x00000001)), 10000), 0));
ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xc0000201)), 10000), 0));
ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xc6336401)), 10000), 0));
ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xcb007101)), 10000), 0));
ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xe9fc0001)), 10000), 0));
ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xf0000001)), 10000), 0));
ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xffffffff)), 10000), 0));
ASSERT_EQ (0, peers.size ());
rai::peer_container peers (rai::endpoint {});
ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0x00000001)), 10000), 0));
ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xc0000201)), 10000), 0));
ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xc6336401)), 10000), 0));
ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xcb007101)), 10000), 0));
ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xe9fc0001)), 10000), 0));
ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xf0000001)), 10000), 0));
ASSERT_TRUE (peers.insert (rai::endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0xffffffff)), 10000), 0));
ASSERT_EQ (0, peers.size ());
}
// purge_list (now) must split the container around the cutoff: endpoint1, whose second
// constructor argument is one second before `now`, is removed, while endpoint2, whose second
// argument is one second after `now`, stays in the container and is the only entry returned.
// NOTE(review): the second peer_information argument is presumably the last-contact time - confirm.
TEST (peer_container, split)
{
rai::peer_container peers (rai::endpoint {});
auto now (std::chrono::system_clock::now ());
rai::endpoint endpoint1 (boost::asio::ip::address_v6::any (), 100);
rai::endpoint endpoint2 (boost::asio::ip::address_v6::any (), 101);
rai::peer_container peers (rai::endpoint {});
auto now (std::chrono::system_clock::now ());
rai::endpoint endpoint1 (boost::asio::ip::address_v6::any (), 100);
rai::endpoint endpoint2 (boost::asio::ip::address_v6::any (), 101);
peers.peers.insert (rai::peer_information (endpoint1, now - std::chrono::seconds (1), now));
peers.peers.insert (rai::peer_information (endpoint2, now + std::chrono::seconds (1), now));
peers.peers.insert (rai::peer_information (endpoint2, now + std::chrono::seconds (1), now));
ASSERT_EQ (2, peers.peers.size ());
auto list (peers.purge_list (now));
auto list (peers.purge_list (now));
ASSERT_EQ (1, peers.peers.size ());
ASSERT_EQ (1, list.size ());
ASSERT_EQ (endpoint2, list [0].endpoint);
ASSERT_EQ (1, list.size ());
ASSERT_EQ (endpoint2, list [0].endpoint);
}
// random_fill on an empty container must overwrite every slot of the target array:
// the array is pre-filled with a sentinel (loopback, port 10000) and afterwards every
// entry must equal the unspecified endpoint (any address, port 0).
TEST (peer_container, fill_random_clear)
{
rai::peer_container peers (rai::endpoint {});
std::array <rai::endpoint, 8> target;
std::fill (target.begin (), target.end (), rai::endpoint (boost::asio::ip::address_v6::loopback (), 10000));
peers.random_fill (target);
ASSERT_TRUE (std::all_of (target.begin (), target.end (), [] (rai::endpoint const & endpoint_a) {return endpoint_a == rai::endpoint (boost::asio::ip::address_v6::any (), 0); }));
rai::peer_container peers (rai::endpoint {});
std::array <rai::endpoint, 8> target;
std::fill (target.begin (), target.end (), rai::endpoint (boost::asio::ip::address_v6::loopback (), 10000));
peers.random_fill (target);
ASSERT_TRUE (std::all_of (target.begin (), target.end (), [] (rai::endpoint const & endpoint_a) {return endpoint_a == rai::endpoint (boost::asio::ip::address_v6::any (), 0); }));
}
// After 100 loopback endpoints (ports 0..99) have been passed to insert, random_fill must
// replace every slot of the target array: no entry may still hold the sentinel
// (loopback, port 10000) the array was pre-filled with.
TEST (peer_container, fill_random_full)
{
rai::peer_container peers (rai::endpoint {});
for (auto i (0); i < 100; ++i)
{
peers.insert (rai::endpoint (boost::asio::ip::address_v6::loopback (), i), 0);
}
std::array <rai::endpoint, 8> target;
std::fill (target.begin (), target.end (), rai::endpoint (boost::asio::ip::address_v6::loopback (), 10000));
peers.random_fill (target);
ASSERT_TRUE (std::none_of (target.begin (), target.end (), [] (rai::endpoint const & endpoint_a) {return endpoint_a == rai::endpoint (boost::asio::ip::address_v6::loopback (), 10000); }));
rai::peer_container peers (rai::endpoint {});
for (auto i (0); i < 100; ++i)
{
peers.insert (rai::endpoint (boost::asio::ip::address_v6::loopback (), i), 0);
}
std::array <rai::endpoint, 8> target;
std::fill (target.begin (), target.end (), rai::endpoint (boost::asio::ip::address_v6::loopback (), 10000));
peers.random_fill (target);
ASSERT_TRUE (std::none_of (target.begin (), target.end (), [] (rai::endpoint const & endpoint_a) {return endpoint_a == rai::endpoint (boost::asio::ip::address_v6::loopback (), 10000); }));
}
// With only half as many peers inserted (loopback, ports 1..half) as the target array has
// slots, random_fill must put inserted peers in the first half and clear the second half:
// - the first half holds neither the pre-fill sentinel (loopback, port 10000) nor (loopback, port 0);
// - every entry of the second half equals the unspecified endpoint (any address, port 0).
TEST (peer_container, fill_random_part)
{
rai::peer_container peers (rai::endpoint {});
std::array <rai::endpoint, 8> target;
auto half (target.size () / 2);
for (auto i (0); i < half; ++i)
{
peers.insert (rai::endpoint (boost::asio::ip::address_v6::loopback (), i + 1), 0);
}
std::fill (target.begin (), target.end (), rai::endpoint (boost::asio::ip::address_v6::loopback (), 10000));
peers.random_fill (target);
ASSERT_TRUE (std::none_of (target.begin (), target.begin () + half, [] (rai::endpoint const & endpoint_a) {return endpoint_a == rai::endpoint (boost::asio::ip::address_v6::loopback (), 10000); }));
ASSERT_TRUE (std::none_of (target.begin (), target.begin () + half, [] (rai::endpoint const & endpoint_a) {return endpoint_a == rai::endpoint (boost::asio::ip::address_v6::loopback (), 0); }));
ASSERT_TRUE (std::all_of (target.begin () + half, target.end (), [] (rai::endpoint const & endpoint_a) {return endpoint_a == rai::endpoint (boost::asio::ip::address_v6::any (), 0); }));
rai::peer_container peers (rai::endpoint {});
std::array <rai::endpoint, 8> target;
auto half (target.size () / 2);
for (auto i (0); i < half; ++i)
{
peers.insert (rai::endpoint (boost::asio::ip::address_v6::loopback (), i + 1), 0);
}
std::fill (target.begin (), target.end (), rai::endpoint (boost::asio::ip::address_v6::loopback (), 10000));
peers.random_fill (target);
ASSERT_TRUE (std::none_of (target.begin (), target.begin () + half, [] (rai::endpoint const & endpoint_a) {return endpoint_a == rai::endpoint (boost::asio::ip::address_v6::loopback (), 10000); }));
ASSERT_TRUE (std::none_of (target.begin (), target.begin () + half, [] (rai::endpoint const & endpoint_a) {return endpoint_a == rai::endpoint (boost::asio::ip::address_v6::loopback (), 0); }));
ASSERT_TRUE (std::all_of (target.begin () + half, target.end (), [] (rai::endpoint const & endpoint_a) {return endpoint_a == rai::endpoint (boost::asio::ip::address_v6::any (), 0); }));
}
TEST (peer_container, list_sqrt)
{
rai::peer_container peers (rai::endpoint {});
rai::peer_container peers (rai::endpoint {});
auto list1 (peers.list_sqrt ());
ASSERT_TRUE (list1.empty ());
for (auto i (0); i < 1000; ++i)
@ -123,7 +123,7 @@ TEST (peer_container, list_sqrt)
TEST (peer_container, rep_weight)
{
rai::peer_container peers (rai::endpoint {});
rai::peer_container peers (rai::endpoint {});
peers.insert (rai::endpoint (boost::asio::ip::address_v6::loopback (), 24001), 0);
ASSERT_TRUE (peers.representatives (1).empty ());
rai::endpoint endpoint0 (boost::asio::ip::address_v6::loopback (), 24000);
@ -139,3 +139,23 @@ TEST (peer_container, rep_weight)
ASSERT_EQ (100, reps [0].rep_weight.number ());
ASSERT_EQ (endpoint0, reps [0].endpoint);
}
// Verify that keepalive messages are not sent over and over to nodes that never answer.
// Throughout, reachout returning true means "do not send a keepalive to this endpoint".
TEST (peer_container, reachout)
{
	rai::peer_container container (rai::endpoint {});
	rai::endpoint known_endpoint (boost::asio::ip::address_v6::loopback (), 24000);
	rai::endpoint silent_endpoint (boost::asio::ip::address_v6::loopback (), 24001);
	// An endpoint that has already contacted us does not need an outgoing keepalive
	container.contacted (known_endpoint, 0);
	ASSERT_TRUE (container.reachout (known_endpoint));
	// The first query for a never-seen endpoint allows the keepalive...
	ASSERT_FALSE (container.reachout (silent_endpoint));
	// ...and having reached out once, a second query must refuse
	ASSERT_TRUE (container.reachout (silent_endpoint));
	// A cutoff in the past must not purge the fresh attempt record
	container.purge_list (std::chrono::system_clock::now () - std::chrono::seconds (10));
	ASSERT_TRUE (container.reachout (silent_endpoint));
	// A cutoff in the future purges the record, so reaching out is allowed again
	container.purge_list (std::chrono::system_clock::now () + std::chrono::seconds (10));
	ASSERT_FALSE (container.reachout (silent_endpoint));
}

View file

@ -73,21 +73,21 @@ void rai::network::stop ()
void rai::network::send_keepalive (rai::endpoint const & endpoint_a)
{
assert (endpoint_a.address ().is_v6 ());
rai::keepalive message;
node.peers.random_fill (message.peers);
std::shared_ptr <std::vector <uint8_t>> bytes (new std::vector <uint8_t>);
{
rai::vectorstream stream (*bytes);
message.serialize (stream);
}
if (node.config.logging.network_keepalive_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Keepalive req sent to %1%") % endpoint_a);
}
++outgoing.keepalive;
assert (endpoint_a.address ().is_v6 ());
rai::keepalive message;
node.peers.random_fill (message.peers);
std::shared_ptr <std::vector <uint8_t>> bytes (new std::vector <uint8_t>);
{
rai::vectorstream stream (*bytes);
message.serialize (stream);
}
if (node.config.logging.network_keepalive_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Keepalive req sent to %1%") % endpoint_a);
}
++outgoing.keepalive;
std::weak_ptr <rai::node> node_w (node.shared ());
send_buffer (bytes->data (), bytes->size (), endpoint_a, [bytes, node_w, endpoint_a] (boost::system::error_code const & ec, size_t)
send_buffer (bytes->data (), bytes->size (), endpoint_a, [bytes, node_w, endpoint_a] (boost::system::error_code const & ec, size_t)
{
if (auto node_l = node_w.lock ())
{
@ -111,11 +111,11 @@ void rai::node::keepalive (std::string const & address_a, uint16_t port_a)
{
for (auto i (i_a), n (boost::asio::ip::udp::resolver::iterator {}); i != n; ++i)
{
auto endpoint (i->endpoint ());
if (endpoint.address ().is_v4 ())
{
auto endpoint (i->endpoint ());
if (endpoint.address ().is_v4 ())
{
endpoint = rai::endpoint (boost::asio::ip::address_v6::v4_mapped (endpoint.address ().to_v4 ()), endpoint.port ());
}
}
node_l->send_keepalive (endpoint);
}
}
@ -133,7 +133,7 @@ void rai::network::republish (rai::block_hash const & hash_a, std::shared_ptr <s
{
BOOST_LOG (node.log) << boost::str (boost::format ("Publishing %1% to %2%") % hash_a.to_string () % endpoint_a);
}
std::weak_ptr <rai::node> node_w (node.shared ());
std::weak_ptr <rai::node> node_w (node.shared ());
send_buffer (buffer_a->data (), buffer_a->size (), endpoint_a, [buffer_a, node_w, endpoint_a] (boost::system::error_code const & ec, size_t size)
{
if (auto node_l = node_w.lock ())
@ -473,13 +473,13 @@ void rai::network::receive_action (boost::system::error_code const & error, size
// Send keepalives to all the peers we've been notified of
// peers_a: fixed-size array of eight endpoints; each entry is checked on its own and
// send_keepalive is called only for entries that pass the check in the loop body.
void rai::network::merge_peers (std::array <rai::endpoint, 8> const & peers_a)
{
for (auto i (peers_a.begin ()), j (peers_a.end ()); i != j; ++i)
{
if (!node.peers.not_a_peer (*i) && !node.peers.known_peer (*i))
{
send_keepalive (*i);
}
}
for (auto i (peers_a.begin ()), j (peers_a.end ()); i != j; ++i)
{
// reachout returns true when the endpoint should be skipped (invalid, already a known
// peer, or already attempted), hence the negation; the call also records the attempt
if (!node.peers.reachout (*i))
{
send_keepalive (*i);
}
}
}
bool rai::operation::operator > (rai::operation const & other_a) const
@ -1881,14 +1881,14 @@ rai::account rai::node::representative (rai::account const & account_a)
void rai::node::ongoing_keepalive ()
{
keepalive_preconfigured (config.preconfigured_peers);
auto peers_l (peers.purge_list (std::chrono::system_clock::now () - cutoff));
for (auto i (peers_l.begin ()), j (peers_l.end ()); i != j && std::chrono::system_clock::now () - i->last_attempt > period; ++i)
{
network.send_keepalive (i->endpoint);
}
keepalive_preconfigured (config.preconfigured_peers);
auto peers_l (peers.purge_list (std::chrono::system_clock::now () - cutoff));
for (auto i (peers_l.begin ()), j (peers_l.end ()); i != j && std::chrono::system_clock::now () - i->last_attempt > period; ++i)
{
network.send_keepalive (i->endpoint);
}
std::weak_ptr <rai::node> node_w (shared_from_this ());
alarm.add (std::chrono::system_clock::now () + period, [node_w] ()
alarm.add (std::chrono::system_clock::now () + period, [node_w] ()
{
if (auto node_l = node_w.lock ())
{
@ -2388,11 +2388,16 @@ std::vector <rai::peer_information> rai::peer_container::purge_list (std::chrono
std::lock_guard <std::mutex> lock (mutex);
auto pivot (peers.get <1> ().lower_bound (cutoff));
result.assign (pivot, peers.get <1> ().end ());
// Remove peers that haven't been heard from past the cutoff
peers.get <1> ().erase (peers.get <1> ().begin (), pivot);
for (auto i (peers.begin ()), n (peers.end ()); i != n; ++i)
{
peers.modify (i, [] (rai::peer_information & info) {info.last_attempt = std::chrono::system_clock::now ();});
}
// Remove keepalive attempt tracking for attempts older than cutoff
auto attempts_pivot (attempts.get <1> ().lower_bound (cutoff));
attempts.get <1> ().erase (attempts.get <1> ().begin (), attempts_pivot);
}
if (result.empty ())
{
@ -2482,6 +2487,20 @@ void rai::peer_container::rep_request (rai::endpoint const & endpoint_a)
}
}
// Decide whether a keepalive should be sent to endpoint_a, and record the attempt.
// Returns true when the endpoint should NOT be contacted, which is the case when any of
// these holds: not_a_peer rejects it, it is already a known peer, or an attempt for it is
// already being tracked. Returns false only on the first query for a valid, unknown endpoint.
// An attempt record is stored on every call, whatever the result.
bool rai::peer_container::reachout (rai::endpoint const & endpoint_a)
{
	// Both checks run unconditionally and before the lock is taken: known_peer acquires
	// `mutex` itself, and a std::mutex must not be locked twice by the same thread.
	bool const invalid_endpoint (not_a_peer (endpoint_a));
	bool const already_known (known_peer (endpoint_a));
	std::lock_guard <std::mutex> lock (mutex);
	// The endpoint index of `attempts` is unique, so a rejected insert means an attempt was
	// already tracked; in that case the existing record (and its timestamp) is left untouched.
	auto const insertion (attempts.insert ({endpoint_a, std::chrono::system_clock::now ()}));
	bool const already_attempted (!insertion.second);
	return invalid_endpoint || already_known || already_attempted;
}
bool rai::peer_container::insert (rai::endpoint const & endpoint_a, unsigned version_a)
{
auto unknown (false);
@ -2669,7 +2688,7 @@ bool rai::peer_container::known_peer (rai::endpoint const & endpoint_a)
{
std::lock_guard <std::mutex> lock (mutex);
auto existing (peers.find (endpoint_a));
return existing != peers.end () && existing->last_contact > std::chrono::system_clock::now () - rai::node::cutoff;
return existing != peers.end ();
}
std::shared_ptr <rai::node> rai::node::shared ()

View file

@ -163,6 +163,12 @@ public:
rai::amount rep_weight;
unsigned network_version;
};
// Record of a keepalive attempt towards an endpoint; the element type of peer_container::attempts.
class peer_attempt
{
public:
// Endpoint the attempt was made to; the unique (hashed) key of the attempts container
rai::endpoint endpoint;
// Time the record was created: peer_container::reachout fills it with system_clock::now ()
// and peer_container::purge_list erases records whose value is older than its cutoff
std::chrono::system_clock::time_point last_attempt;
};
class peer_container
{
public:
@ -191,6 +197,8 @@ public:
std::vector <rai::endpoint> rep_crawl ();
bool rep_response (rai::endpoint const &, rai::amount const &);
void rep_request (rai::endpoint const &);
// Should we reach out to this endpoint with a keepalive message
bool reachout (rai::endpoint const &);
size_t size ();
size_t size_sqrt ();
bool empty ();
@ -210,6 +218,15 @@ public:
boost::multi_index::ordered_non_unique <boost::multi_index::member <peer_information, rai::amount, &peer_information::rep_weight>, std::greater <rai::amount>>
>
> peers;
// Keepalive attempts currently being tracked, at most one record per endpoint.
// Index 0: hashed, unique on peer_attempt::endpoint (lookup by endpoint).
// Index 1: ordered, non-unique on peer_attempt::last_attempt (range erase by time).
boost::multi_index_container
<
peer_attempt,
boost::multi_index::indexed_by
<
boost::multi_index::hashed_unique <boost::multi_index::member <peer_attempt, rai::endpoint, &peer_attempt::endpoint>>,
boost::multi_index::ordered_non_unique <boost::multi_index::member <peer_attempt, std::chrono::system_clock::time_point, &peer_attempt::last_attempt>>
>
> attempts;
// Called when a new peer is observed
std::function <void (rai::endpoint const &)> peer_observer;
std::function <void ()> disconnect_observer;