Improve representative crawler (#1803)

* Improve rep crawling

* Add count-check to test

* Move tags and template alias into class, improve modification of items via non-unique index
This commit is contained in:
cryptocode 2019-03-06 20:31:54 +01:00 committed by GitHub
commit b8cde9f286
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 464 additions and 271 deletions

View file

@ -1213,7 +1213,7 @@ TEST (node, DISABLED_fork_stale)
auto & node1 (*system1.nodes[0]);
auto & node2 (*system2.nodes[0]);
node2.bootstrap_initiator.bootstrap (node1.network.endpoint ());
node2.peers.rep_response (node1.network.endpoint (), nano::test_genesis_key.pub, nano::genesis_amount);
node2.rep_crawler.response (node1.network.endpoint (), nano::test_genesis_key.pub, nano::genesis_amount);
nano::genesis genesis;
nano::keypair key1;
nano::keypair key2;
@ -1501,17 +1501,17 @@ TEST (node, rep_list)
nano::keypair key1;
// Broadcast a confirm so others should know this is a rep node
wallet0->send_action (nano::test_genesis_key.pub, key1.pub, nano::Mxrb_ratio);
ASSERT_EQ (0, node1.peers.representatives (1).size ());
ASSERT_EQ (0, node1.rep_crawler.representatives (1).size ());
system.deadline_set (10s);
auto done (false);
while (!done)
{
auto reps (node1.peers.representatives (1));
auto reps (node1.rep_crawler.representatives (1));
if (!reps.empty ())
{
if (reps[0].endpoint == node0.network.endpoint ())
{
if (!reps[0].rep_weight.is_zero ())
if (!reps[0].weight.is_zero ())
{
done = true;
}
@ -1521,6 +1521,34 @@ TEST (node, rep_list)
}
}
TEST (node, rep_weight)
{
nano::system system (24000, 1);
auto & node (*system.nodes[0]);
node.peers.insert (nano::endpoint (boost::asio::ip::address_v6::loopback (), 24001), 0);
ASSERT_TRUE (node.rep_crawler.representatives (1).empty ());
nano::endpoint endpoint0 (boost::asio::ip::address_v6::loopback (), 24000);
nano::endpoint endpoint1 (boost::asio::ip::address_v6::loopback (), 24002);
nano::endpoint endpoint2 (boost::asio::ip::address_v6::loopback (), 24003);
nano::amount amount100 (100);
nano::amount amount50 (50);
node.peers.insert (endpoint2, nano::protocol_version);
node.peers.insert (endpoint0, nano::protocol_version);
node.peers.insert (endpoint1, nano::protocol_version);
nano::keypair keypair1;
nano::keypair keypair2;
node.rep_crawler.response (endpoint0, keypair1.pub, amount100);
node.rep_crawler.response (endpoint1, keypair2.pub, amount50);
ASSERT_EQ (2, node.rep_crawler.representative_count ());
// Make sure we get the rep with the most weight first
auto reps (node.rep_crawler.representatives (1));
ASSERT_EQ (1, reps.size ());
ASSERT_EQ (100, reps[0].weight.number ());
ASSERT_EQ (keypair1.pub, reps[0].account);
ASSERT_EQ (endpoint0, reps[0].endpoint);
}
// Test that nodes can disable representative voting
TEST (node, no_voting)
{

View file

@ -121,27 +121,6 @@ TEST (peer_container, list_fanout)
ASSERT_EQ (32, list2.size ());
}
TEST (peer_container, rep_weight)
{
nano::peer_container peers (nano::endpoint{});
peers.insert (nano::endpoint (boost::asio::ip::address_v6::loopback (), 24001), 0);
ASSERT_TRUE (peers.representatives (1).empty ());
nano::endpoint endpoint0 (boost::asio::ip::address_v6::loopback (), 24000);
nano::endpoint endpoint1 (boost::asio::ip::address_v6::loopback (), 24002);
nano::endpoint endpoint2 (boost::asio::ip::address_v6::loopback (), 24003);
nano::amount amount (100);
peers.insert (endpoint2, nano::protocol_version);
peers.insert (endpoint0, nano::protocol_version);
peers.insert (endpoint1, nano::protocol_version);
nano::keypair keypair;
peers.rep_response (endpoint0, keypair.pub, amount);
auto reps (peers.representatives (1));
ASSERT_EQ (1, reps.size ());
ASSERT_EQ (100, reps[0].rep_weight.number ());
ASSERT_EQ (keypair.pub, reps[0].probable_rep_account);
ASSERT_EQ (endpoint0, reps[0].endpoint);
}
// Test to make sure we don't repeatedly send keepalive messages to nodes that aren't responding
TEST (peer_container, reachout)
{

View file

@ -45,6 +45,8 @@ add_library (node
peers.hpp
portmapping.hpp
portmapping.cpp
repcrawler.hpp
repcrawler.cpp
rpc.hpp
rpc.cpp
rpcconfig.hpp

View file

@ -420,11 +420,16 @@ void nano::network::republish_vote (std::shared_ptr<nano::vote> vote_a)
void nano::network::broadcast_confirm_req (std::shared_ptr<nano::block> block_a)
{
auto list (std::make_shared<std::vector<nano::peer_information>> (node.peers.representatives (std::numeric_limits<size_t>::max ())));
if (list->empty () || node.peers.total_weight () < node.config.online_weight_minimum.number ())
auto list (std::make_shared<std::vector<nano::endpoint>> (node.rep_crawler.representative_endpoints (std::numeric_limits<size_t>::max ())));
if (list->empty () || node.rep_crawler.total_weight () < node.config.online_weight_minimum.number ())
{
// broadcast request to all peers (with max limit 2 * sqrt (peers count))
list = std::make_shared<std::vector<nano::peer_information>> (node.peers.list_vector (std::min (static_cast<size_t> (100), 2 * node.peers.size_sqrt ())));
auto peers (node.peers.list_vector (std::min (static_cast<size_t> (100), 2 * node.peers.size_sqrt ())));
list->clear ();
for (auto & peer : peers)
{
list->push_back (peer.endpoint);
}
}
/*
@ -444,7 +449,7 @@ void nano::network::broadcast_confirm_req (std::shared_ptr<nano::block> block_a)
broadcast_confirm_req_base (block_a, list, 0);
}
void nano::network::broadcast_confirm_req_base (std::shared_ptr<nano::block> block_a, std::shared_ptr<std::vector<nano::peer_information>> endpoints_a, unsigned delay_a, bool resumption)
void nano::network::broadcast_confirm_req_base (std::shared_ptr<nano::block> block_a, std::shared_ptr<std::vector<nano::endpoint>> endpoints_a, unsigned delay_a, bool resumption)
{
const size_t max_reps = 10;
if (!resumption && node.config.logging.network_logging ())
@ -454,7 +459,7 @@ void nano::network::broadcast_confirm_req_base (std::shared_ptr<nano::block> blo
auto count (0);
while (!endpoints_a->empty () && count < max_reps)
{
send_confirm_req (endpoints_a->back ().endpoint, block_a);
send_confirm_req (endpoints_a->back (), block_a);
endpoints_a->pop_back ();
count++;
}
@ -509,7 +514,7 @@ void nano::network::broadcast_confirm_req_batch (std::unordered_map<nano::endpoi
}
}
void nano::network::broadcast_confirm_req_batch (std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<nano::peer_information>>>> deque_a, unsigned delay_a)
void nano::network::broadcast_confirm_req_batch (std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<nano::endpoint>>>> deque_a, unsigned delay_a)
{
auto pair (deque_a.front ());
deque_a.pop_front ();
@ -576,34 +581,6 @@ void nano::network::send_confirm_req_hashes (nano::endpoint const & endpoint_a,
});
}
template <typename T>
void rep_query (nano::node & node_a, T const & peers_a)
{
auto transaction (node_a.store.tx_begin_read ());
std::shared_ptr<nano::block> block (node_a.store.block_random (transaction));
auto hash (block->hash ());
node_a.rep_crawler.add (hash);
for (auto i (peers_a.begin ()), n (peers_a.end ()); i != n; ++i)
{
node_a.peers.rep_request (*i);
node_a.network.send_confirm_req (*i, block);
}
std::weak_ptr<nano::node> node_w (node_a.shared ());
node_a.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [node_w, hash]() {
if (auto node_l = node_w.lock ())
{
node_l->rep_crawler.remove (hash);
}
});
}
void rep_query (nano::node & node_a, nano::endpoint const & peers_a)
{
std::array<nano::endpoint, 1> peers;
peers[0] = peers_a;
rep_query (node_a, peers);
}
namespace
{
class network_message_visitor : public nano::message_visitor
@ -1320,33 +1297,12 @@ std::unique_ptr<seq_con_info_component> collect_seq_con_info (vote_processor & v
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "representatives_3", representatives_3_count, sizeof (decltype (vote_processor.representatives_3)::value_type) }));
return composite;
}
}
void nano::rep_crawler::add (nano::block_hash const & hash_a)
{
std::lock_guard<std::mutex> lock (mutex);
active.insert (hash_a);
}
void nano::rep_crawler::remove (nano::block_hash const & hash_a)
{
std::lock_guard<std::mutex> lock (mutex);
active.erase (hash_a);
}
bool nano::rep_crawler::exists (nano::block_hash const & hash_a)
{
std::lock_guard<std::mutex> lock (mutex);
return active.count (hash_a) != 0;
}
namespace nano
{
std::unique_ptr<seq_con_info_component> collect_seq_con_info (rep_crawler & rep_crawler, const std::string & name)
{
size_t count = 0;
{
std::lock_guard<std::mutex> guard (rep_crawler.mutex);
std::lock_guard<std::mutex> guard (rep_crawler.active_mutex);
count = rep_crawler.active.size ();
}
@ -1355,10 +1311,7 @@ std::unique_ptr<seq_con_info_component> collect_seq_con_info (rep_crawler & rep_
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "active", count, sizeof_element }));
return composite;
}
}
namespace nano
{
std::unique_ptr<seq_con_info_component> collect_seq_con_info (block_processor & block_processor, const std::string & name)
{
size_t state_blocks_count = 0;
@ -1415,6 +1368,7 @@ wallets (init_a.wallet_init, *this),
port_mapping (*this),
checker (config.signature_checker_threads),
vote_processor (*this),
rep_crawler (*this),
warmed_up (0),
block_processor (*this),
block_processor_thread ([this]() {
@ -1498,7 +1452,6 @@ startup_time (std::chrono::steady_clock::now ())
}
observers.endpoint.add ([this](nano::endpoint const & endpoint_a) {
this->network.send_keepalive (endpoint_a);
rep_query (*this, endpoint_a);
});
observers.vote.add ([this](nano::transaction const & transaction, std::shared_ptr<nano::vote> vote_a, nano::endpoint const & endpoint_a) {
assert (endpoint_a.address ().is_v6 ());
@ -1524,7 +1477,7 @@ startup_time (std::chrono::steady_clock::now ())
if (rep_crawler_exists)
{
// We see a valid non-replay vote for a block we requested, this node is probably a representative
if (this->peers.rep_response (endpoint_a, vote_a->account, rep_weight))
if (this->rep_crawler.response (endpoint_a, vote_a->account, rep_weight))
{
logger.try_log (boost::str (boost::format ("Found a representative at %1%") % endpoint_a));
// Rebroadcasting all active votes to new representative
@ -1572,7 +1525,6 @@ startup_time (std::chrono::steady_clock::now ())
node_id = nano::keypair (store.get_node_id (transaction));
logger.always_log ("Node ID: ", node_id.pub.to_account ());
}
peers.online_weight_minimum = config.online_weight_minimum.number ();
if (nano::is_live_network || nano::is_beta_network)
{
nano::bufferstream weight_stream ((const uint8_t *)nano_bootstrap_weights, nano_bootstrap_weights_size);
@ -1915,7 +1867,7 @@ void nano::node::start ()
ongoing_unchecked_cleanup ();
}
ongoing_store_flush ();
ongoing_rep_crawl ();
rep_crawler.start ();
ongoing_rep_calculation ();
ongoing_peer_store ();
ongoing_online_weight_calculation_queue ();
@ -2039,23 +1991,6 @@ void nano::node::ongoing_syn_cookie_cleanup ()
});
}
void nano::node::ongoing_rep_crawl ()
{
auto now (std::chrono::steady_clock::now ());
auto peers_l (peers.rep_crawl ());
rep_query (*this, peers_l);
if (network.on)
{
std::weak_ptr<nano::node> node_w (shared_from_this ());
alarm.add (now + std::chrono::seconds (4), [node_w]() {
if (auto node_l = node_w.lock ())
{
node_l->ongoing_rep_crawl ();
}
});
}
}
void nano::node::ongoing_rep_calculation ()
{
auto now (std::chrono::steady_clock::now ());
@ -2556,6 +2491,7 @@ void nano::node::add_initial_peers ()
if (!peers.reachout (endpoint, config.allow_local_peers))
{
send_keepalive (endpoint);
rep_crawler.query (endpoint);
}
}
}
@ -3300,7 +3236,7 @@ void nano::active_transactions::request_confirm (std::unique_lock<std::mutex> &
unsigned unconfirmed_announcements (0);
std::unordered_map<nano::endpoint, std::vector<std::pair<nano::block_hash, nano::block_hash>>> requests_bundle;
std::deque<std::shared_ptr<nano::block>> rebroadcast_bundle;
std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<nano::peer_information>>>> confirm_req_bundle;
std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<nano::endpoint>>>> confirm_req_bundle;
auto roots_size (roots.size ());
for (auto i (roots.get<1> ().begin ()), n (roots.get<1> ().end ()); i != n; ++i)
@ -3382,55 +3318,42 @@ void nano::active_transactions::request_confirm (std::unique_lock<std::mutex> &
}
if (election_l->announcements % 4 == 1)
{
auto reps (std::make_shared<std::vector<nano::peer_information>> (node.peers.representatives (std::numeric_limits<size_t>::max ())));
std::unordered_set<nano::account> probable_reps;
nano::uint128_t total_weight (0);
for (auto j (reps->begin ()), m (reps->end ()); j != m;)
{
auto & rep_votes (election_l->last_votes);
auto rep_acct (j->probable_rep_account);
// Calculate if representative isn't recorded for several IP addresses
if (probable_reps.find (rep_acct) == probable_reps.end ())
{
total_weight = total_weight + j->rep_weight.number ();
probable_reps.insert (rep_acct);
}
if (rep_votes.find (rep_acct) != rep_votes.end ())
{
if (j + 1 == reps->end ())
{
reps->pop_back ();
break;
}
auto rep_endpoints (std::make_shared<std::vector<nano::endpoint>> ());
auto reps (node.rep_crawler.representatives (std::numeric_limits<size_t>::max ()));
std::swap (*j, reps->back ());
reps->pop_back ();
m = reps->end ();
}
else
// Add all rep endpoints that haven't already voted. We use a set since multiple
// reps may exist on an endpoint.
std::unordered_set<nano::endpoint> endpoints;
for (auto & rep : reps)
{
if (election_l->last_votes.find (rep.account) == election_l->last_votes.end ())
{
++j;
endpoints.insert (rep.endpoint);
if (node.config.logging.vote_logging ())
{
node.logger.try_log ("Representative did not respond to confirm_req, retrying: ", rep_acct.to_account ());
node.logger.try_log ("Representative did not respond to confirm_req, retrying: ", rep.account.to_account ());
}
}
}
if ((!reps->empty () && total_weight > node.config.online_weight_minimum.number ()) || roots_size > 5)
rep_endpoints->insert (rep_endpoints->end (), endpoints.begin (), endpoints.end ());
if ((!rep_endpoints->empty () && node.rep_crawler.total_weight () > node.config.online_weight_minimum.number ()) || roots_size > 5)
{
// broadcast_confirm_req_base modifies reps, so we clone it once to avoid aliasing
if (!nano::is_test_network)
{
if (confirm_req_bundle.size () < max_broadcast_queue)
{
confirm_req_bundle.push_back (std::make_pair (election_l->status.winner, reps));
confirm_req_bundle.push_back (std::make_pair (election_l->status.winner, rep_endpoints));
}
}
else
{
for (auto & rep : *reps)
for (auto & rep : *rep_endpoints)
{
auto rep_request (requests_bundle.find (rep.endpoint));
auto rep_request (requests_bundle.find (rep));
auto block (election_l->status.winner);
auto root_hash (std::make_pair (block->hash (), block->root ()));
if (rep_request == requests_bundle.end ())
@ -3438,7 +3361,7 @@ void nano::active_transactions::request_confirm (std::unique_lock<std::mutex> &
if (requests_bundle.size () < max_broadcast_queue)
{
std::vector<std::pair<nano::block_hash, nano::block_hash>> insert_vector = { root_hash };
requests_bundle.insert (std::make_pair (rep.endpoint, insert_vector));
requests_bundle.insert (std::make_pair (rep, insert_vector));
}
}
else if (rep_request->second.size () < max_broadcast_queue * nano::network::confirm_req_hashes_max)
@ -3452,19 +3375,21 @@ void nano::active_transactions::request_confirm (std::unique_lock<std::mutex> &
{
if (!nano::is_test_network)
{
confirm_req_bundle.push_back (std::make_pair (election_l->status.winner, std::make_shared<std::vector<nano::peer_information>> (node.peers.list_vector (100))));
auto deque_l (node.peers.list (100));
std::vector<nano::endpoint> vec ({ deque_l.begin (), deque_l.end () });
confirm_req_bundle.push_back (std::make_pair (election_l->status.winner, std::make_shared<std::vector<nano::endpoint>> (vec)));
}
else
{
for (auto & rep : *reps)
for (auto & rep : *rep_endpoints)
{
auto rep_request (requests_bundle.find (rep.endpoint));
auto rep_request (requests_bundle.find (rep));
auto block (election_l->status.winner);
auto root_hash (std::make_pair (block->hash (), block->root ()));
if (rep_request == requests_bundle.end ())
{
std::vector<std::pair<nano::block_hash, nano::block_hash>> insert_vector = { root_hash };
requests_bundle.insert (std::make_pair (rep.endpoint, insert_vector));
requests_bundle.insert (std::make_pair (rep, insert_vector));
}
else
{

View file

@ -7,6 +7,7 @@
#include <nano/node/nodeconfig.hpp>
#include <nano/node/peers.hpp>
#include <nano/node/portmapping.hpp>
#include <nano/node/repcrawler.hpp>
#include <nano/node/signatures.hpp>
#include <nano/node/stats.hpp>
#include <nano/node/wallet.hpp>
@ -342,9 +343,9 @@ public:
void send_keepalive (nano::endpoint const &);
void send_node_id_handshake (nano::endpoint const &, boost::optional<nano::uint256_union> const & query, boost::optional<nano::uint256_union> const & respond_to);
void broadcast_confirm_req (std::shared_ptr<nano::block>);
void broadcast_confirm_req_base (std::shared_ptr<nano::block>, std::shared_ptr<std::vector<nano::peer_information>>, unsigned, bool = false);
void broadcast_confirm_req_base (std::shared_ptr<nano::block>, std::shared_ptr<std::vector<nano::endpoint>>, unsigned, bool = false);
void broadcast_confirm_req_batch (std::unordered_map<nano::endpoint, std::vector<std::pair<nano::block_hash, nano::block_hash>>>, unsigned = broadcast_interval_ms, bool = false);
void broadcast_confirm_req_batch (std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<nano::peer_information>>>>, unsigned = broadcast_interval_ms);
void broadcast_confirm_req_batch (std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<nano::endpoint>>>>, unsigned = broadcast_interval_ms);
void send_confirm_req (nano::endpoint const &, std::shared_ptr<nano::block>);
void send_confirm_req_hashes (nano::endpoint const &, std::vector<std::pair<nano::block_hash, nano::block_hash>> const &);
void confirm_hashes (nano::transaction const &, nano::endpoint const &, std::vector<nano::block_hash>);
@ -416,18 +417,6 @@ private:
};
std::unique_ptr<seq_con_info_component> collect_seq_con_info (vote_processor & vote_processor, const std::string & name);
// The network is crawled for representatives by occasionally sending a unicast confirm_req for a specific block and watching to see if it's acknowledged with a vote.
class rep_crawler
{
public:
void add (nano::block_hash const &);
void remove (nano::block_hash const &);
bool exists (nano::block_hash const &);
std::mutex mutex;
std::unordered_set<nano::block_hash> active;
};
std::unique_ptr<seq_con_info_component> collect_seq_con_info (rep_crawler & rep_crawler, const std::string & name);
std::unique_ptr<seq_con_info_component> collect_seq_con_info (block_processor & block_processor, const std::string & name);
@ -462,7 +451,6 @@ public:
nano::account representative (nano::account const &);
void ongoing_keepalive ();
void ongoing_syn_cookie_cleanup ();
void ongoing_rep_crawl ();
void ongoing_rep_calculation ();
void ongoing_bootstrap ();
void ongoing_store_flush ();

View file

@ -90,7 +90,7 @@ std::deque<nano::endpoint> nano::peer_container::list_fanout ()
return result;
}
std::deque<nano::endpoint> nano::peer_container::list ()
std::deque<nano::endpoint> nano::peer_container::list (size_t count_a)
{
std::deque<nano::endpoint> result;
std::lock_guard<std::mutex> lock (mutex);
@ -99,6 +99,10 @@ std::deque<nano::endpoint> nano::peer_container::list ()
result.push_back (i->endpoint);
}
nano::random_pool::shuffle (result.begin (), result.end ());
if (result.size () > count_a)
{
result.resize (count_a);
}
return result;
}
@ -229,22 +233,6 @@ void nano::peer_container::random_fill (std::array<nano::endpoint, 8> & target_a
}
}
// Request a list of the top known representatives
std::vector<nano::peer_information> nano::peer_container::representatives (size_t count_a)
{
std::vector<peer_information> result;
result.reserve (std::min (count_a, size_t (16)));
std::lock_guard<std::mutex> lock (mutex);
for (auto i (peers.get<6> ().begin ()), n (peers.get<6> ().end ()); i != n && result.size () < count_a; ++i)
{
if (!i->rep_weight.is_zero ())
{
result.push_back (*i);
}
}
return result;
}
void nano::peer_container::purge_syn_cookies (std::chrono::steady_clock::time_point const & cutoff)
{
std::lock_guard<std::mutex> lock (syn_cookie_mutex);
@ -297,21 +285,6 @@ std::vector<nano::peer_information> nano::peer_container::purge_list (std::chron
return result;
}
std::vector<nano::endpoint> nano::peer_container::rep_crawl ()
{
std::vector<nano::endpoint> result;
// If there is enough observed peers weight, crawl 10 peers. Otherwise - 40
uint16_t max_count = (total_weight () > online_weight_minimum) ? 10 : 40;
result.reserve (max_count);
std::lock_guard<std::mutex> lock (mutex);
uint16_t count (0);
for (auto i (peers.get<5> ().begin ()), n (peers.get<5> ().end ()); i != n && count < max_count; ++i, ++count)
{
result.push_back (i->endpoint);
};
return result;
}
size_t nano::peer_container::size ()
{
std::lock_guard<std::mutex> lock (mutex);
@ -323,36 +296,6 @@ size_t nano::peer_container::size_sqrt ()
return (static_cast<size_t> (std::ceil (std::sqrt (size ()))));
}
std::vector<nano::peer_information> nano::peer_container::list_probable_rep_weights ()
{
std::vector<nano::peer_information> result;
std::unordered_set<nano::account> probable_reps;
std::lock_guard<std::mutex> lock (mutex);
for (auto i (peers.get<6> ().begin ()), n (peers.get<6> ().end ()); i != n; ++i)
{
// Calculate if representative isn't recorded for several IP addresses
if (probable_reps.find (i->probable_rep_account) == probable_reps.end ())
{
if (!i->rep_weight.number ().is_zero ())
{
result.push_back (*i);
}
probable_reps.insert (i->probable_rep_account);
}
}
return result;
}
nano::uint128_t nano::peer_container::total_weight ()
{
nano::uint128_t result (0);
for (auto & entry : list_probable_rep_weights ())
{
result = result + entry.rep_weight.number ();
}
return result;
}
bool nano::peer_container::empty ()
{
return size () == 0;
@ -376,39 +319,6 @@ bool nano::peer_container::not_a_peer (nano::endpoint const & endpoint_a, bool a
return result;
}
bool nano::peer_container::rep_response (nano::endpoint const & endpoint_a, nano::account const & rep_account_a, nano::amount const & weight_a)
{
assert (endpoint_a.address ().is_v6 ());
auto updated (false);
std::lock_guard<std::mutex> lock (mutex);
auto existing (peers.find (endpoint_a));
if (existing != peers.end ())
{
peers.modify (existing, [weight_a, &updated, rep_account_a](nano::peer_information & info) {
info.last_rep_response = std::chrono::steady_clock::now ();
if (info.rep_weight < weight_a)
{
updated = true;
info.rep_weight = weight_a;
info.probable_rep_account = rep_account_a;
}
});
}
return updated;
}
void nano::peer_container::rep_request (nano::endpoint const & endpoint_a)
{
std::lock_guard<std::mutex> lock (mutex);
auto existing (peers.find (endpoint_a));
if (existing != peers.end ())
{
peers.modify (existing, [](nano::peer_information & info) {
info.last_rep_request = std::chrono::steady_clock::now ();
});
}
}
bool nano::peer_container::reachout (nano::endpoint const & endpoint_a, bool allow_local_peers)
{
// Don't contact invalid IPs

View file

@ -51,10 +51,6 @@ public:
std::chrono::steady_clock::time_point last_contact;
std::chrono::steady_clock::time_point last_attempt;
std::chrono::steady_clock::time_point last_bootstrap_attempt{ std::chrono::steady_clock::time_point () };
std::chrono::steady_clock::time_point last_rep_request{ std::chrono::steady_clock::time_point () };
std::chrono::steady_clock::time_point last_rep_response{ std::chrono::steady_clock::time_point () };
nano::amount rep_weight{ 0 };
nano::account probable_rep_account{ 0 };
unsigned network_version{ nano::protocol_version };
boost::optional<nano::account> node_id;
bool operator< (nano::peer_information const &) const;
@ -76,10 +72,8 @@ public:
bool insert (nano::endpoint const &, unsigned, bool = false, boost::optional<nano::account> = boost::none);
std::unordered_set<nano::endpoint> random_set (size_t);
void random_fill (std::array<nano::endpoint, 8> &);
// Request a list of the top known representatives
std::vector<peer_information> representatives (size_t);
// List of all peers
std::deque<nano::endpoint> list ();
std::deque<nano::endpoint> list (size_t count_a = std::numeric_limits<size_t>::max ());
std::vector<peer_information> list_vector (size_t);
// A list of random peers sized for the configured rebroadcast fanout
std::deque<nano::endpoint> list_fanout ();
@ -90,9 +84,6 @@ public:
// Purge any peer where last_contact < time_point and return what was left
std::vector<nano::peer_information> purge_list (std::chrono::steady_clock::time_point const &);
void purge_syn_cookies (std::chrono::steady_clock::time_point const &);
std::vector<nano::endpoint> rep_crawl ();
bool rep_response (nano::endpoint const &, nano::account const &, nano::amount const &);
void rep_request (nano::endpoint const &);
// Should we reach out to this endpoint with a keepalive message
bool reachout (nano::endpoint const &, bool = false);
// Returns boost::none if the IP is rate capped on syn cookie requests,
@ -103,8 +94,6 @@ public:
bool validate_syn_cookie (nano::endpoint const &, nano::account, nano::signature);
size_t size ();
size_t size_sqrt ();
nano::uint128_t total_weight ();
nano::uint128_t online_weight_minimum;
bool empty ();
std::mutex mutex;
nano::endpoint self;
@ -116,8 +105,6 @@ public:
boost::multi_index::ordered_non_unique<boost::multi_index::member<peer_information, std::chrono::steady_clock::time_point, &peer_information::last_attempt>, std::greater<std::chrono::steady_clock::time_point>>,
boost::multi_index::random_access<>,
boost::multi_index::ordered_non_unique<boost::multi_index::member<peer_information, std::chrono::steady_clock::time_point, &peer_information::last_bootstrap_attempt>>,
boost::multi_index::ordered_non_unique<boost::multi_index::member<peer_information, std::chrono::steady_clock::time_point, &peer_information::last_rep_request>>,
boost::multi_index::ordered_non_unique<boost::multi_index::member<peer_information, nano::amount, &peer_information::rep_weight>, std::greater<nano::amount>>,
boost::multi_index::ordered_non_unique<boost::multi_index::tag<peer_by_ip_addr>, boost::multi_index::member<peer_information, boost::asio::ip::address, &peer_information::ip_address>>>>
peers;
boost::multi_index_container<

240
nano/node/repcrawler.cpp Normal file
View file

@ -0,0 +1,240 @@
#include <nano/node/node.hpp>
#include <nano/node/repcrawler.hpp>
nano::rep_crawler::rep_crawler (nano::node & node_a) :
node (node_a)
{
node.observers.endpoint.add ([this](nano::endpoint const & endpoint_a) {
this->query (endpoint_a);
});
}
void nano::rep_crawler::add (nano::block_hash const & hash_a)
{
std::lock_guard<std::mutex> lock (active_mutex);
active.insert (hash_a);
}
void nano::rep_crawler::remove (nano::block_hash const & hash_a)
{
std::lock_guard<std::mutex> lock (active_mutex);
active.erase (hash_a);
}
bool nano::rep_crawler::exists (nano::block_hash const & hash_a)
{
std::lock_guard<std::mutex> lock (active_mutex);
return active.count (hash_a) != 0;
}
void nano::rep_crawler::start ()
{
ongoing_crawl ();
}
void nano::rep_crawler::ongoing_crawl ()
{
auto now (std::chrono::steady_clock::now ());
query (get_crawl_targets ());
if (node.network.on)
{
// Reduce crawl frequency when there's enough total peer weight
unsigned next_run_seconds = (total_weight_internal () > node.config.online_weight_minimum.number ()) ? 7 : 3;
std::weak_ptr<nano::node> node_w (node.shared ());
node.alarm.add (now + std::chrono::seconds (next_run_seconds), [node_w, this]() {
if (auto node_l = node_w.lock ())
{
this->ongoing_crawl ();
}
});
}
}
std::vector<nano::endpoint> nano::rep_crawler::get_crawl_targets ()
{
std::unordered_set<nano::endpoint> endpoints;
constexpr size_t conservative_count = 10;
constexpr size_t aggressive_count = 40;
// Crawl more aggressively if we lack sufficient total peer weight.
bool sufficient_weight (total_weight_internal () > node.config.online_weight_minimum.number ());
uint16_t required_peer_count = sufficient_weight ? conservative_count : aggressive_count;
std::lock_guard<std::mutex> lock (probable_reps_mutex);
// First, add known rep endpoints, ordered by ascending last-requested time.
for (auto i (probable_reps.get<tag_last_request> ().begin ()), n (probable_reps.get<tag_last_request> ().end ()); i != n && endpoints.size () < required_peer_count; ++i)
{
endpoints.insert (i->endpoint);
};
// Add additional random peers. We do this even if we have enough weight, in order to pick up reps
// that didn't respond when first observed. If the current total weight isn't sufficient, this
// will be more aggressive. When the node first starts, the rep container is empty and all
// endpoints will originate from random peers.
required_peer_count += required_peer_count / 2;
// The rest of the endpoints are picked randomly
auto random_peers (node.peers.list ());
for (auto & peer : random_peers)
{
endpoints.insert (peer);
if (endpoints.size () >= required_peer_count)
{
break;
}
}
std::vector<nano::endpoint> result;
result.insert (result.end (), endpoints.begin (), endpoints.end ());
return result;
}
void nano::rep_crawler::query (std::vector<nano::endpoint> const & endpoints_a)
{
auto transaction (node.store.tx_begin_read ());
std::shared_ptr<nano::block> block (node.store.block_random (transaction));
auto hash (block->hash ());
add (hash);
for (auto i (endpoints_a.begin ()), n (endpoints_a.end ()); i != n; ++i)
{
on_rep_request (*i);
node.network.send_confirm_req (*i, block);
}
// A representative must respond with a vote within the deadline
std::weak_ptr<nano::node> node_w (node.shared ());
node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [node_w, hash]() {
if (auto node_l = node_w.lock ())
{
node_l->rep_crawler.remove (hash);
}
});
}
void nano::rep_crawler::query (nano::endpoint const & endpoint_a)
{
std::vector<nano::endpoint> peers;
peers.push_back (endpoint_a);
query (peers);
}
bool nano::rep_crawler::response (nano::endpoint const & endpoint_a, nano::account const & rep_account_a, nano::amount const & weight_a)
{
assert (endpoint_a.address ().is_v6 ());
auto updated (false);
std::lock_guard<std::mutex> lock (probable_reps_mutex);
auto existing (probable_reps.find (rep_account_a));
if (existing != probable_reps.end ())
{
probable_reps.modify (existing, [weight_a, &updated, rep_account_a, endpoint_a](nano::representative & info) {
info.last_response = std::chrono::steady_clock::now ();
if (info.weight < weight_a)
{
updated = true;
info.weight = weight_a;
info.endpoint = endpoint_a;
info.account = rep_account_a;
}
});
}
else
{
probable_reps.insert (nano::representative (rep_account_a, weight_a, endpoint_a));
}
return updated;
}
nano::uint128_t nano::rep_crawler::total_weight_internal ()
{
nano::uint128_t result (0);
for (auto i (probable_reps.get<tag_weight> ().begin ()), n (probable_reps.get<tag_weight> ().end ()); i != n; ++i)
{
auto weight (i->weight.number ());
if (weight > 0)
{
result = result + weight;
}
else
{
break;
}
}
return result;
}
nano::uint128_t nano::rep_crawler::total_weight ()
{
std::lock_guard<std::mutex> lock (probable_reps_mutex);
return total_weight_internal ();
}
std::vector<nano::representative> nano::rep_crawler::representatives_by_weight ()
{
std::vector<nano::representative> result;
std::lock_guard<std::mutex> lock (probable_reps_mutex);
for (auto i (probable_reps.get<tag_weight> ().begin ()), n (probable_reps.get<tag_weight> ().end ()); i != n; ++i)
{
auto weight (i->weight.number ());
if (weight > 0)
{
result.push_back (*i);
}
else
{
break;
}
}
return result;
}
void nano::rep_crawler::on_rep_request (nano::endpoint const & endpoint_a)
{
std::lock_guard<std::mutex> lock (probable_reps_mutex);
using probable_rep_itr_t = probably_rep_t::index<tag_endpoint>::type::iterator;
probably_rep_t::index<tag_endpoint>::type & endpoint_index = probable_reps.get<tag_endpoint> ();
// Find and update the timestamp on all reps available on the endpoint (a single host may have multiple reps)
std::vector<probable_rep_itr_t> view;
auto itr_pair = probable_reps.get<tag_endpoint> ().equal_range (endpoint_a);
for (; itr_pair.first != itr_pair.second; itr_pair.first++)
{
endpoint_index.modify (itr_pair.first, [](nano::representative & value_a) {
value_a.last_request = std::chrono::steady_clock::now ();
});
}
}
std::vector<nano::representative> nano::rep_crawler::representatives (size_t count_a)
{
std::vector<representative> result;
result.reserve (std::min (count_a, size_t (16)));
std::lock_guard<std::mutex> lock (probable_reps_mutex);
for (auto i (probable_reps.get<tag_weight> ().begin ()), n (probable_reps.get<tag_weight> ().end ()); i != n && result.size () < count_a; ++i)
{
if (!i->weight.is_zero ())
{
result.push_back (*i);
}
}
return result;
}
std::vector<nano::endpoint> nano::rep_crawler::representative_endpoints (size_t count_a)
{
std::vector<nano::endpoint> result;
auto reps (representatives (count_a));
for (auto rep : reps)
{
result.push_back (rep.endpoint);
}
return result;
}
/** Total number of representatives */
size_t nano::rep_crawler::representative_count ()
{
std::lock_guard<std::mutex> lock (probable_reps_mutex);
return probable_reps.size ();
}

134
nano/node/repcrawler.hpp Normal file
View file

@ -0,0 +1,134 @@
#pragma once
#include <boost/asio.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/random_access_index.hpp>
#include <boost/multi_index_container.hpp>
#include <chrono>
#include <memory>
#include <nano/node/common.hpp>
#include <unordered_map>
#include <unordered_set>
namespace mi = boost::multi_index;
namespace nano
{
class node;
/**
* A representative picked up during repcrawl.
*/
class representative
{
public:
representative (nano::account account_a, nano::amount weight_a, nano::endpoint endpoint_a) :
account (account_a), weight (weight_a), endpoint (endpoint_a)
{
}
nano::account account{ 0 };
nano::amount weight{ 0 };
nano::endpoint endpoint;
std::chrono::steady_clock::time_point last_request{ std::chrono::steady_clock::time_point () };
std::chrono::steady_clock::time_point last_response{ std::chrono::steady_clock::time_point () };
};
/**
* Crawls the network for representatives. Queries are performed by requesting confirmation of a
* random block and observing the corresponding vote.
*/
class rep_crawler
{
friend std::unique_ptr<seq_con_info_component> collect_seq_con_info (rep_crawler & rep_crawler, const std::string & name);
// clang-format off
class tag_endpoint {};
class tag_last_request {};
class tag_account {};
class tag_weight {};
using probably_rep_t = boost::multi_index_container<representative,
mi::indexed_by<
mi::hashed_unique<mi::member<representative, nano::account, &representative::account>>,
mi::random_access<>,
mi::ordered_non_unique<mi::tag<tag_last_request>,
mi::member<representative, std::chrono::steady_clock::time_point, &representative::last_request>>,
mi::ordered_non_unique<mi::tag<tag_weight>,
mi::member<representative, nano::amount, &representative::weight>, std::greater<nano::amount>>,
mi::ordered_non_unique<mi::tag<tag_endpoint>,
mi::member<representative, nano::endpoint, &representative::endpoint>>>>;
// clang-format on
public:
rep_crawler (nano::node & node_a);
/** Start crawling */
void start ();
/** Add block hash to list of active rep queries */
void add (nano::block_hash const &);
/** Remove block hash from list of active rep queries */
void remove (nano::block_hash const &);
/** Check if block hash is in the list of active rep queries */
bool exists (nano::block_hash const &);
/** Attempt to determine if the peer manages one or more representative accounts */
void query (std::vector<nano::endpoint> const & endpoints_a);
/** Attempt to determine if the peer manages one or more representative accounts */
void query (nano::endpoint const & endpoint_a);
/**
* Called when a non-replay vote on a block previously sent by query() is received. This indiciates
* with high probability that the endpoint is a representative node.
* @return True if the rep entry was updated with new information due to increase in weight.
*/
bool response (nano::endpoint const & endpoint_a, nano::account const & rep_account_a, nano::amount const & weight_a);
/** Get total available weight from representatives */
nano::uint128_t total_weight ();
/** Request a list of the top \p count_a known representatives. The maximum number of reps returned is 16. */
std::vector<representative> representatives (size_t count_a);
/** Request a list of the top \p count_a known representative endpoints. The maximum number of reps returned is 16. */
std::vector<nano::endpoint> representative_endpoints (size_t count_a);
/** Returns all representatives registered with weight in descending order */
std::vector<nano::representative> representatives_by_weight ();
/** Total number of representatives */
size_t representative_count ();
private:
nano::node & node;
/** Protects the active-hash container */
std::mutex active_mutex;
/** We have solicted votes for these random blocks */
std::unordered_set<nano::block_hash> active;
/** Called continuously to crawl for representatives */
void ongoing_crawl ();
/** Returns a list of endpoints to crawl */
std::vector<nano::endpoint> get_crawl_targets ();
/** When a rep request is made, this is called to update the last-request timestamp. */
void on_rep_request (nano::endpoint const & endpoint_a);
/** Protects the probable_reps container */
std::mutex probable_reps_mutex;
/** Get total available weight from representatives (must be called with a lock on probable_reps_mutex) */
nano::uint128_t total_weight_internal ();
/** Probable representatives */
probably_rep_t probable_reps;
};
}

View file

@ -1688,16 +1688,16 @@ void nano::rpc_handler::confirmation_quorum ()
response_l.put ("online_weight_quorum_percent", std::to_string (node.config.online_weight_quorum));
response_l.put ("online_weight_minimum", node.config.online_weight_minimum.to_string_dec ());
response_l.put ("online_stake_total", node.online_reps.online_stake ().convert_to<std::string> ());
response_l.put ("peers_stake_total", node.peers.total_weight ().convert_to<std::string> ());
response_l.put ("peers_stake_total", node.rep_crawler.total_weight ().convert_to<std::string> ());
if (request.get<bool> ("peer_details", false))
{
boost::property_tree::ptree peers;
for (auto & peer : node.peers.list_probable_rep_weights ())
for (auto & peer : node.rep_crawler.representatives_by_weight ())
{
boost::property_tree::ptree peer_node;
peer_node.put ("account", peer.probable_rep_account.to_account ());
peer_node.put ("ip", peer.ip_address.to_string ());
peer_node.put ("weight", peer.rep_weight.to_string_dec ());
peer_node.put ("account", peer.account.to_account ());
peer_node.put ("ip", peer.endpoint.address ().to_string ());
peer_node.put ("weight", peer.weight.to_string_dec ());
peers.push_back (std::make_pair ("", peer_node));
}
response_l.add_child ("peers", peers);