From 7f7981c8e64aec9cf1a293d0758e7feb78a7de21 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Tue, 15 Sep 2020 14:16:53 +0100 Subject: [PATCH] Move all vote generation to the voting thread (#2865) * Move all vote generation to the voting thread Following https://github.com/nanocurrency/nano-node/pull/2827 , this PR makes additional enhancements and consistency checks to the request aggregator, vote generator and vote history cache. Changes to which threads vote: - Request aggregator hashes that have no cached votes are now queued to the vote generator instead. The channel is passed along so that reply can follow. These hashes are queued in bulk, and the number of "bulks" is limited to 2048, erasing the oldest values if going beyond that. Candidates queued for broadcasting always have priority. - All vote generation is now done from the voting thread. This is enforced with an assert (unless in the test network) inside the vote generation method. By reducing the number of interfaces we reduce the probability of a bug related to vote generation. Checks to grab votes from cache in more situations: - Added an early cache check when queueing vote candidates from elections. - Added late cache checks in the vote generation loop, to prevent re-generating votes when they were queued both for broadcasting (a new election) and/or multiple requests. Given the resource usage of vote generation, this might not be that uncommon. Added new stats to track in beta testing. Local votes cache consistency enhancements: - Votes cached are now enforced to be unique by account. Since we *now* have a single thread adding to the cache, along with the late cache checks, this should not happen, but the cost to enforce is negligible. - Expanded the vote cache consistency checks performed in debug builds. Others: - Added an early return to `stats:add` if the value is 0, so the caller doesn't have to make that check. Note the minimal amount of changes to existing tests, which should give some confidence to this approach. * Slow down bootstrap confirm_frontiers in tests This fixes intermittent failures in bootstrap_processor.frontiers_confirmed with TSAN. Threads have shifted which can cause a timing change especially with a sanitizer. * Lower requeued_pulls_limit_dev to 1 This is unrelated but makes frontiers confirmation related tests twice as fast * Making consistency_check const and only compile on debug * Reduce scope of read transaction in vote_generator::generate (Wes review) * Save a copy by moving pop_front to end of the loop (Wez review) * Use structured bindings for readibility * Redundant condition (Wes review) * Compile consistency_check in NDEBUG as well to avoid preprocessor magic (Colin review) * Spelling * Fix stat requests_generated_votes increasing for late cached replies This was causing an intermittent failure in request_aggregator.two_endpoints --- nano/core_test/node.cpp | 2 + nano/core_test/request_aggregator.cpp | 8 +- nano/core_test/voting.cpp | 29 +-- nano/lib/stats.cpp | 18 ++ nano/lib/stats.hpp | 15 +- nano/node/active_transactions.cpp | 2 +- nano/node/bootstrap/bootstrap.hpp | 2 +- nano/node/bootstrap/bootstrap_attempt.cpp | 2 +- nano/node/node.cpp | 2 +- nano/node/nodeconfig.hpp | 2 +- nano/node/request_aggregator.cpp | 77 +++---- nano/node/request_aggregator.hpp | 10 +- nano/node/voting.cpp | 244 +++++++++++++++++----- nano/node/voting.hpp | 32 ++- nano/secure/blockstore_partial.hpp | 4 + 15 files changed, 314 insertions(+), 135 deletions(-) diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index e2fe85155..da4d0adf3 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -2483,6 +2483,8 @@ TEST (node, stat_counting) ASSERT_EQ (10, node1.stats.count (nano::stat::type::ledger, nano::stat::dir::in)); ASSERT_EQ (2, node1.stats.count (nano::stat::type::ledger, nano::stat::detail::send, nano::stat::dir::in)); ASSERT_EQ (1, node1.stats.count (nano::stat::type::ledger, nano::stat::detail::receive, nano::stat::dir::in)); + node1.stats.add (nano::stat::type::ledger, nano::stat::dir::in, 0); + ASSERT_EQ (10, node1.stats.count (nano::stat::type::ledger, nano::stat::dir::in)); } TEST (node, online_reps) diff --git a/nano/core_test/request_aggregator.cpp b/nano/core_test/request_aggregator.cpp index 3a5551cd3..4025ea7ee 100644 --- a/nano/core_test/request_aggregator.cpp +++ b/nano/core_test/request_aggregator.cpp @@ -129,7 +129,7 @@ TEST (request_aggregator, two_endpoints) auto & node2 (*system.add_node (node_config, node_flags)); nano::genesis genesis; system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv); - auto send1 (std::make_shared (nano::dev_genesis_key.pub, genesis.hash (), nano::dev_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::dev_genesis_key.pub, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *node1.work_generate_blocking (genesis.hash ()))); + auto send1 (std::make_shared (nano::dev_genesis_key.pub, genesis.hash (), nano::dev_genesis_key.pub, nano::genesis_amount - 1, nano::dev_genesis_key.pub, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *node1.work_generate_blocking (genesis.hash ()))); std::vector> request; request.emplace_back (send1->hash (), send1->root ()); ASSERT_EQ (nano::process_result::progress, node1.ledger.process (node1.store.tx_begin_write (), *send1).code); @@ -147,8 +147,8 @@ TEST (request_aggregator, two_endpoints) ASSERT_TIMELY (3s, 0 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown)); ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes)); ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)); - ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes)); - ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes)); + ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes) + node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_late_hashes)); + ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes) + node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_late_votes)); ASSERT_TIMELY (3s, 0 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cannot_vote)); } @@ -380,6 +380,6 @@ TEST (request_aggregator, cannot_vote) ASSERT_TIMELY (3s, 2 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes)); ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)); ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown)); - ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); + ASSERT_TIMELY (3s, 1 <= node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); } } diff --git a/nano/core_test/voting.cpp b/nano/core_test/voting.cpp index 326671c03..b7a9f0dca 100644 --- a/nano/core_test/voting.cpp +++ b/nano/core_test/voting.cpp @@ -16,29 +16,37 @@ TEST (local_vote_history, basic) ASSERT_FALSE (history.exists (2)); ASSERT_TRUE (history.votes (1).empty ()); ASSERT_TRUE (history.votes (2).empty ()); - auto vote1 (std::make_shared ()); + auto vote1a (std::make_shared ()); ASSERT_EQ (0, history.size ()); - history.add (1, 2, vote1); + history.add (1, 2, vote1a); ASSERT_EQ (1, history.size ()); ASSERT_TRUE (history.exists (1)); ASSERT_FALSE (history.exists (2)); - auto votes1 (history.votes (1)); - ASSERT_FALSE (votes1.empty ()); + auto votes1a (history.votes (1)); + ASSERT_FALSE (votes1a.empty ()); ASSERT_EQ (1, history.votes (1, 2).size ()); ASSERT_TRUE (history.votes (1, 1).empty ()); ASSERT_TRUE (history.votes (1, 3).empty ()); ASSERT_TRUE (history.votes (2).empty ()); - ASSERT_EQ (1, votes1.size ()); - ASSERT_EQ (vote1, votes1[0]); + ASSERT_EQ (1, votes1a.size ()); + ASSERT_EQ (vote1a, votes1a[0]); + auto vote1b (std::make_shared ()); + history.add (1, 2, vote1b); + auto votes1b (history.votes (1)); + ASSERT_EQ (1, votes1b.size ()); + ASSERT_EQ (vote1b, votes1b[0]); + ASSERT_NE (vote1a, votes1b[0]); auto vote2 (std::make_shared ()); + vote2->account.dwords[0]++; ASSERT_EQ (1, history.size ()); history.add (1, 2, vote2); ASSERT_EQ (2, history.size ()); auto votes2 (history.votes (1)); ASSERT_EQ (2, votes2.size ()); - ASSERT_TRUE (vote1 == votes2[0] || vote1 == votes2[1]); + ASSERT_TRUE (vote1b == votes2[0] || vote1b == votes2[1]); ASSERT_TRUE (vote2 == votes2[0] || vote2 == votes2[1]); auto vote3 (std::make_shared ()); + vote3->account.dwords[1]++; history.add (1, 3, vote3); ASSERT_EQ (1, history.size ()); auto votes3 (history.votes (1)); @@ -103,13 +111,10 @@ TEST (vote_generator, session) nano::vote_generator_session generator_session (node->active.generator); boost::thread thread ([node, &generator_session]() { nano::thread_role::set (nano::thread_role::name::request_loop); - for (unsigned i = 0; i < 100; ++i) - { - generator_session.add (nano::genesis_account, nano::genesis_hash); - } + generator_session.add (nano::genesis_account, nano::genesis_hash); ASSERT_EQ (0, node->stats.count (nano::stat::type::vote, nano::stat::detail::vote_indeterminate)); generator_session.flush (); }); thread.join (); - ASSERT_TIMELY (5s, node->stats.count (nano::stat::type::vote, nano::stat::detail::vote_indeterminate) == (100 / nano::network::confirm_ack_hashes_max)); + ASSERT_TIMELY (2s, 1 == node->stats.count (nano::stat::type::vote, nano::stat::detail::vote_indeterminate)); } diff --git a/nano/lib/stats.cpp b/nano/lib/stats.cpp index dc015b8c8..2208c5e4b 100644 --- a/nano/lib/stats.cpp +++ b/nano/lib/stats.cpp @@ -450,6 +450,9 @@ std::string nano::stat::type_to_string (uint32_t key) case nano::stat::type::telemetry: res = "telemetry"; break; + case nano::stat::type::vote_generator: + res = "vote_generator"; + break; } return res; } @@ -712,6 +715,12 @@ std::string nano::stat::detail_to_string (uint32_t key) case nano::stat::detail::requests_generated_votes: res = "requests_generated_votes"; break; + case nano::stat::detail::requests_cached_late_hashes: + res = "requests_cached_late_hashes"; + break; + case nano::stat::detail::requests_cached_late_votes: + res = "requests_cached_late_votes"; + break; case nano::stat::detail::requests_cannot_vote: res = "requests_cannot_vote"; break; @@ -742,6 +751,15 @@ std::string nano::stat::detail_to_string (uint32_t key) case nano::stat::detail::failed_send_telemetry_req: res = "failed_send_telemetry_req"; break; + case nano::stat::detail::generator_broadcasts: + res = "generator_broadcasts"; + break; + case nano::stat::detail::generator_replies: + res = "generator_replies"; + break; + case nano::stat::detail::generator_replies_discarded: + res = "generator_replies_discarded"; + break; } return res; } diff --git a/nano/lib/stats.hpp b/nano/lib/stats.hpp index d36b1ecfc..6ed0aa820 100644 --- a/nano/lib/stats.hpp +++ b/nano/lib/stats.hpp @@ -202,6 +202,7 @@ public: requests, filter, telemetry, + vote_generator }; /** Optional detail type */ @@ -321,6 +322,8 @@ public: requests_generated_hashes, requests_cached_votes, requests_generated_votes, + requests_cached_late_hashes, + requests_cached_late_votes, requests_cannot_vote, requests_unknown, @@ -334,7 +337,12 @@ public: request_within_protection_cache_zone, no_response_received, unsolicited_telemetry_ack, - failed_send_telemetry_req + failed_send_telemetry_req, + + // vote generator + generator_broadcasts, + generator_replies, + generator_replies_discarded }; /** Direction of the stat. If the direction is irrelevant, use in */ @@ -407,6 +415,11 @@ public: */ void add (stat::type type, stat::detail detail, stat::dir dir, uint64_t value, bool detail_only = false) { + if (value == 0) + { + return; + } + constexpr uint32_t no_detail_mask = 0xffff00ff; uint32_t key = key_of (type, detail, dir); diff --git a/nano/node/active_transactions.cpp b/nano/node/active_transactions.cpp index 0c106d270..c141cc74b 100644 --- a/nano/node/active_transactions.cpp +++ b/nano/node/active_transactions.cpp @@ -24,7 +24,7 @@ confirmation_height_processor (confirmation_height_processor_a), node (node_a), multipliers_cb (20, 1.), trended_active_multiplier (1.0), -generator (node_a.config, node_a.ledger, node_a.wallets, node_a.vote_processor, node_a.history, node_a.network), +generator (node_a.config, node_a.ledger, node_a.wallets, node_a.vote_processor, node_a.history, node_a.network, node_a.stats), check_all_elections_period (node_a.network_params.network.is_dev_network () ? 10ms : 5s), election_time_to_live (node_a.network_params.network.is_dev_network () ? 0s : 2s), prioritized_cutoff (std::max (1, node_a.config.active_elections_size / 10)), diff --git a/nano/node/bootstrap/bootstrap.hpp b/nano/node/bootstrap/bootstrap.hpp index ff2a80d7e..c5afd41e7 100644 --- a/nano/node/bootstrap/bootstrap.hpp +++ b/nano/node/bootstrap/bootstrap.hpp @@ -131,7 +131,7 @@ public: static constexpr double required_frontier_confirmation_ratio = 0.8; static constexpr unsigned frontier_confirmation_blocks_limit = 128 * 1024; static constexpr unsigned requeued_pulls_limit = 256; - static constexpr unsigned requeued_pulls_limit_dev = 2; + static constexpr unsigned requeued_pulls_limit_dev = 1; static constexpr unsigned requeued_pulls_processed_blocks_factor = 4096; static constexpr unsigned bulk_push_cost_limit = 200; static constexpr std::chrono::seconds lazy_flush_delay_sec = std::chrono::seconds (5); diff --git a/nano/node/bootstrap/bootstrap_attempt.cpp b/nano/node/bootstrap/bootstrap_attempt.cpp index 78f858f14..eb9bfac3a 100644 --- a/nano/node/bootstrap/bootstrap_attempt.cpp +++ b/nano/node/bootstrap/bootstrap_attempt.cpp @@ -485,7 +485,7 @@ bool nano::bootstrap_attempt_legacy::confirm_frontiers (nano::unique_locknetwork.broadcast_confirm_req_batched_many (batched_confirm_req_bundle); - std::this_thread::sleep_for (std::chrono::milliseconds (!node->network_params.network.is_dev_network () ? 500 : 5)); + std::this_thread::sleep_for (std::chrono::milliseconds (!node->network_params.network.is_dev_network () ? 500 : 100)); } } if (!confirmed) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 047d3464a..7f98ad7f0 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -125,7 +125,7 @@ online_reps (ledger, network_params, config.online_weight_minimum.number ()), vote_uniquer (block_uniquer), confirmation_height_processor (ledger, write_database_queue, config.conf_height_processor_batch_min_time, logger, node_initialized_latch, flags.confirmation_height_processor_mode), active (*this, confirmation_height_processor), -aggregator (network_params.network, config, stats, history, ledger, wallets, active), +aggregator (network_params.network, config, stats, active.generator, history, ledger, wallets, active), payment_observer_processor (observers.blocks), wallets (wallets_store.init_error (), *this), startup_time (std::chrono::steady_clock::now ()), diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 627f28f49..4683af79a 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -126,7 +126,7 @@ public: bool disable_bootstrap_bulk_pull_server{ false }; bool disable_bootstrap_bulk_push_client{ false }; bool disable_rep_crawler{ false }; - bool disable_request_loop{ false }; + bool disable_request_loop{ false }; // For testing only bool disable_tcp_realtime{ false }; bool disable_udp{ true }; bool disable_unchecked_cleanup{ false }; diff --git a/nano/node/request_aggregator.cpp b/nano/node/request_aggregator.cpp index 28af292b9..d825117b2 100644 --- a/nano/node/request_aggregator.cpp +++ b/nano/node/request_aggregator.cpp @@ -11,7 +11,7 @@ #include #include -nano::request_aggregator::request_aggregator (nano::network_constants const & network_constants_a, nano::node_config const & config_a, nano::stat & stats_a, nano::local_vote_history & history_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::active_transactions & active_a) : +nano::request_aggregator::request_aggregator (nano::network_constants const & network_constants_a, nano::node_config const & config_a, nano::stat & stats_a, nano::vote_generator & generator_a, nano::local_vote_history & history_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::active_transactions & active_a) : max_delay (network_constants_a.is_dev_network () ? 50 : 300), small_delay (network_constants_a.is_dev_network () ? 10 : 50), max_channel_requests (config_a.max_queued_requests), @@ -20,8 +20,12 @@ local_votes (history_a), ledger (ledger_a), wallets (wallets_a), active (active_a), +generator (generator_a), thread ([this]() { run (); }) { + generator.set_reply_action ([this](std::shared_ptr const & vote_a, std::shared_ptr & channel_a) { + this->reply_action (vote_a, channel_a); + }); nano::unique_lock lock (mutex); condition.wait (lock, [& started = started] { return started; }); } @@ -88,12 +92,12 @@ void nano::request_aggregator::run () requests_by_deadline.erase (front); lock.unlock (); erase_duplicates (hashes_roots); - auto transaction (ledger.store.tx_begin_read ()); - auto remaining = aggregate (transaction, hashes_roots, channel); + auto const remaining = aggregate (hashes_roots, channel); if (!remaining.empty ()) { // Generate votes for the remaining hashes - generate (transaction, remaining, channel); + auto const generated = generator.generate (remaining, channel); + stats.add (nano::stat::type::requests, nano::stat::detail::requests_cannot_vote, stat::dir::in, remaining.size () - generated); } lock.lock (); } @@ -134,6 +138,12 @@ bool nano::request_aggregator::empty () return size () == 0; } +void nano::request_aggregator::reply_action (std::shared_ptr const & vote_a, std::shared_ptr & channel_a) const +{ + nano::confirm_ack confirm (vote_a); + channel_a->send (confirm); +} + void nano::request_aggregator::erase_duplicates (std::vector> & requests_a) const { std::sort (requests_a.begin (), requests_a.end (), [](auto const & pair1, auto const & pair2) { @@ -145,10 +155,11 @@ void nano::request_aggregator::erase_duplicates (std::vector> nano::request_aggregator::aggregate (nano::transaction const & transaction_a, std::vector> const & requests_a, std::shared_ptr & channel_a) const +std::vector> nano::request_aggregator::aggregate (std::vector> const & requests_a, std::shared_ptr & channel_a) const { + auto transaction (ledger.store.tx_begin_read ()); size_t cached_hashes = 0; - std::vector> to_generate; + std::vector> to_generate; std::vector> cached_votes; for (auto const & hash_root : requests_a) { @@ -167,19 +178,20 @@ std::vector> nano::request_aggregator::a // 3. Ledger by hash if (block == nullptr) { - block = ledger.store.block_get (transaction_a, hash_root.first); + block = ledger.store.block_get (transaction, hash_root.first); } // 4. Ledger by root if (block == nullptr && !hash_root.second.is_zero ()) { // Search for block root - auto successor (ledger.store.block_successor (transaction_a, hash_root.second.as_block_hash ())); + auto successor (ledger.store.block_successor (transaction, hash_root.second.as_block_hash ())); + // Search for account root if (successor.is_zero ()) { nano::account_info info; - auto error (ledger.store.account_get (transaction_a, hash_root.second.as_account (), info)); + auto error (ledger.store.account_get (transaction, hash_root.second.as_account (), info)); if (!error) { successor = info.open_block; @@ -187,7 +199,7 @@ std::vector> nano::request_aggregator::a } if (!successor.is_zero ()) { - auto successor_block = ledger.store.block_get (transaction_a, successor); + auto successor_block = ledger.store.block_get (transaction, successor); debug_assert (successor_block != nullptr); // 5. Votes in cache for successor auto find_successor_votes (local_votes.votes (hash_root.second, successor)); @@ -204,15 +216,8 @@ std::vector> nano::request_aggregator::a if (block) { - // Attempt to vote for this block - if (ledger.dependents_confirmed (transaction_a, *block)) - { - to_generate.emplace_back (block->root (), block->hash ()); - } - else - { - stats.inc (nano::stat::type::requests, nano::stat::detail::requests_cannot_vote, stat::dir::in); - } + to_generate.push_back (block); + // Let the node know about the alternative block if (block->hash () != hash_root.first) { @@ -231,45 +236,13 @@ std::vector> nano::request_aggregator::a cached_votes.erase (std::unique (cached_votes.begin (), cached_votes.end ()), cached_votes.end ()); for (auto const & vote : cached_votes) { - nano::confirm_ack confirm (vote); - channel_a->send (confirm); + reply_action (vote, channel_a); } stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes, stat::dir::in, cached_hashes); stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached_votes, stat::dir::in, cached_votes.size ()); return to_generate; } -void nano::request_aggregator::generate (nano::transaction const & transaction_a, std::vector> const & hashes_a, std::shared_ptr & channel_a) const -{ - size_t generated_l = 0; - auto i (hashes_a.begin ()); - auto n (hashes_a.end ()); - while (i != n) - { - std::vector hashes_l; - std::vector roots; - hashes_l.reserve (nano::network::confirm_ack_hashes_max); - roots.reserve (nano::network::confirm_ack_hashes_max); - for (; i != n && hashes_l.size () < nano::network::confirm_ack_hashes_max; ++i) - { - roots.push_back (i->first); - hashes_l.push_back (i->second); - } - wallets.foreach_representative ([this, &generated_l, &hashes_l, &roots, &channel_a, &transaction_a](nano::public_key const & pub_a, nano::raw_key const & prv_a) { - auto vote (this->ledger.store.vote_generate (transaction_a, pub_a, prv_a, hashes_l)); - ++generated_l; - nano::confirm_ack confirm (vote); - channel_a->send (confirm); - for (size_t i (0), n (hashes_l.size ()); i != n; ++i) - { - this->local_votes.add (roots[i], hashes_l[i], vote); - } - }); - } - stats.add (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes, stat::dir::in, hashes_a.size ()); - stats.add (nano::stat::type::requests, nano::stat::detail::requests_generated_votes, stat::dir::in, generated_l); -} - std::unique_ptr nano::collect_container_info (nano::request_aggregator & aggregator, const std::string & name) { auto pools_count = aggregator.size (); diff --git a/nano/node/request_aggregator.hpp b/nano/node/request_aggregator.hpp index 362647344..a20f29703 100644 --- a/nano/node/request_aggregator.hpp +++ b/nano/node/request_aggregator.hpp @@ -22,6 +22,7 @@ class ledger; class local_vote_history; class node_config; class stat; +class vote_generator; class wallets; /** * Pools together confirmation requests, separately for each endpoint. @@ -58,8 +59,7 @@ class request_aggregator final // clang-format on public: - request_aggregator () = delete; - request_aggregator (nano::network_constants const &, nano::node_config const & config, nano::stat & stats_a, nano::local_vote_history &, nano::ledger &, nano::wallets &, nano::active_transactions &); + request_aggregator (nano::network_constants const &, nano::node_config const & config, nano::stat & stats_a, nano::vote_generator &, nano::local_vote_history &, nano::ledger &, nano::wallets &, nano::active_transactions &); /** Add a new request by \p channel_a for hashes \p hashes_roots_a */ void add (std::shared_ptr & channel_a, std::vector> const & hashes_roots_a); @@ -77,15 +77,15 @@ private: /** Remove duplicate requests **/ void erase_duplicates (std::vector> &) const; /** Aggregate \p requests_a and send cached votes to \p channel_a . Return the remaining hashes that need vote generation **/ - std::vector> aggregate (nano::transaction const &, std::vector> const & requests_a, std::shared_ptr & channel_a) const; - /** Generate votes from \p hashes_a and send to \p channel_a **/ - void generate (nano::transaction const &, std::vector> const & hashes_a, std::shared_ptr & channel_a) const; + std::vector> aggregate (std::vector> const & requests_a, std::shared_ptr & channel_a) const; + void reply_action (std::shared_ptr const & vote_a, std::shared_ptr & channel_a) const; nano::stat & stats; nano::local_vote_history & local_votes; nano::ledger & ledger; nano::wallets & wallets; nano::active_transactions & active; + nano::vote_generator & generator; // clang-format off boost::multi_index_container #include +#include #include #include #include @@ -9,20 +9,32 @@ #include #include -#include - #include +bool nano::local_vote_history::consistency_check (nano::root const & root_a) const +{ + auto & history_by_root (history.get ()); + auto const range (history_by_root.equal_range (root_a)); + // All cached votes for a root must be for the same hash, this is actively enforced in local_vote_history::add + auto consistent = std::all_of (range.first, range.second, [hash = range.first->hash](auto const & info_a) { return info_a.hash == hash; }); + std::vector accounts; + std::transform (range.first, range.second, std::back_inserter (accounts), [](auto const & info_a) { return info_a.vote->account; }); + std::sort (accounts.begin (), accounts.end ()); + // All cached votes must be unique by account, this is actively enforced in local_vote_history::add + consistent = consistent && accounts.size () == std::unique (accounts.begin (), accounts.end ()) - accounts.begin (); + return consistent; +} + void nano::local_vote_history::add (nano::root const & root_a, nano::block_hash const & hash_a, std::shared_ptr const & vote_a) { nano::lock_guard guard (mutex); clean (); auto & history_by_root (history.get ()); - // Erase any vote that is not for this hash + // Erase any vote that is not for this hash, or duplicate by account auto range (history_by_root.equal_range (root_a)); for (auto i (range.first); i != range.second;) { - if (i->hash != hash_a) + if (i->hash != hash_a || vote_a->account == i->vote->account) { i = history_by_root.erase (i); } @@ -32,8 +44,9 @@ void nano::local_vote_history::add (nano::root const & root_a, nano::block_hash } } auto result (history_by_root.emplace (root_a, hash_a, vote_a)); + (void)result; debug_assert (result.second); - debug_assert (std::all_of (history_by_root.equal_range (root_a).first, history_by_root.equal_range (root_a).second, [&hash_a](local_vote const & item_a) -> bool { return item_a.vote != nullptr && item_a.hash == hash_a; })); + debug_assert (consistency_check (root_a)); } void nano::local_vote_history::erase (nano::root const & root_a) @@ -98,13 +111,14 @@ std::unique_ptr nano::collect_container_info (na return composite; } -nano::vote_generator::vote_generator (nano::node_config const & config_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::vote_processor & vote_processor_a, nano::local_vote_history & history_a, nano::network & network_a) : +nano::vote_generator::vote_generator (nano::node_config const & config_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::vote_processor & vote_processor_a, nano::local_vote_history & history_a, nano::network & network_a, nano::stat & stats_a) : config (config_a), ledger (ledger_a), wallets (wallets_a), vote_processor (vote_processor_a), history (history_a), network (network_a), +stats (stats_a), thread ([this]() { run (); }) { nano::unique_lock lock (mutex); @@ -113,16 +127,27 @@ thread ([this]() { run (); }) void nano::vote_generator::add (nano::root const & root_a, nano::block_hash const & hash_a) { - auto transaction (ledger.store.tx_begin_read ()); - auto block (ledger.store.block_get (transaction, hash_a)); - if (block != nullptr && ledger.dependents_confirmed (transaction, *block)) + auto votes (history.votes (root_a, hash_a)); + if (!votes.empty ()) { - nano::unique_lock lock (mutex); - hashes.emplace_back (root_a, hash_a); - if (hashes.size () >= nano::network::confirm_ack_hashes_max) + for (auto const & vote : votes) { - lock.unlock (); - condition.notify_all (); + broadcast_action (vote); + } + } + else + { + auto transaction (ledger.store.tx_begin_read ()); + auto block (ledger.store.block_get (transaction, hash_a)); + if (block != nullptr && ledger.dependents_confirmed (transaction, *block)) + { + nano::unique_lock lock (mutex); + candidates.emplace_back (root_a, hash_a); + if (candidates.size () >= nano::network::confirm_ack_hashes_max) + { + lock.unlock (); + condition.notify_all (); + } } } } @@ -141,34 +166,141 @@ void nano::vote_generator::stop () } } -void nano::vote_generator::send (nano::unique_lock & lock_a) +size_t nano::vote_generator::generate (std::vector> const & blocks_a, std::shared_ptr const & channel_a) { - std::vector hashes_l; - std::vector roots; - hashes_l.reserve (nano::network::confirm_ack_hashes_max); - roots.reserve (nano::network::confirm_ack_hashes_max); - while (!hashes.empty () && hashes_l.size () < nano::network::confirm_ack_hashes_max) - { - auto front (hashes.front ()); - hashes.pop_front (); - roots.push_back (front.first); - hashes_l.push_back (front.second); - } - lock_a.unlock (); + request_t::first_type candidates; { auto transaction (ledger.store.tx_begin_read ()); - wallets.foreach_representative ([this, &hashes_l, &roots, &transaction](nano::public_key const & pub_a, nano::raw_key const & prv_a) { - auto vote (this->ledger.store.vote_generate (transaction, pub_a, prv_a, hashes_l)); - for (size_t i (0), n (hashes_l.size ()); i != n; ++i) + auto dependents_confirmed = [&blocks_a, &transaction, this](auto const & block_a) { + return this->ledger.dependents_confirmed (transaction, *block_a); + }; + auto as_candidate = [](auto const & block_a) { + return candidate_t{ block_a->root (), block_a->hash () }; + }; + nano::transform_if (blocks_a.begin (), blocks_a.end (), std::back_inserter (candidates), dependents_confirmed, as_candidate); + } + auto const result = candidates.size (); + nano::lock_guard guard (mutex); + requests.emplace_back (std::move (candidates), channel_a); + while (requests.size () > max_requests) + { + // On a large queue of requests, erase the oldest one + requests.pop_front (); + stats.inc (nano::stat::type::vote_generator, nano::stat::detail::generator_replies_discarded); + } + return result; +} + +void nano::vote_generator::set_reply_action (std::function const &, std::shared_ptr &)> action_a) +{ + release_assert (!reply_action); + reply_action = action_a; +} + +void nano::vote_generator::broadcast (nano::unique_lock & lock_a) +{ + debug_assert (lock_a.owns_lock ()); + std::unordered_set> cached_sent; + std::vector hashes; + std::vector roots; + hashes.reserve (nano::network::confirm_ack_hashes_max); + roots.reserve (nano::network::confirm_ack_hashes_max); + while (!candidates.empty () && hashes.size () < nano::network::confirm_ack_hashes_max) + { + auto const & [root, hash] = candidates.front (); + auto cached_votes = history.votes (root, hash); + for (auto const & cached_vote : cached_votes) + { + if (cached_sent.insert (cached_vote).second) { - this->history.add (roots[i], hashes_l[i], vote); + broadcast_action (cached_vote); } - this->network.flood_vote_pr (vote); - this->network.flood_vote (vote, 2.0f); - this->vote_processor.vote (vote, std::make_shared (this->network.udp_channels, this->network.endpoint (), this->network_params.protocol.protocol_version)); + } + if (cached_votes.empty ()) + { + roots.push_back (root); + hashes.push_back (hash); + } + candidates.pop_front (); + } + if (!hashes.empty ()) + { + lock_a.unlock (); + vote (hashes, roots, [this](auto const & vote_a) { this->broadcast_action (vote_a); }); + lock_a.lock (); + } + stats.inc (nano::stat::type::vote_generator, nano::stat::detail::generator_broadcasts); +} + +void nano::vote_generator::reply (nano::unique_lock & lock_a, request_t && request_a) +{ + lock_a.unlock (); + std::unordered_set> cached_sent; + auto transaction (ledger.store.tx_begin_read ()); + auto i (request_a.first.cbegin ()); + auto n (request_a.first.cend ()); + while (i != n && !stopped) + { + std::vector hashes; + std::vector roots; + hashes.reserve (nano::network::confirm_ack_hashes_max); + roots.reserve (nano::network::confirm_ack_hashes_max); + for (; i != n && hashes.size () < nano::network::confirm_ack_hashes_max; ++i) + { + auto cached_votes = history.votes (i->first, i->second); + for (auto const & cached_vote : cached_votes) + { + if (cached_sent.insert (cached_vote).second) + { + stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached_late_hashes, stat::dir::in, cached_vote->blocks.size ()); + stats.inc (nano::stat::type::requests, nano::stat::detail::requests_cached_late_votes, stat::dir::in); + reply_action (cached_vote, request_a.second); + } + } + if (cached_votes.empty ()) + { + roots.push_back (i->first); + hashes.push_back (i->second); + } + } + if (!hashes.empty ()) + { + stats.add (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes, stat::dir::in, hashes.size ()); + vote (hashes, roots, [this, &channel = request_a.second](std::shared_ptr const & vote_a) { + this->reply_action (vote_a, channel); + this->stats.inc (nano::stat::type::requests, nano::stat::detail::requests_generated_votes, stat::dir::in); + }); + } + } + stats.inc (nano::stat::type::vote_generator, nano::stat::detail::generator_replies); + lock_a.lock (); +} + +void nano::vote_generator::vote (std::vector const & hashes_a, std::vector const & roots_a, std::function const &)> const & action_a) +{ + debug_assert (hashes_a.size () == roots_a.size ()); + std::vector> votes_l; + { + auto transaction (ledger.store.tx_begin_read ()); + wallets.foreach_representative ([this, &hashes_a, &roots_a, &transaction, &votes_l](nano::public_key const & pub_a, nano::raw_key const & prv_a) { + votes_l.emplace_back (this->ledger.store.vote_generate (transaction, pub_a, prv_a, hashes_a)); }); } - lock_a.lock (); + for (auto const & vote_l : votes_l) + { + for (size_t i (0), n (hashes_a.size ()); i != n; ++i) + { + history.add (roots_a[i], hashes_a[i], vote_l); + } + action_a (vote_l); + } +} + +void nano::vote_generator::broadcast_action (std::shared_ptr const & vote_a) const +{ + network.flood_vote_pr (vote_a); + network.flood_vote (vote_a, 2.0f); + vote_processor.vote (vote_a, std::make_shared (network.udp_channels, network.endpoint (), network_params.protocol.protocol_version)); } void nano::vote_generator::run () @@ -181,20 +313,26 @@ void nano::vote_generator::run () lock.lock (); while (!stopped) { - if (hashes.size () >= nano::network::confirm_ack_hashes_max) + if (candidates.size () >= nano::network::confirm_ack_hashes_max) { - send (lock); + broadcast (lock); + } + else if (!requests.empty ()) + { + auto request (requests.front ()); + requests.pop_front (); + reply (lock, std::move (request)); } else { - condition.wait_for (lock, config.vote_generator_delay, [this]() { return this->hashes.size () >= nano::network::confirm_ack_hashes_max; }); - if (hashes.size () >= config.vote_generator_threshold && hashes.size () < nano::network::confirm_ack_hashes_max) + condition.wait_for (lock, config.vote_generator_delay, [this]() { return this->candidates.size () >= nano::network::confirm_ack_hashes_max; }); + if (candidates.size () >= config.vote_generator_threshold && candidates.size () < nano::network::confirm_ack_hashes_max) { - condition.wait_for (lock, config.vote_generator_delay, [this]() { return this->hashes.size () >= nano::network::confirm_ack_hashes_max; }); + condition.wait_for (lock, config.vote_generator_delay, [this]() { return this->candidates.size () >= nano::network::confirm_ack_hashes_max; }); } - if (!hashes.empty ()) + if (!candidates.empty ()) { - send (lock); + broadcast (lock); } } } @@ -208,27 +346,31 @@ generator (vote_generator_a) void nano::vote_generator_session::add (nano::root const & root_a, nano::block_hash const & hash_a) { debug_assert (nano::thread_role::get () == nano::thread_role::name::request_loop); - hashes.emplace_back (root_a, hash_a); + items.emplace_back (root_a, hash_a); } void nano::vote_generator_session::flush () { debug_assert (nano::thread_role::get () == nano::thread_role::name::request_loop); - for (auto const & i : hashes) + for (auto const & [root, hash] : items) { - generator.add (i.first, i.second); + generator.add (root, hash); } } std::unique_ptr nano::collect_container_info (nano::vote_generator & vote_generator, const std::string & name) { - size_t hashes_count = 0; + size_t candidates_count = 0; + size_t requests_count = 0; { nano::lock_guard guard (vote_generator.mutex); - hashes_count = vote_generator.hashes.size (); + candidates_count = vote_generator.candidates.size (); + requests_count = vote_generator.requests.size (); } - auto sizeof_hashes_element = sizeof (decltype (vote_generator.hashes)::value_type); + auto sizeof_candidate_element = sizeof (decltype (vote_generator.candidates)::value_type); + auto sizeof_request_element = sizeof (decltype (vote_generator.requests)::value_type); auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "hashes", hashes_count, sizeof_hashes_element })); + composite->add_component (std::make_unique (container_info{ "candidates", candidates_count, sizeof_candidate_element })); + composite->add_component (std::make_unique (container_info{ "requests", requests_count, sizeof_request_element })); return composite; } diff --git a/nano/node/voting.hpp b/nano/node/voting.hpp index 2dcb0acbd..03ee6ae5c 100644 --- a/nano/node/voting.hpp +++ b/nano/node/voting.hpp @@ -23,8 +23,13 @@ namespace nano class ledger; class network; class node_config; +class stat; class vote_processor; class wallets; +namespace transport +{ + class channel; +} class local_vote_history final { @@ -63,6 +68,8 @@ private: size_t const max_size{ nano::network_params{}.voting.max_cache }; void clean (); std::vector> votes (nano::root const & root_a) const; + // Only used in Debug + bool consistency_check (nano::root const &) const; mutable std::mutex mutex; friend std::unique_ptr collect_container_info (local_vote_history & history, const std::string & name); @@ -74,25 +81,40 @@ std::unique_ptr collect_container_info (local_vote_his class vote_generator final { +private: + using candidate_t = std::pair; + using request_t = std::pair, std::shared_ptr>; + public: - vote_generator (nano::node_config const & config_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::vote_processor & vote_processor_a, nano::local_vote_history & history_a, nano::network & network_a); + vote_generator (nano::node_config const & config_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::vote_processor & vote_processor_a, nano::local_vote_history & history_a, nano::network & network_a, nano::stat & stats_a); + /** Queue items for vote generation, or broadcast votes already in cache */ void add (nano::root const &, nano::block_hash const &); + /** Queue blocks for vote generation, returning the number of successful candidates.*/ + size_t generate (std::vector> const & blocks_a, std::shared_ptr const & channel_a); + void set_reply_action (std::function const &, std::shared_ptr &)>); void stop (); private: void run (); - void send (nano::unique_lock &); + void broadcast (nano::unique_lock &); + void reply (nano::unique_lock &, request_t &&); + void vote (std::vector const &, std::vector const &, std::function const &)> const &); + void broadcast_action (std::shared_ptr const &) const; + std::function const &, std::shared_ptr &)> reply_action; // must be set only during initialization by using set_reply_action nano::node_config const & config; nano::ledger & ledger; nano::wallets & wallets; nano::vote_processor & vote_processor; nano::local_vote_history & history; nano::network & network; + nano::stat & stats; mutable std::mutex mutex; nano::condition_variable condition; - std::deque> hashes; + static size_t constexpr max_requests{ 2048 }; + std::deque requests; + std::deque candidates; nano::network_params network_params; - bool stopped{ false }; + std::atomic stopped{ false }; bool started{ false }; std::thread thread; @@ -110,6 +132,6 @@ public: private: nano::vote_generator & generator; - std::vector> hashes; + std::vector> items; }; } diff --git a/nano/secure/blockstore_partial.hpp b/nano/secure/blockstore_partial.hpp index 655dd740c..711101bdd 100644 --- a/nano/secure/blockstore_partial.hpp +++ b/nano/secure/blockstore_partial.hpp @@ -1,6 +1,8 @@ #pragma once +#include #include +#include #include #include @@ -273,6 +275,7 @@ public: std::shared_ptr vote_generate (nano::transaction const & transaction_a, nano::account const & account_a, nano::raw_key const & key_a, std::shared_ptr block_a) override { + debug_assert (nano::network_constants ().is_dev_network () || nano::thread_role::get () == nano::thread_role::name::voting); nano::lock_guard lock (cache_mutex); auto result (vote_current (transaction_a, account_a)); uint64_t sequence ((result ? result->sequence : 0) + 1); @@ -283,6 +286,7 @@ public: std::shared_ptr vote_generate (nano::transaction const & transaction_a, nano::account const & account_a, nano::raw_key const & key_a, std::vector blocks_a) override { + debug_assert (nano::network_constants ().is_dev_network () || nano::thread_role::get () == nano::thread_role::name::voting); nano::lock_guard lock (cache_mutex); auto result (vote_current (transaction_a, account_a)); uint64_t sequence ((result ? result->sequence : 0) + 1);