From b8cde9f2867f73ad1ab7e209653a104b06ad9faa Mon Sep 17 00:00:00 2001 From: cryptocode Date: Wed, 6 Mar 2019 20:31:54 +0100 Subject: [PATCH] Improve representative crawler (#1803) * Improve rep crawling * Add count-check to test * Move tags and template alias into class, improve modification of items via non-unique index --- nano/core_test/node.cpp | 36 ++++- nano/core_test/peer_container.cpp | 21 --- nano/node/CMakeLists.txt | 2 + nano/node/node.cpp | 159 ++++++-------------- nano/node/node.hpp | 18 +-- nano/node/peers.cpp | 100 +------------ nano/node/peers.hpp | 15 +- nano/node/repcrawler.cpp | 240 ++++++++++++++++++++++++++++++ nano/node/repcrawler.hpp | 134 +++++++++++++++++ nano/node/rpc.cpp | 10 +- 10 files changed, 464 insertions(+), 271 deletions(-) create mode 100644 nano/node/repcrawler.cpp create mode 100644 nano/node/repcrawler.hpp diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 8a619c80f..d3014cd2c 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -1213,7 +1213,7 @@ TEST (node, DISABLED_fork_stale) auto & node1 (*system1.nodes[0]); auto & node2 (*system2.nodes[0]); node2.bootstrap_initiator.bootstrap (node1.network.endpoint ()); - node2.peers.rep_response (node1.network.endpoint (), nano::test_genesis_key.pub, nano::genesis_amount); + node2.rep_crawler.response (node1.network.endpoint (), nano::test_genesis_key.pub, nano::genesis_amount); nano::genesis genesis; nano::keypair key1; nano::keypair key2; @@ -1501,17 +1501,17 @@ TEST (node, rep_list) nano::keypair key1; // Broadcast a confirm so others should know this is a rep node wallet0->send_action (nano::test_genesis_key.pub, key1.pub, nano::Mxrb_ratio); - ASSERT_EQ (0, node1.peers.representatives (1).size ()); + ASSERT_EQ (0, node1.rep_crawler.representatives (1).size ()); system.deadline_set (10s); auto done (false); while (!done) { - auto reps (node1.peers.representatives (1)); + auto reps (node1.rep_crawler.representatives (1)); if (!reps.empty ()) { if (reps[0].endpoint == node0.network.endpoint ()) { - if (!reps[0].rep_weight.is_zero ()) + if (!reps[0].weight.is_zero ()) { done = true; } @@ -1521,6 +1521,34 @@ TEST (node, rep_list) } } +TEST (node, rep_weight) +{ + nano::system system (24000, 1); + auto & node (*system.nodes[0]); + + node.peers.insert (nano::endpoint (boost::asio::ip::address_v6::loopback (), 24001), 0); + ASSERT_TRUE (node.rep_crawler.representatives (1).empty ()); + nano::endpoint endpoint0 (boost::asio::ip::address_v6::loopback (), 24000); + nano::endpoint endpoint1 (boost::asio::ip::address_v6::loopback (), 24002); + nano::endpoint endpoint2 (boost::asio::ip::address_v6::loopback (), 24003); + nano::amount amount100 (100); + nano::amount amount50 (50); + node.peers.insert (endpoint2, nano::protocol_version); + node.peers.insert (endpoint0, nano::protocol_version); + node.peers.insert (endpoint1, nano::protocol_version); + nano::keypair keypair1; + nano::keypair keypair2; + node.rep_crawler.response (endpoint0, keypair1.pub, amount100); + node.rep_crawler.response (endpoint1, keypair2.pub, amount50); + ASSERT_EQ (2, node.rep_crawler.representative_count ()); + // Make sure we get the rep with the most weight first + auto reps (node.rep_crawler.representatives (1)); + ASSERT_EQ (1, reps.size ()); + ASSERT_EQ (100, reps[0].weight.number ()); + ASSERT_EQ (keypair1.pub, reps[0].account); + ASSERT_EQ (endpoint0, reps[0].endpoint); +} + // Test that nodes can disable representative voting TEST (node, no_voting) { diff --git a/nano/core_test/peer_container.cpp b/nano/core_test/peer_container.cpp index 20bd487cb..94c0ed5c9 100644 --- a/nano/core_test/peer_container.cpp +++ b/nano/core_test/peer_container.cpp @@ -121,27 +121,6 @@ TEST (peer_container, list_fanout) ASSERT_EQ (32, list2.size ()); } -TEST (peer_container, rep_weight) -{ - nano::peer_container peers (nano::endpoint{}); - peers.insert (nano::endpoint (boost::asio::ip::address_v6::loopback (), 24001), 0); - ASSERT_TRUE (peers.representatives (1).empty ()); - nano::endpoint endpoint0 (boost::asio::ip::address_v6::loopback (), 24000); - nano::endpoint endpoint1 (boost::asio::ip::address_v6::loopback (), 24002); - nano::endpoint endpoint2 (boost::asio::ip::address_v6::loopback (), 24003); - nano::amount amount (100); - peers.insert (endpoint2, nano::protocol_version); - peers.insert (endpoint0, nano::protocol_version); - peers.insert (endpoint1, nano::protocol_version); - nano::keypair keypair; - peers.rep_response (endpoint0, keypair.pub, amount); - auto reps (peers.representatives (1)); - ASSERT_EQ (1, reps.size ()); - ASSERT_EQ (100, reps[0].rep_weight.number ()); - ASSERT_EQ (keypair.pub, reps[0].probable_rep_account); - ASSERT_EQ (endpoint0, reps[0].endpoint); -} - // Test to make sure we don't repeatedly send keepalive messages to nodes that aren't responding TEST (peer_container, reachout) { diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 2c7d10366..e0f8f19ce 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -45,6 +45,8 @@ add_library (node peers.hpp portmapping.hpp portmapping.cpp + repcrawler.hpp + repcrawler.cpp rpc.hpp rpc.cpp rpcconfig.hpp diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 657c3e797..4dd5d375a 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -420,11 +420,16 @@ void nano::network::republish_vote (std::shared_ptr vote_a) void nano::network::broadcast_confirm_req (std::shared_ptr block_a) { - auto list (std::make_shared> (node.peers.representatives (std::numeric_limits::max ()))); - if (list->empty () || node.peers.total_weight () < node.config.online_weight_minimum.number ()) + auto list (std::make_shared> (node.rep_crawler.representative_endpoints (std::numeric_limits::max ()))); + if (list->empty () || node.rep_crawler.total_weight () < node.config.online_weight_minimum.number ()) { // broadcast request to all peers (with max limit 2 * sqrt (peers count)) - list = std::make_shared> (node.peers.list_vector (std::min (static_cast (100), 2 * node.peers.size_sqrt ()))); + auto peers (node.peers.list_vector (std::min (static_cast (100), 2 * node.peers.size_sqrt ()))); + list->clear (); + for (auto & peer : peers) + { + list->push_back (peer.endpoint); + } } /* @@ -444,7 +449,7 @@ void nano::network::broadcast_confirm_req (std::shared_ptr block_a) broadcast_confirm_req_base (block_a, list, 0); } -void nano::network::broadcast_confirm_req_base (std::shared_ptr block_a, std::shared_ptr> endpoints_a, unsigned delay_a, bool resumption) +void nano::network::broadcast_confirm_req_base (std::shared_ptr block_a, std::shared_ptr> endpoints_a, unsigned delay_a, bool resumption) { const size_t max_reps = 10; if (!resumption && node.config.logging.network_logging ()) @@ -454,7 +459,7 @@ void nano::network::broadcast_confirm_req_base (std::shared_ptr blo auto count (0); while (!endpoints_a->empty () && count < max_reps) { - send_confirm_req (endpoints_a->back ().endpoint, block_a); + send_confirm_req (endpoints_a->back (), block_a); endpoints_a->pop_back (); count++; } @@ -509,7 +514,7 @@ void nano::network::broadcast_confirm_req_batch (std::unordered_map, std::shared_ptr>>> deque_a, unsigned delay_a) +void nano::network::broadcast_confirm_req_batch (std::deque, std::shared_ptr>>> deque_a, unsigned delay_a) { auto pair (deque_a.front ()); deque_a.pop_front (); @@ -576,34 +581,6 @@ void nano::network::send_confirm_req_hashes (nano::endpoint const & endpoint_a, }); } -template -void rep_query (nano::node & node_a, T const & peers_a) -{ - auto transaction (node_a.store.tx_begin_read ()); - std::shared_ptr block (node_a.store.block_random (transaction)); - auto hash (block->hash ()); - node_a.rep_crawler.add (hash); - for (auto i (peers_a.begin ()), n (peers_a.end ()); i != n; ++i) - { - node_a.peers.rep_request (*i); - node_a.network.send_confirm_req (*i, block); - } - std::weak_ptr node_w (node_a.shared ()); - node_a.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [node_w, hash]() { - if (auto node_l = node_w.lock ()) - { - node_l->rep_crawler.remove (hash); - } - }); -} - -void rep_query (nano::node & node_a, nano::endpoint const & peers_a) -{ - std::array peers; - peers[0] = peers_a; - rep_query (node_a, peers); -} - namespace { class network_message_visitor : public nano::message_visitor @@ -1320,33 +1297,12 @@ std::unique_ptr collect_seq_con_info (vote_processor & v composite->add_component (std::make_unique (seq_con_info{ "representatives_3", representatives_3_count, sizeof (decltype (vote_processor.representatives_3)::value_type) })); return composite; } -} -void nano::rep_crawler::add (nano::block_hash const & hash_a) -{ - std::lock_guard lock (mutex); - active.insert (hash_a); -} - -void nano::rep_crawler::remove (nano::block_hash const & hash_a) -{ - std::lock_guard lock (mutex); - active.erase (hash_a); -} - -bool nano::rep_crawler::exists (nano::block_hash const & hash_a) -{ - std::lock_guard lock (mutex); - return active.count (hash_a) != 0; -} - -namespace nano -{ std::unique_ptr collect_seq_con_info (rep_crawler & rep_crawler, const std::string & name) { size_t count = 0; { - std::lock_guard guard (rep_crawler.mutex); + std::lock_guard guard (rep_crawler.active_mutex); count = rep_crawler.active.size (); } @@ -1355,10 +1311,7 @@ std::unique_ptr collect_seq_con_info (rep_crawler & rep_ composite->add_component (std::make_unique (seq_con_info{ "active", count, sizeof_element })); return composite; } -} -namespace nano -{ std::unique_ptr collect_seq_con_info (block_processor & block_processor, const std::string & name) { size_t state_blocks_count = 0; @@ -1415,6 +1368,7 @@ wallets (init_a.wallet_init, *this), port_mapping (*this), checker (config.signature_checker_threads), vote_processor (*this), +rep_crawler (*this), warmed_up (0), block_processor (*this), block_processor_thread ([this]() { @@ -1498,7 +1452,6 @@ startup_time (std::chrono::steady_clock::now ()) } observers.endpoint.add ([this](nano::endpoint const & endpoint_a) { this->network.send_keepalive (endpoint_a); - rep_query (*this, endpoint_a); }); observers.vote.add ([this](nano::transaction const & transaction, std::shared_ptr vote_a, nano::endpoint const & endpoint_a) { assert (endpoint_a.address ().is_v6 ()); @@ -1524,7 +1477,7 @@ startup_time (std::chrono::steady_clock::now ()) if (rep_crawler_exists) { // We see a valid non-replay vote for a block we requested, this node is probably a representative - if (this->peers.rep_response (endpoint_a, vote_a->account, rep_weight)) + if (this->rep_crawler.response (endpoint_a, vote_a->account, rep_weight)) { logger.try_log (boost::str (boost::format ("Found a representative at %1%") % endpoint_a)); // Rebroadcasting all active votes to new representative @@ -1572,7 +1525,6 @@ startup_time (std::chrono::steady_clock::now ()) node_id = nano::keypair (store.get_node_id (transaction)); logger.always_log ("Node ID: ", node_id.pub.to_account ()); } - peers.online_weight_minimum = config.online_weight_minimum.number (); if (nano::is_live_network || nano::is_beta_network) { nano::bufferstream weight_stream ((const uint8_t *)nano_bootstrap_weights, nano_bootstrap_weights_size); @@ -1915,7 +1867,7 @@ void nano::node::start () ongoing_unchecked_cleanup (); } ongoing_store_flush (); - ongoing_rep_crawl (); + rep_crawler.start (); ongoing_rep_calculation (); ongoing_peer_store (); ongoing_online_weight_calculation_queue (); @@ -2039,23 +1991,6 @@ void nano::node::ongoing_syn_cookie_cleanup () }); } -void nano::node::ongoing_rep_crawl () -{ - auto now (std::chrono::steady_clock::now ()); - auto peers_l (peers.rep_crawl ()); - rep_query (*this, peers_l); - if (network.on) - { - std::weak_ptr node_w (shared_from_this ()); - alarm.add (now + std::chrono::seconds (4), [node_w]() { - if (auto node_l = node_w.lock ()) - { - node_l->ongoing_rep_crawl (); - } - }); - } -} - void nano::node::ongoing_rep_calculation () { auto now (std::chrono::steady_clock::now ()); @@ -2556,6 +2491,7 @@ void nano::node::add_initial_peers () if (!peers.reachout (endpoint, config.allow_local_peers)) { send_keepalive (endpoint); + rep_crawler.query (endpoint); } } } @@ -3300,7 +3236,7 @@ void nano::active_transactions::request_confirm (std::unique_lock & unsigned unconfirmed_announcements (0); std::unordered_map>> requests_bundle; std::deque> rebroadcast_bundle; - std::deque, std::shared_ptr>>> confirm_req_bundle; + std::deque, std::shared_ptr>>> confirm_req_bundle; auto roots_size (roots.size ()); for (auto i (roots.get<1> ().begin ()), n (roots.get<1> ().end ()); i != n; ++i) @@ -3382,55 +3318,42 @@ void nano::active_transactions::request_confirm (std::unique_lock & } if (election_l->announcements % 4 == 1) { - auto reps (std::make_shared> (node.peers.representatives (std::numeric_limits::max ()))); - std::unordered_set probable_reps; - nano::uint128_t total_weight (0); - for (auto j (reps->begin ()), m (reps->end ()); j != m;) - { - auto & rep_votes (election_l->last_votes); - auto rep_acct (j->probable_rep_account); - // Calculate if representative isn't recorded for several IP addresses - if (probable_reps.find (rep_acct) == probable_reps.end ()) - { - total_weight = total_weight + j->rep_weight.number (); - probable_reps.insert (rep_acct); - } - if (rep_votes.find (rep_acct) != rep_votes.end ()) - { - if (j + 1 == reps->end ()) - { - reps->pop_back (); - break; - } + auto rep_endpoints (std::make_shared> ()); + auto reps (node.rep_crawler.representatives (std::numeric_limits::max ())); - std::swap (*j, reps->back ()); - reps->pop_back (); - m = reps->end (); - } - else + // Add all rep endpoints that haven't already voted. We use a set since multiple + // reps may exist on an endpoint. + std::unordered_set endpoints; + for (auto & rep : reps) + { + if (election_l->last_votes.find (rep.account) == election_l->last_votes.end ()) { - ++j; + endpoints.insert (rep.endpoint); + if (node.config.logging.vote_logging ()) { - node.logger.try_log ("Representative did not respond to confirm_req, retrying: ", rep_acct.to_account ()); + node.logger.try_log ("Representative did not respond to confirm_req, retrying: ", rep.account.to_account ()); } } } - if ((!reps->empty () && total_weight > node.config.online_weight_minimum.number ()) || roots_size > 5) + + rep_endpoints->insert (rep_endpoints->end (), endpoints.begin (), endpoints.end ()); + + if ((!rep_endpoints->empty () && node.rep_crawler.total_weight () > node.config.online_weight_minimum.number ()) || roots_size > 5) { // broadcast_confirm_req_base modifies reps, so we clone it once to avoid aliasing if (!nano::is_test_network) { if (confirm_req_bundle.size () < max_broadcast_queue) { - confirm_req_bundle.push_back (std::make_pair (election_l->status.winner, reps)); + confirm_req_bundle.push_back (std::make_pair (election_l->status.winner, rep_endpoints)); } } else { - for (auto & rep : *reps) + for (auto & rep : *rep_endpoints) { - auto rep_request (requests_bundle.find (rep.endpoint)); + auto rep_request (requests_bundle.find (rep)); auto block (election_l->status.winner); auto root_hash (std::make_pair (block->hash (), block->root ())); if (rep_request == requests_bundle.end ()) @@ -3438,7 +3361,7 @@ void nano::active_transactions::request_confirm (std::unique_lock & if (requests_bundle.size () < max_broadcast_queue) { std::vector> insert_vector = { root_hash }; - requests_bundle.insert (std::make_pair (rep.endpoint, insert_vector)); + requests_bundle.insert (std::make_pair (rep, insert_vector)); } } else if (rep_request->second.size () < max_broadcast_queue * nano::network::confirm_req_hashes_max) @@ -3452,19 +3375,21 @@ void nano::active_transactions::request_confirm (std::unique_lock & { if (!nano::is_test_network) { - confirm_req_bundle.push_back (std::make_pair (election_l->status.winner, std::make_shared> (node.peers.list_vector (100)))); + auto deque_l (node.peers.list (100)); + std::vector vec ({ deque_l.begin (), deque_l.end () }); + confirm_req_bundle.push_back (std::make_pair (election_l->status.winner, std::make_shared> (vec))); } else { - for (auto & rep : *reps) + for (auto & rep : *rep_endpoints) { - auto rep_request (requests_bundle.find (rep.endpoint)); + auto rep_request (requests_bundle.find (rep)); auto block (election_l->status.winner); auto root_hash (std::make_pair (block->hash (), block->root ())); if (rep_request == requests_bundle.end ()) { std::vector> insert_vector = { root_hash }; - requests_bundle.insert (std::make_pair (rep.endpoint, insert_vector)); + requests_bundle.insert (std::make_pair (rep, insert_vector)); } else { diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 39ce83851..a6d09ada7 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -342,9 +343,9 @@ public: void send_keepalive (nano::endpoint const &); void send_node_id_handshake (nano::endpoint const &, boost::optional const & query, boost::optional const & respond_to); void broadcast_confirm_req (std::shared_ptr); - void broadcast_confirm_req_base (std::shared_ptr, std::shared_ptr>, unsigned, bool = false); + void broadcast_confirm_req_base (std::shared_ptr, std::shared_ptr>, unsigned, bool = false); void broadcast_confirm_req_batch (std::unordered_map>>, unsigned = broadcast_interval_ms, bool = false); - void broadcast_confirm_req_batch (std::deque, std::shared_ptr>>>, unsigned = broadcast_interval_ms); + void broadcast_confirm_req_batch (std::deque, std::shared_ptr>>>, unsigned = broadcast_interval_ms); void send_confirm_req (nano::endpoint const &, std::shared_ptr); void send_confirm_req_hashes (nano::endpoint const &, std::vector> const &); void confirm_hashes (nano::transaction const &, nano::endpoint const &, std::vector); @@ -416,18 +417,6 @@ private: }; std::unique_ptr collect_seq_con_info (vote_processor & vote_processor, const std::string & name); - -// The network is crawled for representatives by occasionally sending a unicast confirm_req for a specific block and watching to see if it's acknowledged with a vote. -class rep_crawler -{ -public: - void add (nano::block_hash const &); - void remove (nano::block_hash const &); - bool exists (nano::block_hash const &); - std::mutex mutex; - std::unordered_set active; -}; - std::unique_ptr collect_seq_con_info (rep_crawler & rep_crawler, const std::string & name); std::unique_ptr collect_seq_con_info (block_processor & block_processor, const std::string & name); @@ -462,7 +451,6 @@ public: nano::account representative (nano::account const &); void ongoing_keepalive (); void ongoing_syn_cookie_cleanup (); - void ongoing_rep_crawl (); void ongoing_rep_calculation (); void ongoing_bootstrap (); void ongoing_store_flush (); diff --git a/nano/node/peers.cpp b/nano/node/peers.cpp index 24cc3943a..00b526ad4 100644 --- a/nano/node/peers.cpp +++ b/nano/node/peers.cpp @@ -90,7 +90,7 @@ std::deque nano::peer_container::list_fanout () return result; } -std::deque nano::peer_container::list () +std::deque nano::peer_container::list (size_t count_a) { std::deque result; std::lock_guard lock (mutex); @@ -99,6 +99,10 @@ std::deque nano::peer_container::list () result.push_back (i->endpoint); } nano::random_pool::shuffle (result.begin (), result.end ()); + if (result.size () > count_a) + { + result.resize (count_a); + } return result; } @@ -229,22 +233,6 @@ void nano::peer_container::random_fill (std::array & target_a } } -// Request a list of the top known representatives -std::vector nano::peer_container::representatives (size_t count_a) -{ - std::vector result; - result.reserve (std::min (count_a, size_t (16))); - std::lock_guard lock (mutex); - for (auto i (peers.get<6> ().begin ()), n (peers.get<6> ().end ()); i != n && result.size () < count_a; ++i) - { - if (!i->rep_weight.is_zero ()) - { - result.push_back (*i); - } - } - return result; -} - void nano::peer_container::purge_syn_cookies (std::chrono::steady_clock::time_point const & cutoff) { std::lock_guard lock (syn_cookie_mutex); @@ -297,21 +285,6 @@ std::vector nano::peer_container::purge_list (std::chron return result; } -std::vector nano::peer_container::rep_crawl () -{ - std::vector result; - // If there is enough observed peers weight, crawl 10 peers. Otherwise - 40 - uint16_t max_count = (total_weight () > online_weight_minimum) ? 10 : 40; - result.reserve (max_count); - std::lock_guard lock (mutex); - uint16_t count (0); - for (auto i (peers.get<5> ().begin ()), n (peers.get<5> ().end ()); i != n && count < max_count; ++i, ++count) - { - result.push_back (i->endpoint); - }; - return result; -} - size_t nano::peer_container::size () { std::lock_guard lock (mutex); @@ -323,36 +296,6 @@ size_t nano::peer_container::size_sqrt () return (static_cast (std::ceil (std::sqrt (size ())))); } -std::vector nano::peer_container::list_probable_rep_weights () -{ - std::vector result; - std::unordered_set probable_reps; - std::lock_guard lock (mutex); - for (auto i (peers.get<6> ().begin ()), n (peers.get<6> ().end ()); i != n; ++i) - { - // Calculate if representative isn't recorded for several IP addresses - if (probable_reps.find (i->probable_rep_account) == probable_reps.end ()) - { - if (!i->rep_weight.number ().is_zero ()) - { - result.push_back (*i); - } - probable_reps.insert (i->probable_rep_account); - } - } - return result; -} - -nano::uint128_t nano::peer_container::total_weight () -{ - nano::uint128_t result (0); - for (auto & entry : list_probable_rep_weights ()) - { - result = result + entry.rep_weight.number (); - } - return result; -} - bool nano::peer_container::empty () { return size () == 0; @@ -376,39 +319,6 @@ bool nano::peer_container::not_a_peer (nano::endpoint const & endpoint_a, bool a return result; } -bool nano::peer_container::rep_response (nano::endpoint const & endpoint_a, nano::account const & rep_account_a, nano::amount const & weight_a) -{ - assert (endpoint_a.address ().is_v6 ()); - auto updated (false); - std::lock_guard lock (mutex); - auto existing (peers.find (endpoint_a)); - if (existing != peers.end ()) - { - peers.modify (existing, [weight_a, &updated, rep_account_a](nano::peer_information & info) { - info.last_rep_response = std::chrono::steady_clock::now (); - if (info.rep_weight < weight_a) - { - updated = true; - info.rep_weight = weight_a; - info.probable_rep_account = rep_account_a; - } - }); - } - return updated; -} - -void nano::peer_container::rep_request (nano::endpoint const & endpoint_a) -{ - std::lock_guard lock (mutex); - auto existing (peers.find (endpoint_a)); - if (existing != peers.end ()) - { - peers.modify (existing, [](nano::peer_information & info) { - info.last_rep_request = std::chrono::steady_clock::now (); - }); - } -} - bool nano::peer_container::reachout (nano::endpoint const & endpoint_a, bool allow_local_peers) { // Don't contact invalid IPs diff --git a/nano/node/peers.hpp b/nano/node/peers.hpp index 108f4774d..fc0f9ced8 100644 --- a/nano/node/peers.hpp +++ b/nano/node/peers.hpp @@ -51,10 +51,6 @@ public: std::chrono::steady_clock::time_point last_contact; std::chrono::steady_clock::time_point last_attempt; std::chrono::steady_clock::time_point last_bootstrap_attempt{ std::chrono::steady_clock::time_point () }; - std::chrono::steady_clock::time_point last_rep_request{ std::chrono::steady_clock::time_point () }; - std::chrono::steady_clock::time_point last_rep_response{ std::chrono::steady_clock::time_point () }; - nano::amount rep_weight{ 0 }; - nano::account probable_rep_account{ 0 }; unsigned network_version{ nano::protocol_version }; boost::optional node_id; bool operator< (nano::peer_information const &) const; @@ -76,10 +72,8 @@ public: bool insert (nano::endpoint const &, unsigned, bool = false, boost::optional = boost::none); std::unordered_set random_set (size_t); void random_fill (std::array &); - // Request a list of the top known representatives - std::vector representatives (size_t); // List of all peers - std::deque list (); + std::deque list (size_t count_a = std::numeric_limits::max ()); std::vector list_vector (size_t); // A list of random peers sized for the configured rebroadcast fanout std::deque list_fanout (); @@ -90,9 +84,6 @@ public: // Purge any peer where last_contact < time_point and return what was left std::vector purge_list (std::chrono::steady_clock::time_point const &); void purge_syn_cookies (std::chrono::steady_clock::time_point const &); - std::vector rep_crawl (); - bool rep_response (nano::endpoint const &, nano::account const &, nano::amount const &); - void rep_request (nano::endpoint const &); // Should we reach out to this endpoint with a keepalive message bool reachout (nano::endpoint const &, bool = false); // Returns boost::none if the IP is rate capped on syn cookie requests, @@ -103,8 +94,6 @@ public: bool validate_syn_cookie (nano::endpoint const &, nano::account, nano::signature); size_t size (); size_t size_sqrt (); - nano::uint128_t total_weight (); - nano::uint128_t online_weight_minimum; bool empty (); std::mutex mutex; nano::endpoint self; @@ -116,8 +105,6 @@ public: boost::multi_index::ordered_non_unique, std::greater>, boost::multi_index::random_access<>, boost::multi_index::ordered_non_unique>, - boost::multi_index::ordered_non_unique>, - boost::multi_index::ordered_non_unique, std::greater>, boost::multi_index::ordered_non_unique, boost::multi_index::member>>> peers; boost::multi_index_container< diff --git a/nano/node/repcrawler.cpp b/nano/node/repcrawler.cpp new file mode 100644 index 000000000..9f8b2c4b7 --- /dev/null +++ b/nano/node/repcrawler.cpp @@ -0,0 +1,240 @@ +#include +#include + +nano::rep_crawler::rep_crawler (nano::node & node_a) : +node (node_a) +{ + node.observers.endpoint.add ([this](nano::endpoint const & endpoint_a) { + this->query (endpoint_a); + }); +} + +void nano::rep_crawler::add (nano::block_hash const & hash_a) +{ + std::lock_guard lock (active_mutex); + active.insert (hash_a); +} + +void nano::rep_crawler::remove (nano::block_hash const & hash_a) +{ + std::lock_guard lock (active_mutex); + active.erase (hash_a); +} + +bool nano::rep_crawler::exists (nano::block_hash const & hash_a) +{ + std::lock_guard lock (active_mutex); + return active.count (hash_a) != 0; +} + +void nano::rep_crawler::start () +{ + ongoing_crawl (); +} + +void nano::rep_crawler::ongoing_crawl () +{ + auto now (std::chrono::steady_clock::now ()); + query (get_crawl_targets ()); + if (node.network.on) + { + // Reduce crawl frequency when there's enough total peer weight + unsigned next_run_seconds = (total_weight_internal () > node.config.online_weight_minimum.number ()) ? 7 : 3; + std::weak_ptr node_w (node.shared ()); + node.alarm.add (now + std::chrono::seconds (next_run_seconds), [node_w, this]() { + if (auto node_l = node_w.lock ()) + { + this->ongoing_crawl (); + } + }); + } +} + +std::vector nano::rep_crawler::get_crawl_targets () +{ + std::unordered_set endpoints; + constexpr size_t conservative_count = 10; + constexpr size_t aggressive_count = 40; + + // Crawl more aggressively if we lack sufficient total peer weight. + bool sufficient_weight (total_weight_internal () > node.config.online_weight_minimum.number ()); + uint16_t required_peer_count = sufficient_weight ? conservative_count : aggressive_count; + std::lock_guard lock (probable_reps_mutex); + + // First, add known rep endpoints, ordered by ascending last-requested time. + for (auto i (probable_reps.get ().begin ()), n (probable_reps.get ().end ()); i != n && endpoints.size () < required_peer_count; ++i) + { + endpoints.insert (i->endpoint); + }; + + // Add additional random peers. We do this even if we have enough weight, in order to pick up reps + // that didn't respond when first observed. If the current total weight isn't sufficient, this + // will be more aggressive. When the node first starts, the rep container is empty and all + // endpoints will originate from random peers. + required_peer_count += required_peer_count / 2; + + // The rest of the endpoints are picked randomly + auto random_peers (node.peers.list ()); + for (auto & peer : random_peers) + { + endpoints.insert (peer); + if (endpoints.size () >= required_peer_count) + { + break; + } + } + + std::vector result; + result.insert (result.end (), endpoints.begin (), endpoints.end ()); + return result; +} + +void nano::rep_crawler::query (std::vector const & endpoints_a) +{ + auto transaction (node.store.tx_begin_read ()); + std::shared_ptr block (node.store.block_random (transaction)); + auto hash (block->hash ()); + add (hash); + for (auto i (endpoints_a.begin ()), n (endpoints_a.end ()); i != n; ++i) + { + on_rep_request (*i); + node.network.send_confirm_req (*i, block); + } + + // A representative must respond with a vote within the deadline + std::weak_ptr node_w (node.shared ()); + node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [node_w, hash]() { + if (auto node_l = node_w.lock ()) + { + node_l->rep_crawler.remove (hash); + } + }); +} + +void nano::rep_crawler::query (nano::endpoint const & endpoint_a) +{ + std::vector peers; + peers.push_back (endpoint_a); + query (peers); +} + +bool nano::rep_crawler::response (nano::endpoint const & endpoint_a, nano::account const & rep_account_a, nano::amount const & weight_a) +{ + assert (endpoint_a.address ().is_v6 ()); + auto updated (false); + std::lock_guard lock (probable_reps_mutex); + auto existing (probable_reps.find (rep_account_a)); + if (existing != probable_reps.end ()) + { + probable_reps.modify (existing, [weight_a, &updated, rep_account_a, endpoint_a](nano::representative & info) { + info.last_response = std::chrono::steady_clock::now (); + + if (info.weight < weight_a) + { + updated = true; + info.weight = weight_a; + info.endpoint = endpoint_a; + info.account = rep_account_a; + } + }); + } + else + { + probable_reps.insert (nano::representative (rep_account_a, weight_a, endpoint_a)); + } + return updated; +} + +nano::uint128_t nano::rep_crawler::total_weight_internal () +{ + nano::uint128_t result (0); + for (auto i (probable_reps.get ().begin ()), n (probable_reps.get ().end ()); i != n; ++i) + { + auto weight (i->weight.number ()); + if (weight > 0) + { + result = result + weight; + } + else + { + break; + } + } + return result; +} + +nano::uint128_t nano::rep_crawler::total_weight () +{ + std::lock_guard lock (probable_reps_mutex); + return total_weight_internal (); +} + +std::vector nano::rep_crawler::representatives_by_weight () +{ + std::vector result; + std::lock_guard lock (probable_reps_mutex); + for (auto i (probable_reps.get ().begin ()), n (probable_reps.get ().end ()); i != n; ++i) + { + auto weight (i->weight.number ()); + if (weight > 0) + { + result.push_back (*i); + } + else + { + break; + } + } + return result; +} + +void nano::rep_crawler::on_rep_request (nano::endpoint const & endpoint_a) +{ + std::lock_guard lock (probable_reps_mutex); + + using probable_rep_itr_t = probably_rep_t::index::type::iterator; + probably_rep_t::index::type & endpoint_index = probable_reps.get (); + + // Find and update the timestamp on all reps available on the endpoint (a single host may have multiple reps) + std::vector view; + auto itr_pair = probable_reps.get ().equal_range (endpoint_a); + for (; itr_pair.first != itr_pair.second; itr_pair.first++) + { + endpoint_index.modify (itr_pair.first, [](nano::representative & value_a) { + value_a.last_request = std::chrono::steady_clock::now (); + }); + } +} + +std::vector nano::rep_crawler::representatives (size_t count_a) +{ + std::vector result; + result.reserve (std::min (count_a, size_t (16))); + std::lock_guard lock (probable_reps_mutex); + for (auto i (probable_reps.get ().begin ()), n (probable_reps.get ().end ()); i != n && result.size () < count_a; ++i) + { + if (!i->weight.is_zero ()) + { + result.push_back (*i); + } + } + return result; +} + +std::vector nano::rep_crawler::representative_endpoints (size_t count_a) +{ + std::vector result; + auto reps (representatives (count_a)); + for (auto rep : reps) + { + result.push_back (rep.endpoint); + } + return result; +} + +/** Total number of representatives */ +size_t nano::rep_crawler::representative_count () +{ + std::lock_guard lock (probable_reps_mutex); + return probable_reps.size (); +} diff --git a/nano/node/repcrawler.hpp b/nano/node/repcrawler.hpp new file mode 100644 index 000000000..36f70e686 --- /dev/null +++ b/nano/node/repcrawler.hpp @@ -0,0 +1,134 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace mi = boost::multi_index; + +namespace nano +{ +class node; + +/** + * A representative picked up during repcrawl. + */ +class representative +{ +public: + representative (nano::account account_a, nano::amount weight_a, nano::endpoint endpoint_a) : + account (account_a), weight (weight_a), endpoint (endpoint_a) + { + } + nano::account account{ 0 }; + nano::amount weight{ 0 }; + nano::endpoint endpoint; + std::chrono::steady_clock::time_point last_request{ std::chrono::steady_clock::time_point () }; + std::chrono::steady_clock::time_point last_response{ std::chrono::steady_clock::time_point () }; +}; + +/** + * Crawls the network for representatives. Queries are performed by requesting confirmation of a + * random block and observing the corresponding vote. + */ +class rep_crawler +{ + friend std::unique_ptr collect_seq_con_info (rep_crawler & rep_crawler, const std::string & name); + + // clang-format off + class tag_endpoint {}; + class tag_last_request {}; + class tag_account {}; + class tag_weight {}; + + using probably_rep_t = boost::multi_index_container>, + mi::random_access<>, + mi::ordered_non_unique, + mi::member>, + mi::ordered_non_unique, + mi::member, std::greater>, + mi::ordered_non_unique, + mi::member>>>; + // clang-format on + +public: + rep_crawler (nano::node & node_a); + + /** Start crawling */ + void start (); + + /** Add block hash to list of active rep queries */ + void add (nano::block_hash const &); + + /** Remove block hash from list of active rep queries */ + void remove (nano::block_hash const &); + + /** Check if block hash is in the list of active rep queries */ + bool exists (nano::block_hash const &); + + /** Attempt to determine if the peer manages one or more representative accounts */ + void query (std::vector const & endpoints_a); + + /** Attempt to determine if the peer manages one or more representative accounts */ + void query (nano::endpoint const & endpoint_a); + + /** + * Called when a non-replay vote on a block previously sent by query() is received. This indiciates + * with high probability that the endpoint is a representative node. + * @return True if the rep entry was updated with new information due to increase in weight. + */ + bool response (nano::endpoint const & endpoint_a, nano::account const & rep_account_a, nano::amount const & weight_a); + + /** Get total available weight from representatives */ + nano::uint128_t total_weight (); + + /** Request a list of the top \p count_a known representatives. The maximum number of reps returned is 16. */ + std::vector representatives (size_t count_a); + + /** Request a list of the top \p count_a known representative endpoints. The maximum number of reps returned is 16. */ + std::vector representative_endpoints (size_t count_a); + + /** Returns all representatives registered with weight in descending order */ + std::vector representatives_by_weight (); + + /** Total number of representatives */ + size_t representative_count (); + +private: + nano::node & node; + + /** Protects the active-hash container */ + std::mutex active_mutex; + + /** We have solicted votes for these random blocks */ + std::unordered_set active; + + /** Called continuously to crawl for representatives */ + void ongoing_crawl (); + + /** Returns a list of endpoints to crawl */ + std::vector get_crawl_targets (); + + /** When a rep request is made, this is called to update the last-request timestamp. */ + void on_rep_request (nano::endpoint const & endpoint_a); + + /** Protects the probable_reps container */ + std::mutex probable_reps_mutex; + + /** Get total available weight from representatives (must be called with a lock on probable_reps_mutex) */ + nano::uint128_t total_weight_internal (); + + /** Probable representatives */ + probably_rep_t probable_reps; +}; +} diff --git a/nano/node/rpc.cpp b/nano/node/rpc.cpp index 0c23b300d..a89f119ee 100644 --- a/nano/node/rpc.cpp +++ b/nano/node/rpc.cpp @@ -1688,16 +1688,16 @@ void nano::rpc_handler::confirmation_quorum () response_l.put ("online_weight_quorum_percent", std::to_string (node.config.online_weight_quorum)); response_l.put ("online_weight_minimum", node.config.online_weight_minimum.to_string_dec ()); response_l.put ("online_stake_total", node.online_reps.online_stake ().convert_to ()); - response_l.put ("peers_stake_total", node.peers.total_weight ().convert_to ()); + response_l.put ("peers_stake_total", node.rep_crawler.total_weight ().convert_to ()); if (request.get ("peer_details", false)) { boost::property_tree::ptree peers; - for (auto & peer : node.peers.list_probable_rep_weights ()) + for (auto & peer : node.rep_crawler.representatives_by_weight ()) { boost::property_tree::ptree peer_node; - peer_node.put ("account", peer.probable_rep_account.to_account ()); - peer_node.put ("ip", peer.ip_address.to_string ()); - peer_node.put ("weight", peer.rep_weight.to_string_dec ()); + peer_node.put ("account", peer.account.to_account ()); + peer_node.put ("ip", peer.endpoint.address ().to_string ()); + peer_node.put ("weight", peer.weight.to_string_dec ()); peers.push_back (std::make_pair ("", peer_node)); } response_l.add_child ("peers", peers);