From 027741465802fe36fa932337137ff35bbe00fc01 Mon Sep 17 00:00:00 2001 From: clemahieu Date: Sun, 19 Jan 2020 20:30:43 -0600 Subject: [PATCH] Rep crawler cleanup (#2482) Adding references to some parameter specifications, using emplace_back, and fixing test to check for deadline errors. --- nano/core_test/node.cpp | 151 ++++++++++++++++++++++++++++++++------- nano/node/node.cpp | 35 +-------- nano/node/repcrawler.cpp | 120 +++++++++++++++++++++---------- nano/node/repcrawler.hpp | 14 ++-- 4 files changed, 214 insertions(+), 106 deletions(-) diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index b9c4a846..f25a524c 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -1617,8 +1617,9 @@ TEST (node, DISABLED_fork_stale) auto & node1 (*system1.nodes[0]); auto & node2 (*system2.nodes[0]); node2.bootstrap_initiator.bootstrap (node1.network.endpoint ()); - auto channel (std::make_shared (node2.network.udp_channels, node1.network.endpoint (), node2.network_params.protocol.protocol_version)); - node2.rep_crawler.response (channel, nano::test_genesis_key.pub, nano::genesis_amount); + std::shared_ptr channel (std::make_shared (node2.network.udp_channels, node1.network.endpoint (), node2.network_params.protocol.protocol_version)); + auto vote = std::make_shared (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, std::vector ()); + node2.rep_crawler.response (channel, vote); nano::genesis genesis; nano::keypair key1; nano::keypair key2; @@ -1993,30 +1994,59 @@ TEST (node, rep_weight) { nano::system system (1); auto & node (*system.nodes[0]); - + nano::genesis genesis; + nano::keypair keypair1; + nano::block_builder builder; + std::shared_ptr block1 = builder + .state () + .account (nano::test_genesis_key.pub) + .previous (genesis.hash ()) + .representative (nano::test_genesis_key.pub) + .balance (nano::genesis_amount - node.minimum_principal_weight () * 4) + .link (keypair1.pub) + .sign (nano::test_genesis_key.prv, nano::test_genesis_key.pub) + .work (*system.work.generate (genesis.hash ())) + .build (); + std::shared_ptr block2 = builder + .state () + .account (keypair1.pub) + .previous (0) + .representative (keypair1.pub) + .balance (node.minimum_principal_weight () * 4) + .link (block1->hash ()) + .sign (keypair1.prv, keypair1.pub) + .work (*system.work.generate (keypair1.pub)) + .build (); + { + auto transaction = node.store.tx_begin_write (); + ASSERT_EQ (nano::process_result::progress, node.ledger.process (transaction, *block1).code); + ASSERT_EQ (nano::process_result::progress, node.ledger.process (transaction, *block2).code); + } node.network.udp_channels.insert (nano::endpoint (boost::asio::ip::address_v6::loopback (), nano::get_available_port ()), 0); ASSERT_TRUE (node.rep_crawler.representatives (1).empty ()); nano::endpoint endpoint0 (boost::asio::ip::address_v6::loopback (), nano::get_available_port ()); nano::endpoint endpoint1 (boost::asio::ip::address_v6::loopback (), nano::get_available_port ()); nano::endpoint endpoint2 (boost::asio::ip::address_v6::loopback (), nano::get_available_port ()); - auto channel0 (std::make_shared (node.network.udp_channels, endpoint0, node.network_params.protocol.protocol_version)); - auto channel1 (std::make_shared (node.network.udp_channels, endpoint1, node.network_params.protocol.protocol_version)); - auto channel2 (std::make_shared (node.network.udp_channels, endpoint2, node.network_params.protocol.protocol_version)); - nano::amount amount100 (100); - nano::amount amount50 (50); + std::shared_ptr channel0 (std::make_shared (node.network.udp_channels, endpoint0, node.network_params.protocol.protocol_version)); + std::shared_ptr channel1 (std::make_shared (node.network.udp_channels, endpoint1, node.network_params.protocol.protocol_version)); + std::shared_ptr channel2 (std::make_shared (node.network.udp_channels, endpoint2, node.network_params.protocol.protocol_version)); node.network.udp_channels.insert (endpoint2, node.network_params.protocol.protocol_version); node.network.udp_channels.insert (endpoint0, node.network_params.protocol.protocol_version); node.network.udp_channels.insert (endpoint1, node.network_params.protocol.protocol_version); - nano::keypair keypair1; - nano::keypair keypair2; - node.rep_crawler.response (channel0, keypair1.pub, amount100); - node.rep_crawler.response (channel1, keypair2.pub, amount50); - ASSERT_EQ (2, node.rep_crawler.representative_count ()); + auto vote1 = std::make_shared (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, genesis.open); + auto vote2 = std::make_shared (keypair1.pub, keypair1.prv, 0, genesis.open); + node.rep_crawler.add (genesis.open->hash ()); + node.rep_crawler.response (channel0, vote1); + node.rep_crawler.response (channel1, vote2); + while (node.rep_crawler.representative_count () != 2) + { + ASSERT_NO_ERROR (system.poll ()); + } // Make sure we get the rep with the most weight first auto reps (node.rep_crawler.representatives (1)); ASSERT_EQ (1, reps.size ()); - ASSERT_EQ (100, reps[0].weight.number ()); - ASSERT_EQ (keypair1.pub, reps[0].account); + ASSERT_EQ (nano::genesis_amount - node.minimum_principal_weight () * 4, reps[0].weight.number ()); + ASSERT_EQ (nano::test_genesis_key.pub, reps[0].account); ASSERT_EQ (*channel0, reps[0].channel_ref ()); } @@ -2024,40 +2054,107 @@ TEST (node, rep_remove) { nano::system system (1); auto & node (*system.nodes[0]); + nano::genesis genesis; + nano::keypair keypair1; + nano::keypair keypair2; + nano::block_builder builder; + std::shared_ptr block1 = builder + .state () + .account (nano::test_genesis_key.pub) + .previous (genesis.hash ()) + .representative (nano::test_genesis_key.pub) + .balance (nano::genesis_amount - node.minimum_principal_weight () * 2) + .link (keypair1.pub) + .sign (nano::test_genesis_key.prv, nano::test_genesis_key.pub) + .work (*system.work.generate (genesis.hash ())) + .build (); + std::shared_ptr block2 = builder + .state () + .account (keypair1.pub) + .previous (0) + .representative (keypair1.pub) + .balance (node.minimum_principal_weight () * 2) + .link (block1->hash ()) + .sign (keypair1.prv, keypair1.pub) + .work (*system.work.generate (keypair1.pub)) + .build (); + std::shared_ptr block3 = builder + .state () + .account (nano::test_genesis_key.pub) + .previous (block1->hash ()) + .representative (nano::test_genesis_key.pub) + .balance (nano::genesis_amount - node.minimum_principal_weight () * 4) + .link (keypair2.pub) + .sign (nano::test_genesis_key.prv, nano::test_genesis_key.pub) + .work (*system.work.generate (block1->hash ())) + .build (); + std::shared_ptr block4 = builder + .state () + .account (keypair2.pub) + .previous (0) + .representative (keypair2.pub) + .balance (node.minimum_principal_weight () * 2) + .link (block3->hash ()) + .sign (keypair2.prv, keypair2.pub) + .work (*system.work.generate (keypair2.pub)) + .build (); + { + auto transaction = node.store.tx_begin_write (); + ASSERT_EQ (nano::process_result::progress, node.ledger.process (transaction, *block1).code); + ASSERT_EQ (nano::process_result::progress, node.ledger.process (transaction, *block2).code); + ASSERT_EQ (nano::process_result::progress, node.ledger.process (transaction, *block3).code); + ASSERT_EQ (nano::process_result::progress, node.ledger.process (transaction, *block4).code); + } // Add inactive UDP representative channel nano::endpoint endpoint0 (boost::asio::ip::address_v6::loopback (), nano::get_available_port ()); - auto channel0 (std::make_shared (node.network.udp_channels, endpoint0, node.network_params.protocol.protocol_version)); + std::shared_ptr channel0 (std::make_shared (node.network.udp_channels, endpoint0, node.network_params.protocol.protocol_version)); nano::amount amount100 (100); node.network.udp_channels.insert (endpoint0, node.network_params.protocol.protocol_version); - nano::keypair keypair1; - node.rep_crawler.response (channel0, keypair1.pub, amount100); - ASSERT_EQ (1, node.rep_crawler.representative_count ()); + auto vote1 = std::make_shared (keypair1.pub, keypair1.prv, 0, genesis.open); + node.rep_crawler.add (genesis.hash ()); + node.rep_crawler.response (channel0, vote1); + system.deadline_set (5s); + while (node.rep_crawler.representative_count () != 1) + { + ASSERT_NO_ERROR (system.poll ()); + } auto reps (node.rep_crawler.representatives (1)); ASSERT_EQ (1, reps.size ()); - ASSERT_EQ (100, reps[0].weight.number ()); + ASSERT_EQ (node.minimum_principal_weight () * 2, reps[0].weight.number ()); ASSERT_EQ (keypair1.pub, reps[0].account); ASSERT_EQ (*channel0, reps[0].channel_ref ()); + // This UDP channel is not reachable and should timeout + while (node.rep_crawler.representative_count () != 0) + { + ASSERT_NO_ERROR (system.poll ()); + } // Add working representative auto node1 = system.add_node (nano::node_config (nano::get_available_port (), system.logging)); system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv); auto channel1 (node.network.find_channel (node1->network.endpoint ())); ASSERT_NE (nullptr, channel1); - node.rep_crawler.response (channel1, nano::test_genesis_key.pub, nano::genesis_amount); - ASSERT_EQ (2, node.rep_crawler.representative_count ()); + auto vote2 = std::make_shared (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, genesis.open); + node.rep_crawler.response (channel1, vote2); + while (node.rep_crawler.representative_count () != 1) + { + ASSERT_NO_ERROR (system.poll ()); + } // Add inactive TCP representative channel auto node2 (std::make_shared (system.io_ctx, nano::unique_path (), system.alarm, nano::node_config (nano::get_available_port (), system.logging), system.work)); std::atomic done{ false }; std::weak_ptr node_w (node.shared ()); - node.network.tcp_channels.start_tcp (node2->network.endpoint (), [node_w, &done](std::shared_ptr channel2) { + auto vote3 = std::make_shared (keypair2.pub, keypair2.prv, 0, genesis.open); + node.network.tcp_channels.start_tcp (node2->network.endpoint (), [node_w, &done, &vote3, &system](std::shared_ptr channel2) { if (auto node_l = node_w.lock ()) { - nano::keypair keypair2; - node_l->rep_crawler.response (channel2, keypair2.pub, nano::Gxrb_ratio); - ASSERT_EQ (3, node_l->rep_crawler.representative_count ()); + node_l->rep_crawler.response (channel2, vote3); + while (node_l->rep_crawler.representative_count () != 2) + { + ASSERT_NO_ERROR (system.poll ()); + } done = true; } }); - system.deadline_set (10s); while (!done) { ASSERT_NO_ERROR (system.poll ()); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 693dd8a1..202ba198 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -312,40 +312,7 @@ startup_time (std::chrono::steady_clock::now ()) { this->gap_cache.vote (vote_a); this->online_reps.observe (vote_a->account); - nano::uint128_t rep_weight; - { - rep_weight = ledger.weight (vote_a->account); - } - if (rep_weight > minimum_principal_weight ()) - { - bool rep_crawler_exists (false); - for (auto hash : *vote_a) - { - if (this->rep_crawler.exists (hash)) - { - rep_crawler_exists = true; - break; - } - } - if (rep_crawler_exists) - { - // We see a valid non-replay vote for a block we requested, this node is probably a representative - if (this->rep_crawler.response (channel_a, vote_a->account, rep_weight)) - { - logger.try_log (boost::str (boost::format ("Found a representative at %1%") % channel_a->to_string ())); - // Rebroadcasting all active votes to new representative - auto blocks (this->active.list_blocks ()); - for (auto i (blocks.begin ()), n (blocks.end ()); i != n; ++i) - { - if (*i != nullptr) - { - nano::confirm_req req (*i); - channel_a->send (req); - } - } - } - } - } + this->rep_crawler.response (channel_a, vote_a); } }); if (websocket_server) diff --git a/nano/node/repcrawler.cpp b/nano/node/repcrawler.cpp index 7ad42757..076dc815 100644 --- a/nano/node/repcrawler.cpp +++ b/nano/node/repcrawler.cpp @@ -1,6 +1,8 @@ #include #include +#include + nano::rep_crawler::rep_crawler (nano::node & node_a) : node (node_a) { @@ -24,23 +26,75 @@ void nano::rep_crawler::remove (nano::block_hash const & hash_a) active.erase (hash_a); } -bool nano::rep_crawler::exists (nano::block_hash const & hash_a) -{ - nano::lock_guard lock (active_mutex); - return active.count (hash_a) != 0; -} - void nano::rep_crawler::start () { ongoing_crawl (); } +void nano::rep_crawler::validate () +{ + decltype (responses) responses_l; + { + nano::lock_guard lock (active_mutex); + responses_l.swap (responses); + } + auto minimum = node.minimum_principal_weight (); + for (auto const & i : responses_l) + { + auto & vote = i.second; + auto & channel = i.first; + nano::uint128_t rep_weight = node.ledger.weight (vote->account); + if (rep_weight > minimum) + { + auto updated_or_inserted = false; + nano::unique_lock lock (probable_reps_mutex); + auto existing (probable_reps.find (vote->account)); + if (existing != probable_reps.end ()) + { + probable_reps.modify (existing, [rep_weight, &updated_or_inserted, &vote, &channel](nano::representative & info) { + info.last_response = std::chrono::steady_clock::now (); + + // Update if representative channel was changed + if (info.channel->get_endpoint () != channel->get_endpoint ()) + { + assert (info.account == vote->account); + updated_or_inserted = true; + info.weight = rep_weight; + info.channel = channel; + } + }); + } + else + { + probable_reps.emplace (nano::representative (vote->account, rep_weight, channel)); + updated_or_inserted = true; + } + lock.unlock (); + if (updated_or_inserted) + { + node.logger.try_log (boost::str (boost::format ("Found a representative at %1%") % channel->to_string ())); + // Rebroadcasting all active votes to new representative + auto blocks (node.active.list_blocks ()); + for (auto i (blocks.begin ()), n (blocks.end ()); i != n; ++i) + { + if (*i != nullptr) + { + nano::confirm_req req (*i); + channel->send (req); + } + } + } + } + } +} + void nano::rep_crawler::ongoing_crawl () { auto now (std::chrono::steady_clock::now ()); auto total_weight_l (total_weight ()); cleanup_reps (); update_weights (); + validate (); query (get_crawl_targets (total_weight_l)); auto sufficient_weight (total_weight_l > node.config.online_weight_minimum.number ()); // If online weight drops below minimum, reach out to preconfigured peers @@ -49,9 +103,9 @@ void nano::rep_crawler::ongoing_crawl () node.keepalive_preconfigured (node.config.preconfigured_peers); } // Reduce crawl frequency when there's enough total peer weight - unsigned next_run_seconds = sufficient_weight ? 7 : 3; + unsigned next_run_ms = node.network_params.network.is_test_network () ? 100 : sufficient_weight ? 7000 : 3000; std::weak_ptr node_w (node.shared ()); - node.alarm.add (now + std::chrono::seconds (next_run_seconds), [node_w, this]() { + node.alarm.add (now + std::chrono::milliseconds (next_run_ms), [node_w, this]() { if (auto node_l = node_w.lock ()) { this->ongoing_crawl (); @@ -86,16 +140,19 @@ void nano::rep_crawler::query (std::vector block (node.store.block_random (transaction)); auto hash (block->hash ()); - // Don't send same block multiple times in tests - if (node.network_params.network.is_test_network ()) { - for (auto i (0); exists (hash) && i < 4; ++i) + nano::lock_guard lock (active_mutex); + // Don't send same block multiple times in tests + if (node.network_params.network.is_test_network ()) { - block = node.store.block_random (transaction); - hash = block->hash (); + for (auto i (0); active.count (hash) != 0 && i < 4; ++i) + { + block = node.store.block_random (transaction); + hash = block->hash (); + } } + active.insert (hash); } - add (hash); for (auto i (channels_a.begin ()), n (channels_a.end ()); i != n; ++i) { on_rep_request (*i); @@ -112,39 +169,24 @@ void nano::rep_crawler::query (std::vector channel_a) +void nano::rep_crawler::query (std::shared_ptr & channel_a) { std::vector> peers; - peers.push_back (channel_a); + peers.emplace_back (channel_a); query (peers); } -bool nano::rep_crawler::response (std::shared_ptr channel_a, nano::account const & rep_account_a, nano::amount const & weight_a) +void nano::rep_crawler::response (std::shared_ptr & channel_a, std::shared_ptr & vote_a) { - auto updated_or_inserted (false); - nano::lock_guard lock (probable_reps_mutex); - auto existing (probable_reps.find (rep_account_a)); - if (existing != probable_reps.end ()) + nano::lock_guard lock (active_mutex); + for (auto i = vote_a->begin (), n = vote_a->end (); i != n; ++i) { - probable_reps.modify (existing, [weight_a, &updated_or_inserted, rep_account_a, channel_a](nano::representative & info) { - info.last_response = std::chrono::steady_clock::now (); - - // Update if representative channel was changed - if (info.channel->get_endpoint () != channel_a->get_endpoint ()) - { - assert (info.account == rep_account_a); - updated_or_inserted = true; - info.weight = weight_a; - info.channel = channel_a; - } - }); + if (active.count (*i) != 0) + { + responses.push_back (std::make_pair (channel_a, vote_a)); + break; + } } - else - { - probable_reps.emplace (nano::representative (rep_account_a, weight_a, channel_a)); - updated_or_inserted = true; - } - return updated_or_inserted; } nano::uint128_t nano::rep_crawler::total_weight () const diff --git a/nano/node/repcrawler.hpp b/nano/node/repcrawler.hpp index f1c2fd1c..b0a59c79 100644 --- a/nano/node/repcrawler.hpp +++ b/nano/node/repcrawler.hpp @@ -51,7 +51,7 @@ public: * Crawls the network for representatives. Queries are performed by requesting confirmation of a * random block and observing the corresponding vote. */ -class rep_crawler +class rep_crawler final { friend std::unique_ptr collect_container_info (rep_crawler & rep_crawler, const std::string & name); @@ -85,21 +85,18 @@ public: /** Remove block hash from list of active rep queries */ void remove (nano::block_hash const &); - /** Check if block hash is in the list of active rep queries */ - bool exists (nano::block_hash const &); - /** Attempt to determine if the peer manages one or more representative accounts */ void query (std::vector> const & channels_a); /** Attempt to determine if the peer manages one or more representative accounts */ - void query (std::shared_ptr channel_a); + void query (std::shared_ptr & channel_a); /** * Called when a non-replay vote on a block previously sent by query() is received. This indiciates * with high probability that the endpoint is a representative node. * @return True if the rep entry was updated with new information due to increase in weight. */ - bool response (std::shared_ptr channel_a, nano::account const & rep_account_a, nano::amount const & weight_a); + void response (std::shared_ptr &, std::shared_ptr &); /** Get total available weight from representatives */ nano::uint128_t total_weight () const; @@ -122,6 +119,9 @@ private: /** We have solicted votes for these random blocks */ std::unordered_set active; + // Validate responses to see if they're reps + void validate (); + /** Called continuously to crawl for representatives */ void ongoing_crawl (); @@ -142,5 +142,7 @@ private: /** Probable representatives */ probably_rep_t probable_reps; + + std::deque, std::shared_ptr>> responses; }; }