diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index 259a4c3a9..242e0c0b5 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -30,6 +30,7 @@ add_executable( network.cpp network_filter.cpp node.cpp + processing_queue.cpp processor_service.cpp peer_container.cpp prioritization.cpp diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 0afa62706..ff28378f4 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -2589,7 +2589,7 @@ TEST (node, vote_by_hash_bundle) for (auto const & block : blocks) { - system.nodes[0]->active.generator.add (block->root (), block->hash ()); + system.nodes[0]->generator.add (block->root (), block->hash ()); } // Verify that bundling occurs. While reaching 12 should be common on most hardware in release mode, diff --git a/nano/core_test/processing_queue.cpp b/nano/core_test/processing_queue.cpp new file mode 100644 index 000000000..85c54a9f3 --- /dev/null +++ b/nano/core_test/processing_queue.cpp @@ -0,0 +1,119 @@ +#include +#include +#include +#include + +#include + +using namespace std::chrono_literals; + +TEST (processing_queue, construction) +{ + nano::test::system system{}; + nano::processing_queue queue{ system.stats, {}, {}, 4, 8 * 1024, 1024 }; + ASSERT_EQ (queue.size (), 0); +} + +TEST (processing_queue, process_one) +{ + nano::test::system system{}; + nano::processing_queue queue{ system.stats, {}, {}, 4, 8 * 1024, 1024 }; + + std::atomic processed{ 0 }; + queue.process_batch = [&] (auto & batch) { + processed += batch.size (); + }; + queue.start (); + + queue.add (1); + + ASSERT_TIMELY (5s, processed == 1); + ASSERT_ALWAYS (1s, processed == 1); + ASSERT_EQ (queue.size (), 0); +} + +TEST (processing_queue, process_many) +{ + nano::test::system system{}; + nano::processing_queue queue{ system.stats, {}, {}, 4, 8 * 1024, 1024 }; + + std::atomic processed{ 0 }; + queue.process_batch = [&] (auto & batch) { + processed += batch.size (); + }; + queue.start (); + + const int count = 1024; + for (int n = 0; n < count; ++n) + { + queue.add (1); + } + + ASSERT_TIMELY (5s, processed == count); + ASSERT_ALWAYS (1s, processed == count); + ASSERT_EQ (queue.size (), 0); +} + +TEST (processing_queue, max_queue_size) +{ + nano::test::system system{}; + nano::processing_queue queue{ system.stats, {}, {}, 4, 1024, 128 }; + + const int count = 2 * 1024; // Double the max queue size + for (int n = 0; n < count; ++n) + { + queue.add (1); + } + + ASSERT_EQ (queue.size (), 1024); +} + +TEST (processing_queue, max_batch_size) +{ + nano::test::system system{}; + nano::processing_queue queue{ system.stats, {}, {}, 4, 1024, 128 }; + + // Fill queue before starting processing threads + const int count = 1024; + for (int n = 0; n < count; ++n) + { + queue.add (1); + } + + std::atomic max_batch{ 0 }; + queue.process_batch = [&] (auto & batch) { + if (batch.size () > max_batch) + { + max_batch = batch.size (); + } + }; + queue.start (); + + ASSERT_TIMELY (5s, max_batch == 128); + ASSERT_ALWAYS (1s, max_batch == 128); + ASSERT_EQ (queue.size (), 0); +} + +TEST (processing_queue, parallel) +{ + nano::test::system system{}; + nano::processing_queue queue{ system.stats, {}, {}, 16, 1024, 1 }; + + std::atomic processed{ 0 }; + queue.process_batch = [&] (auto & batch) { + std::this_thread::sleep_for (2s); + processed += batch.size (); + }; + queue.start (); + + const int count = 16; + for (int n = 0; n < count; ++n) + { + queue.add (1); + } + + // There are 16 threads and 16 items, each thread is waiting 1 second inside processing callback + // If processing is done in parallel it should take ~2 seconds to process every item, but keep some margin for slow machines + ASSERT_TIMELY (3s, processed == count); + ASSERT_EQ (queue.size (), 0); +} \ No newline at end of file diff --git a/nano/core_test/voting.cpp b/nano/core_test/voting.cpp index a9065f4e0..282ff91e1 100644 --- a/nano/core_test/voting.cpp +++ b/nano/core_test/voting.cpp @@ -61,7 +61,7 @@ TEST (vote_generator, cache) auto & node (*system.nodes[0]); auto epoch1 = system.upgrade_genesis_epoch (node, nano::epoch::epoch_1); system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv); - node.active.generator.add (epoch1->root (), epoch1->hash ()); + node.generator.add (epoch1->root (), epoch1->hash ()); ASSERT_TIMELY (1s, !node.history.votes (epoch1->root (), epoch1->hash ()).empty ()); auto votes (node.history.votes (epoch1->root (), epoch1->hash ())); ASSERT_FALSE (votes.empty ()); @@ -108,7 +108,7 @@ TEST (vote_generator, session) nano::test::system system (1); auto node (system.nodes[0]); system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv); - nano::vote_generator_session generator_session (node->active.generator); + nano::vote_generator_session generator_session (node->generator); boost::thread thread ([node, &generator_session] () { nano::thread_role::set (nano::thread_role::name::request_loop); generator_session.add (nano::dev::genesis->account (), nano::dev::genesis->hash ()); @@ -184,15 +184,15 @@ TEST (vote_spacing, vote_generator) .build_shared (); ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code); ASSERT_EQ (0, node.stats.count (nano::stat::type::vote_generator, nano::stat::detail::generator_broadcasts)); - node.active.generator.add (nano::dev::genesis->hash (), send1->hash ()); + node.generator.add (nano::dev::genesis->hash (), send1->hash ()); ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::vote_generator, nano::stat::detail::generator_broadcasts) == 1); ASSERT_FALSE (node.ledger.rollback (node.store.tx_begin_write (), send1->hash ())); ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send2).code); - node.active.generator.add (nano::dev::genesis->hash (), send2->hash ()); + node.generator.add (nano::dev::genesis->hash (), send2->hash ()); ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::vote_generator, nano::stat::detail::generator_spacing) == 1); ASSERT_EQ (1, node.stats.count (nano::stat::type::vote_generator, nano::stat::detail::generator_broadcasts)); std::this_thread::sleep_for (config.network_params.voting.delay); - node.active.generator.add (nano::dev::genesis->hash (), send2->hash ()); + node.generator.add (nano::dev::genesis->hash (), send2->hash ()); ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::vote_generator, nano::stat::detail::generator_broadcasts) == 2); } @@ -227,14 +227,14 @@ TEST (vote_spacing, rapid) .work (*system.work.generate (nano::dev::genesis->hash ())) .build_shared (); ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code); - node.active.generator.add (nano::dev::genesis->hash (), send1->hash ()); + node.generator.add (nano::dev::genesis->hash (), send1->hash ()); ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::vote_generator, nano::stat::detail::generator_broadcasts) == 1); ASSERT_FALSE (node.ledger.rollback (node.store.tx_begin_write (), send1->hash ())); ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send2).code); - node.active.generator.add (nano::dev::genesis->hash (), send2->hash ()); + node.generator.add (nano::dev::genesis->hash (), send2->hash ()); ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::vote_generator, nano::stat::detail::generator_spacing) == 1); ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::vote_generator, nano::stat::detail::generator_broadcasts)); std::this_thread::sleep_for (config.network_params.voting.delay); - node.active.generator.add (nano::dev::genesis->hash (), send2->hash ()); + node.generator.add (nano::dev::genesis->hash (), send2->hash ()); ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::vote_generator, nano::stat::detail::generator_broadcasts) == 2); } diff --git a/nano/lib/CMakeLists.txt b/nano/lib/CMakeLists.txt index 6772787c0..f54b80b00 100644 --- a/nano/lib/CMakeLists.txt +++ b/nano/lib/CMakeLists.txt @@ -54,6 +54,7 @@ add_library( numbers.cpp observer_set.hpp optional_ptr.hpp + processing_queue.hpp rate_limiting.hpp rate_limiting.cpp rep_weights.hpp diff --git a/nano/lib/processing_queue.hpp b/nano/lib/processing_queue.hpp new file mode 100644 index 000000000..c5ffdb375 --- /dev/null +++ b/nano/lib/processing_queue.hpp @@ -0,0 +1,174 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace nano +{ +/** + * Queue that processes enqueued elements in (possibly parallel) batches + */ +template +class processing_queue final +{ +public: + using value_t = T; + + /** + * @param thread_role Spawned processing threads will use this name + * @param thread_count Number of processing threads + * @param max_queue_size Max number of items enqueued, items beyond this value will be discarded + * @param max_batch_size Max number of elements processed in single batch, 0 for unlimited (default) + */ + processing_queue (nano::stat & stats, nano::stat::type stat_type, nano::thread_role::name thread_role, std::size_t thread_count, std::size_t max_queue_size, std::size_t max_batch_size = 0) : + stats{ stats }, + stat_type{ stat_type }, + thread_role{ thread_role }, + thread_count{ thread_count }, + max_queue_size{ max_queue_size }, + max_batch_size{ max_batch_size } + { + } + + ~processing_queue () + { + stop (); + } + + void start () + { + for (int n = 0; n < thread_count; ++n) + { + threads.emplace_back ([this] () { + run (); + }); + } + } + + void stop () + { + stopped = true; + condition.notify_all (); + for (auto & thread : threads) + { + thread.join (); + } + threads.clear (); + } + + /** + * Queues item for batch processing + */ + void add (T const & item) + { + nano::unique_lock lock{ mutex }; + if (queue.size () < max_queue_size) + { + queue.emplace_back (item); + lock.unlock (); + condition.notify_one (); + stats.inc (stat_type, nano::stat::detail::queue); + } + else + { + stats.inc (stat_type, nano::stat::detail::overfill); + } + } + + std::size_t size () const + { + nano::lock_guard guard{ mutex }; + return queue.size (); + } + +public: // Container info + std::unique_ptr collect_container_info (std::string const & name) + { + nano::lock_guard guard{ mutex }; + + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "queue", queue.size (), sizeof (typename decltype (queue)::value_type) })); + return composite; + } + +private: + std::deque next_batch (nano::unique_lock & lock) + { + release_assert (lock.owns_lock ()); + + condition.wait (lock, [this] () { + return stopped || !queue.empty (); + }); + + if (stopped) + { + return {}; + } + + debug_assert (!queue.empty ()); + + // Unlimited batch size or queue smaller than max batch size, return the whole current queue + if (max_batch_size == 0 || queue.size () < max_batch_size) + { + decltype (queue) queue_l; + queue_l.swap (queue); + return queue_l; + } + // Larger than max batch size, return limited number of elements + else + { + decltype (queue) queue_l; + for (int n = 0; n < max_batch_size; ++n) + { + debug_assert (!queue.empty ()); + queue_l.emplace_back (queue.front ()); + queue.pop_front (); + } + return queue_l; + } + } + + void run () + { + nano::thread_role::set (thread_role); + nano::unique_lock lock{ mutex }; + while (!stopped) + { + auto batch = next_batch (lock); + lock.unlock (); + stats.inc (stat_type, nano::stat::detail::batch); + process_batch (batch); + lock.lock (); + } + } + +public: + std::function &)> process_batch{ [] (auto &) { debug_assert (false, "processing queue callback empty"); } }; + +private: + nano::stat & stats; + + const nano::stat::type stat_type; + const nano::thread_role::name thread_role; + const std::size_t thread_count; + const std::size_t max_queue_size; + const std::size_t max_batch_size; + +private: + std::deque queue; + std::atomic stopped{ false }; + mutable nano::mutex mutex; + nano::condition_variable condition; + std::vector threads; +}; +} \ No newline at end of file diff --git a/nano/lib/stats.cpp b/nano/lib/stats.cpp index 632cdf725..80b365ba8 100644 --- a/nano/lib/stats.cpp +++ b/nano/lib/stats.cpp @@ -563,6 +563,15 @@ std::string nano::stat::detail_to_string (stat::detail detail) case nano::stat::detail::all: res = "all"; break; + case nano::stat::detail::queue: + res = "queue"; + break; + case nano::stat::detail::overfill: + res = "overfill"; + break; + case nano::stat::detail::batch: + res = "batch"; + break; case nano::stat::detail::bad_sender: res = "bad_sender"; break; diff --git a/nano/lib/stats.hpp b/nano/lib/stats.hpp index 3e039dcad..164562804 100644 --- a/nano/lib/stats.hpp +++ b/nano/lib/stats.hpp @@ -254,6 +254,11 @@ public: { all = 0, + // processing queue + queue, + overfill, + batch, + // error specific bad_sender, insufficient_work, diff --git a/nano/lib/threading.cpp b/nano/lib/threading.cpp index 7f37d0a89..ff6b74c25 100644 --- a/nano/lib/threading.cpp +++ b/nano/lib/threading.cpp @@ -99,6 +99,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::election_hinting: thread_role_name_string = "Hinting"; break; + case nano::thread_role::name::vote_generator_queue: + thread_role_name_string = "Voting que"; + break; default: debug_assert (false && "nano::thread_role::get_string unhandled thread role"); } diff --git a/nano/lib/threading.hpp b/nano/lib/threading.hpp index d0e4f409f..28f3c4080 100644 --- a/nano/lib/threading.hpp +++ b/nano/lib/threading.hpp @@ -44,7 +44,8 @@ namespace thread_role election_scheduler, unchecked, backlog_population, - election_hinting + election_hinting, + vote_generator_queue, }; /* diff --git a/nano/lib/utility.hpp b/nano/lib/utility.hpp index 0e0da30fa..4547fab51 100644 --- a/nano/lib/utility.hpp +++ b/nano/lib/utility.hpp @@ -42,7 +42,7 @@ void assert_internal (char const * check_expr, char const * func, char const * f #endif #ifdef NDEBUG -#define debug_assert(check) (void)0 +#define debug_assert(...) (void)0 #else #define debug_assert_1(check) check ? (void)0 : assert_internal (#check, BOOST_CURRENT_FUNCTION, __FILE__, __LINE__, false) #define debug_assert_2(check, error_msg) check ? (void)0 : assert_internal (#check, BOOST_CURRENT_FUNCTION, __FILE__, __LINE__, false, error_msg) diff --git a/nano/node/active_transactions.cpp b/nano/node/active_transactions.cpp index 787c6aeca..9a446dfcb 100644 --- a/nano/node/active_transactions.cpp +++ b/nano/node/active_transactions.cpp @@ -18,8 +18,6 @@ nano::active_transactions::active_transactions (nano::node & node_a, nano::confi scheduler{ node_a.scheduler }, // Move dependencies requiring this circular reference confirmation_height_processor{ confirmation_height_processor_a }, node{ node_a }, - generator{ node_a.config, node_a.ledger, node_a.wallets, node_a.vote_processor, node_a.history, node_a.network, node_a.stats, false }, - final_generator{ node_a.config, node_a.ledger, node_a.wallets, node_a.vote_processor, node_a.history, node_a.network, node_a.stats, true }, recently_confirmed{ 65536 }, recently_cemented{ node.config.confirmation_history_size }, election_time_to_live{ node_a.network_params.network.is_dev_network () ? 0s : 2s }, @@ -202,8 +200,8 @@ void nano::active_transactions::request_confirm (nano::unique_lock nano::confirmation_solicitor solicitor (node.network, node.config); solicitor.prepare (node.rep_crawler.principal_representatives (std::numeric_limits::max ())); - nano::vote_generator_session generator_session (generator); - nano::vote_generator_session final_generator_session (generator); + nano::vote_generator_session generator_session (node.generator); + nano::vote_generator_session final_generator_session (node.final_generator); std::size_t unconfirmed_count_l (0); nano::timer elapsed (nano::timer_state::started); @@ -366,8 +364,6 @@ void nano::active_transactions::stop () { thread.join (); } - generator.stop (); - final_generator.stop (); lock.lock (); roots.clear (); } @@ -690,16 +686,12 @@ std::unique_ptr nano::collect_container_info (ac { std::size_t roots_count; std::size_t blocks_count; - std::size_t recently_confirmed_count; - std::size_t recently_cemented_count; std::size_t hinted_count; { nano::lock_guard guard (active_transactions.mutex); roots_count = active_transactions.roots.size (); blocks_count = active_transactions.blocks.size (); - recently_confirmed_count = active_transactions.recently_confirmed.size (); - recently_cemented_count = active_transactions.recently_cemented.size (); hinted_count = active_transactions.active_hinted_elections_count; } @@ -708,7 +700,6 @@ std::unique_ptr nano::collect_container_info (ac composite->add_component (std::make_unique (container_info{ "blocks", blocks_count, sizeof (decltype (active_transactions.blocks)::value_type) })); composite->add_component (std::make_unique (container_info{ "election_winner_details", active_transactions.election_winner_details_size (), sizeof (decltype (active_transactions.election_winner_details)::value_type) })); composite->add_component (std::make_unique (container_info{ "hinted", hinted_count, 0 })); - composite->add_component (collect_container_info (active_transactions.generator, "generator")); composite->add_component (active_transactions.recently_confirmed.collect_container_info ("recently_confirmed")); composite->add_component (active_transactions.recently_cemented.collect_container_info ("recently_cemented")); diff --git a/nano/node/active_transactions.hpp b/nano/node/active_transactions.hpp index d3af842c7..a0a2591e7 100644 --- a/nano/node/active_transactions.hpp +++ b/nano/node/active_transactions.hpp @@ -203,9 +203,6 @@ public: void add_election_winner_details (nano::block_hash const &, std::shared_ptr const &); void remove_election_winner_details (nano::block_hash const &); - nano::vote_generator generator; - nano::vote_generator final_generator; - recently_confirmed_cache recently_confirmed; recently_cemented_cache recently_cemented; diff --git a/nano/node/election.cpp b/nano/node/election.cpp index abc9c6db6..de2b8b604 100644 --- a/nano/node/election.cpp +++ b/nano/node/election.cpp @@ -32,7 +32,7 @@ nano::election::election (nano::node & node_a, std::shared_ptr cons last_blocks.emplace (block_a->hash (), block_a); if (node.config.enable_voting && node.wallets.reps ().voting > 0) { - node.active.generator.add (root, block_a->hash ()); + node.generator.add (root, block_a->hash ()); } } @@ -322,7 +322,7 @@ void nano::election::confirm_if_quorum (nano::unique_lock & lock_a) { auto hash = status.winner->hash (); lock_a.unlock (); - node.active.final_generator.add (root, hash); + node.final_generator.add (root, hash); lock_a.lock (); } if (!node.ledger.cache.final_votes_confirmation_canary.load () || final_weight >= node.online_reps.delta ()) @@ -488,12 +488,12 @@ void nano::election::generate_votes () const { auto hash = status.winner->hash (); lock.unlock (); - node.active.final_generator.add (root, hash); + node.final_generator.add (root, hash); lock.lock (); } else { - node.active.generator.add (root, status.winner->hash ()); + node.generator.add (root, status.winner->hash ()); } } } diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 7d3ed99a9..cbd59876b 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -173,10 +173,12 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co vote_uniquer (block_uniquer), confirmation_height_processor (ledger, write_database_queue, config.conf_height_processor_batch_min_time, config.logging, logger, node_initialized_latch, flags.confirmation_height_processor_mode), inactive_vote_cache{ nano::nodeconfig_to_vote_cache_config (config, flags) }, + generator{ config, ledger, wallets, vote_processor, history, network, stats, /* non-final */ false }, + final_generator{ config, ledger, wallets, vote_processor, history, network, stats, /* final */ true }, active (*this, confirmation_height_processor), scheduler{ *this }, hinting{ nano::nodeconfig_to_hinted_scheduler_config (config), *this, inactive_vote_cache, active, online_reps, stats }, - aggregator (config, stats, active.generator, active.final_generator, history, ledger, wallets, active), + aggregator (config, stats, generator, final_generator, history, ledger, wallets, active), wallets (wallets_store.init_error (), *this), backlog{ nano::nodeconfig_to_backlog_population_config (config), store, scheduler }, startup_time (std::chrono::steady_clock::now ()), @@ -631,6 +633,8 @@ std::unique_ptr nano::collect_container_info (no composite->add_component (collect_container_info (node.aggregator, "request_aggregator")); composite->add_component (node.scheduler.collect_container_info ("election_scheduler")); composite->add_component (node.inactive_vote_cache.collect_container_info ("inactive_vote_cache")); + composite->add_component (collect_container_info (node.generator, "vote_generator")); + composite->add_component (collect_container_info (node.final_generator, "vote_generator_final")); return composite; } @@ -740,6 +744,8 @@ void nano::node::start () port_mapping.start (); } wallets.start (); + generator.start (); + final_generator.start (); backlog.start (); hinting.start (); } @@ -759,6 +765,8 @@ void nano::node::stop () scheduler.stop (); hinting.stop (); active.stop (); + generator.stop (); + final_generator.stop (); confirmation_height_processor.stop (); network.stop (); telemetry->stop (); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 55fe2ec38..bb137a278 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -172,6 +172,8 @@ public: nano::vote_uniquer vote_uniquer; nano::confirmation_height_processor confirmation_height_processor; nano::vote_cache inactive_vote_cache; + nano::vote_generator generator; + nano::vote_generator final_generator; nano::active_transactions active; nano::election_scheduler scheduler; nano::hinted_scheduler hinting; diff --git a/nano/node/voting.cpp b/nano/node/voting.cpp index d61409f25..4bb6ccfee 100644 --- a/nano/node/voting.cpp +++ b/nano/node/voting.cpp @@ -171,14 +171,20 @@ nano::vote_generator::vote_generator (nano::node_config const & config_a, nano:: spacing{ config_a.network_params.voting.delay }, network (network_a), stats (stats_a), - thread ([this] () { run (); }), - is_final (is_final_a) + is_final (is_final_a), + vote_generation_queue{ stats, nano::stat::type::vote_generator, nano::thread_role::name::vote_generator_queue, /* single threaded */ 1, /* max queue size */ 1024 * 32, /* max batch size */ 1024 * 4 } { - nano::unique_lock lock (mutex); - condition.wait (lock, [&started = started] { return started; }); + vote_generation_queue.process_batch = [this] (auto & batch) { + process_batch (batch); + }; } -void nano::vote_generator::add (nano::root const & root_a, nano::block_hash const & hash_a) +nano::vote_generator::~vote_generator () +{ + stop (); +} + +void nano::vote_generator::process (nano::write_transaction const & transaction, nano::root const & root_a, nano::block_hash const & hash_a) { auto cached_votes (history.votes (root_a, hash_a, is_final)); if (!cached_votes.empty ()) @@ -193,14 +199,12 @@ void nano::vote_generator::add (nano::root const & root_a, nano::block_hash cons auto should_vote (false); if (is_final) { - auto transaction (ledger.store.tx_begin_write ({ tables::final_votes })); auto block (ledger.store.block.get (transaction, hash_a)); should_vote = block != nullptr && ledger.dependents_confirmed (transaction, *block) && ledger.store.final_vote.put (transaction, block->qualified_root (), hash_a); debug_assert (block == nullptr || root_a == block->root ()); } else { - auto transaction (ledger.store.tx_begin_read ()); auto block (ledger.store.block.get (transaction, hash_a)); should_vote = block != nullptr && ledger.dependents_confirmed (transaction, *block); } @@ -217,8 +221,18 @@ void nano::vote_generator::add (nano::root const & root_a, nano::block_hash cons } } +void nano::vote_generator::start () +{ + debug_assert (!thread.joinable ()); + thread = std::thread ([this] () { run (); }); + + vote_generation_queue.start (); +} + void nano::vote_generator::stop () { + vote_generation_queue.stop (); + nano::unique_lock lock (mutex); stopped = true; @@ -231,6 +245,21 @@ void nano::vote_generator::stop () } } +void nano::vote_generator::add (const root & root, const block_hash & hash) +{ + vote_generation_queue.add (std::make_pair (root, hash)); +} + +void nano::vote_generator::process_batch (std::deque & batch) +{ + auto transaction = ledger.store.tx_begin_write ({ tables::final_votes }); + + for (auto & [root, hash] : batch) + { + process (transaction, root, hash); + } +} + std::size_t nano::vote_generator::generate (std::vector> const & blocks_a, std::shared_ptr const & channel_a) { request_t::first_type req_candidates; @@ -388,10 +417,6 @@ void nano::vote_generator::run () { nano::thread_role::set (nano::thread_role::name::voting); nano::unique_lock lock (mutex); - started = true; - lock.unlock (); - condition.notify_all (); - lock.lock (); while (!stopped) { if (candidates.size () >= nano::network::confirm_ack_hashes_max) @@ -453,5 +478,6 @@ std::unique_ptr nano::collect_container_info (na auto composite = std::make_unique (name); composite->add_component (std::make_unique (container_info{ "candidates", candidates_count, sizeof_candidate_element })); composite->add_component (std::make_unique (container_info{ "requests", requests_count, sizeof_request_element })); + composite->add_component (vote_generator.vote_generation_queue.collect_container_info ("vote_generation_queue")); return composite; } diff --git a/nano/node/voting.hpp b/nano/node/voting.hpp index c404a0242..492577bb7 100644 --- a/nano/node/voting.hpp +++ b/nano/node/voting.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -118,14 +119,19 @@ class vote_generator final private: using candidate_t = std::pair; using request_t = std::pair, std::shared_ptr>; + using queue_entry_t = std::pair; public: vote_generator (nano::node_config const & config_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::vote_processor & vote_processor_a, nano::local_vote_history & history_a, nano::network & network_a, nano::stat & stats_a, bool is_final_a); + ~vote_generator (); + /** Queue items for vote generation, or broadcast votes already in cache */ void add (nano::root const &, nano::block_hash const &); /** Queue blocks for vote generation, returning the number of successful candidates.*/ std::size_t generate (std::vector> const & blocks_a, std::shared_ptr const & channel_a); void set_reply_action (std::function const &, std::shared_ptr const &)>); + + void start (); void stop (); private: @@ -134,7 +140,17 @@ private: void reply (nano::unique_lock &, request_t &&); void vote (std::vector const &, std::vector const &, std::function const &)> const &); void broadcast_action (std::shared_ptr const &) const; + void process_batch (std::deque & batch); + /** + * Check if block is eligible for vote generation, then generates a vote or broadcasts votes already in cache + * @param transaction : needs `tables::final_votes` lock + */ + void process (nano::write_transaction const &, nano::root const &, nano::block_hash const &); + +private: std::function const &, std::shared_ptr &)> reply_action; // must be set only during initialization by using set_reply_action + +private: // Dependencies nano::node_config const & config; nano::ledger & ledger; nano::wallets & wallets; @@ -143,15 +159,19 @@ private: nano::vote_spacing spacing; nano::network & network; nano::stat & stats; + +private: + processing_queue vote_generation_queue; + +private: + const bool is_final; mutable nano::mutex mutex; nano::condition_variable condition; static std::size_t constexpr max_requests{ 2048 }; std::deque requests; std::deque candidates; std::atomic stopped{ false }; - bool started{ false }; std::thread thread; - bool is_final; friend std::unique_ptr collect_container_info (vote_generator & vote_generator, std::string const & name); }; @@ -161,7 +181,7 @@ std::unique_ptr collect_container_info (vote_generator class vote_generator_session final { public: - vote_generator_session (vote_generator & vote_generator_a); + explicit vote_generator_session (vote_generator & vote_generator_a); void add (nano::root const &, nano::block_hash const &); void flush (); diff --git a/nano/test_common/system.hpp b/nano/test_common/system.hpp index 4fe7305c3..17f6b659e 100644 --- a/nano/test_common/system.hpp +++ b/nano/test_common/system.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -22,6 +23,7 @@ namespace test system (); system (uint16_t, nano::transport::transport_type = nano::transport::transport_type::tcp, nano::node_flags = nano::node_flags ()); ~system (); + void ledger_initialization_set (std::vector const & reps, nano::amount const & reserve = 0); void generate_activity (nano::node &, std::vector &); void generate_mass_activity (uint32_t, nano::node &); @@ -60,15 +62,18 @@ namespace test */ nano::node_config default_config (); + public: boost::asio::io_context io_ctx; std::vector> nodes; nano::logging logging; + nano::stat stats; nano::work_pool work{ nano::dev::network_params.network, std::max (nano::hardware_concurrency (), 1u) }; std::chrono::time_point> deadline{ std::chrono::steady_clock::time_point::max () }; double deadline_scaling_factor{ 1.0 }; unsigned node_sequence{ 0 }; std::vector> initialization_blocks; }; + std::unique_ptr upgrade_epoch (nano::work_pool &, nano::ledger &, nano::epoch); void blocks_confirm (nano::node &, std::vector> const &, bool const = false); uint16_t get_available_port ();