Add batching to vote_generator (#3963)

Piotr Wójcik 2022-09-30 20:55:38 +02:00 committed by GitHub
commit 8fba6b13d4
19 changed files with 406 additions and 44 deletions

@ -30,6 +30,7 @@ add_executable(
network.cpp
network_filter.cpp
node.cpp
processing_queue.cpp
processor_service.cpp
peer_container.cpp
prioritization.cpp

@ -2589,7 +2589,7 @@ TEST (node, vote_by_hash_bundle)
for (auto const & block : blocks)
{
system.nodes[0]->active.generator.add (block->root (), block->hash ());
system.nodes[0]->generator.add (block->root (), block->hash ());
}
// Verify that bundling occurs. While reaching 12 should be common on most hardware in release mode,

@ -0,0 +1,119 @@
#include <nano/lib/processing_queue.hpp>
#include <nano/lib/stats.hpp>
#include <nano/test_common/system.hpp>
#include <nano/test_common/testutil.hpp>
#include <gtest/gtest.h>
using namespace std::chrono_literals;
TEST (processing_queue, construction)
{
nano::test::system system{};
nano::processing_queue<int> queue{ system.stats, {}, {}, 4, 8 * 1024, 1024 };
ASSERT_EQ (queue.size (), 0);
}
TEST (processing_queue, process_one)
{
nano::test::system system{};
nano::processing_queue<int> queue{ system.stats, {}, {}, 4, 8 * 1024, 1024 };
std::atomic<std::size_t> processed{ 0 };
queue.process_batch = [&] (auto & batch) {
processed += batch.size ();
};
queue.start ();
queue.add (1);
ASSERT_TIMELY (5s, processed == 1);
ASSERT_ALWAYS (1s, processed == 1);
ASSERT_EQ (queue.size (), 0);
}
TEST (processing_queue, process_many)
{
nano::test::system system{};
nano::processing_queue<int> queue{ system.stats, {}, {}, 4, 8 * 1024, 1024 };
std::atomic<std::size_t> processed{ 0 };
queue.process_batch = [&] (auto & batch) {
processed += batch.size ();
};
queue.start ();
const int count = 1024;
for (int n = 0; n < count; ++n)
{
queue.add (1);
}
ASSERT_TIMELY (5s, processed == count);
ASSERT_ALWAYS (1s, processed == count);
ASSERT_EQ (queue.size (), 0);
}
TEST (processing_queue, max_queue_size)
{
nano::test::system system{};
nano::processing_queue<int> queue{ system.stats, {}, {}, 4, 1024, 128 };
const int count = 2 * 1024; // Double the max queue size
for (int n = 0; n < count; ++n)
{
queue.add (1);
}
ASSERT_EQ (queue.size (), 1024);
}
TEST (processing_queue, max_batch_size)
{
nano::test::system system{};
nano::processing_queue<int> queue{ system.stats, {}, {}, 4, 1024, 128 };
// Fill queue before starting processing threads
const int count = 1024;
for (int n = 0; n < count; ++n)
{
queue.add (1);
}
std::atomic<std::size_t> max_batch{ 0 };
queue.process_batch = [&] (auto & batch) {
if (batch.size () > max_batch)
{
max_batch = batch.size ();
}
};
queue.start ();
ASSERT_TIMELY (5s, max_batch == 128);
ASSERT_ALWAYS (1s, max_batch == 128);
ASSERT_EQ (queue.size (), 0);
}
TEST (processing_queue, parallel)
{
nano::test::system system{};
nano::processing_queue<int> queue{ system.stats, {}, {}, 16, 1024, 1 };
std::atomic<std::size_t> processed{ 0 };
queue.process_batch = [&] (auto & batch) {
std::this_thread::sleep_for (2s);
processed += batch.size ();
};
queue.start ();
const int count = 16;
for (int n = 0; n < count; ++n)
{
queue.add (1);
}
// There are 16 threads and 16 items; each thread sleeps for 2 seconds inside the processing callback
// If processing happens in parallel it should take ~2 seconds to process all items, but keep some margin for slow machines
ASSERT_TIMELY (3s, processed == count);
ASSERT_EQ (queue.size (), 0);
}
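For orientation: the constructor arguments exercised by these tests follow the parameter order documented in processing_queue.hpp below (stats, stat type, thread role, thread count, max queue size, max batch size); the two `{}` arguments simply value-initialize the stat type and thread role enums. A minimal annotated sketch of the same wiring, with an arbitrary element type and arbitrary sizes:

nano::test::system system{};
// arguments: stats, stat_type, thread_role, thread_count, max_queue_size, max_batch_size
nano::processing_queue<int> queue{ system.stats, {}, {}, 4, 8 * 1024, 1024 };
// Bind the batch handler before spawning the processing threads
queue.process_batch = [] (auto & batch) {
	// Invoked with up to max_batch_size items per call
};
queue.start ();
queue.add (42); // non-blocking; once the queue is full further items are dropped and counted as overfill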

@ -61,7 +61,7 @@ TEST (vote_generator, cache)
auto & node (*system.nodes[0]);
auto epoch1 = system.upgrade_genesis_epoch (node, nano::epoch::epoch_1);
system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv);
node.active.generator.add (epoch1->root (), epoch1->hash ());
node.generator.add (epoch1->root (), epoch1->hash ());
ASSERT_TIMELY (1s, !node.history.votes (epoch1->root (), epoch1->hash ()).empty ());
auto votes (node.history.votes (epoch1->root (), epoch1->hash ()));
ASSERT_FALSE (votes.empty ());
@ -108,7 +108,7 @@ TEST (vote_generator, session)
nano::test::system system (1);
auto node (system.nodes[0]);
system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv);
nano::vote_generator_session generator_session (node->active.generator);
nano::vote_generator_session generator_session (node->generator);
boost::thread thread ([node, &generator_session] () {
nano::thread_role::set (nano::thread_role::name::request_loop);
generator_session.add (nano::dev::genesis->account (), nano::dev::genesis->hash ());
@ -184,15 +184,15 @@ TEST (vote_spacing, vote_generator)
.build_shared ();
ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code);
ASSERT_EQ (0, node.stats.count (nano::stat::type::vote_generator, nano::stat::detail::generator_broadcasts));
node.active.generator.add (nano::dev::genesis->hash (), send1->hash ());
node.generator.add (nano::dev::genesis->hash (), send1->hash ());
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::vote_generator, nano::stat::detail::generator_broadcasts) == 1);
ASSERT_FALSE (node.ledger.rollback (node.store.tx_begin_write (), send1->hash ()));
ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send2).code);
node.active.generator.add (nano::dev::genesis->hash (), send2->hash ());
node.generator.add (nano::dev::genesis->hash (), send2->hash ());
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::vote_generator, nano::stat::detail::generator_spacing) == 1);
ASSERT_EQ (1, node.stats.count (nano::stat::type::vote_generator, nano::stat::detail::generator_broadcasts));
std::this_thread::sleep_for (config.network_params.voting.delay);
node.active.generator.add (nano::dev::genesis->hash (), send2->hash ());
node.generator.add (nano::dev::genesis->hash (), send2->hash ());
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::vote_generator, nano::stat::detail::generator_broadcasts) == 2);
}
@ -227,14 +227,14 @@ TEST (vote_spacing, rapid)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build_shared ();
ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code);
node.active.generator.add (nano::dev::genesis->hash (), send1->hash ());
node.generator.add (nano::dev::genesis->hash (), send1->hash ());
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::vote_generator, nano::stat::detail::generator_broadcasts) == 1);
ASSERT_FALSE (node.ledger.rollback (node.store.tx_begin_write (), send1->hash ()));
ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send2).code);
node.active.generator.add (nano::dev::genesis->hash (), send2->hash ());
node.generator.add (nano::dev::genesis->hash (), send2->hash ());
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::vote_generator, nano::stat::detail::generator_spacing) == 1);
ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::vote_generator, nano::stat::detail::generator_broadcasts));
std::this_thread::sleep_for (config.network_params.voting.delay);
node.active.generator.add (nano::dev::genesis->hash (), send2->hash ());
node.generator.add (nano::dev::genesis->hash (), send2->hash ());
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::vote_generator, nano::stat::detail::generator_broadcasts) == 2);
}

@ -54,6 +54,7 @@ add_library(
numbers.cpp
observer_set.hpp
optional_ptr.hpp
processing_queue.hpp
rate_limiting.hpp
rate_limiting.cpp
rep_weights.hpp

@ -0,0 +1,174 @@
#pragma once
#include <nano/lib/locks.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/lib/stats.hpp>
#include <nano/lib/threading.hpp>
#include <nano/lib/utility.hpp>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>
namespace nano
{
/**
* Queue that processes enqueued elements in (possibly parallel) batches
*/
template <typename T>
class processing_queue final
{
public:
using value_t = T;
/**
* @param stats Statistics container used to count queued, dropped and batched items
* @param stat_type Stat type used when incrementing the queue/overfill/batch counters
* @param thread_role Spawned processing threads will use this name
* @param thread_count Number of processing threads
* @param max_queue_size Max number of queued items; items beyond this limit are discarded
* @param max_batch_size Max number of elements processed in a single batch, 0 for unlimited (default)
*/
processing_queue (nano::stat & stats, nano::stat::type stat_type, nano::thread_role::name thread_role, std::size_t thread_count, std::size_t max_queue_size, std::size_t max_batch_size = 0) :
stats{ stats },
stat_type{ stat_type },
thread_role{ thread_role },
thread_count{ thread_count },
max_queue_size{ max_queue_size },
max_batch_size{ max_batch_size }
{
}
~processing_queue ()
{
stop ();
}
void start ()
{
for (int n = 0; n < thread_count; ++n)
{
threads.emplace_back ([this] () {
run ();
});
}
}
void stop ()
{
stopped = true;
condition.notify_all ();
for (auto & thread : threads)
{
thread.join ();
}
threads.clear ();
}
/**
* Queues item for batch processing
*/
void add (T const & item)
{
nano::unique_lock<nano::mutex> lock{ mutex };
if (queue.size () < max_queue_size)
{
queue.emplace_back (item);
lock.unlock ();
condition.notify_one ();
stats.inc (stat_type, nano::stat::detail::queue);
}
else
{
stats.inc (stat_type, nano::stat::detail::overfill);
}
}
std::size_t size () const
{
nano::lock_guard<nano::mutex> guard{ mutex };
return queue.size ();
}
public: // Container info
std::unique_ptr<container_info_component> collect_container_info (std::string const & name)
{
nano::lock_guard<nano::mutex> guard{ mutex };
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "queue", queue.size (), sizeof (typename decltype (queue)::value_type) }));
return composite;
}
private:
std::deque<value_t> next_batch (nano::unique_lock<nano::mutex> & lock)
{
release_assert (lock.owns_lock ());
condition.wait (lock, [this] () {
return stopped || !queue.empty ();
});
if (stopped)
{
return {};
}
debug_assert (!queue.empty ());
// Unlimited batch size or queue smaller than max batch size, return the whole current queue
if (max_batch_size == 0 || queue.size () < max_batch_size)
{
decltype (queue) queue_l;
queue_l.swap (queue);
return queue_l;
}
// Queue is at least as large as the max batch size, return at most max_batch_size elements
else
{
decltype (queue) queue_l;
for (int n = 0; n < max_batch_size; ++n)
{
debug_assert (!queue.empty ());
queue_l.emplace_back (queue.front ());
queue.pop_front ();
}
return queue_l;
}
}
void run ()
{
nano::thread_role::set (thread_role);
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
auto batch = next_batch (lock);
lock.unlock ();
stats.inc (stat_type, nano::stat::detail::batch);
process_batch (batch);
lock.lock ();
}
}
public:
std::function<void (std::deque<value_t> &)> process_batch{ [] (auto &) { debug_assert (false, "processing queue callback empty"); } };
private:
nano::stat & stats;
const nano::stat::type stat_type;
const nano::thread_role::name thread_role;
const std::size_t thread_count;
const std::size_t max_queue_size;
const std::size_t max_batch_size;
private:
std::deque<value_t> queue;
std::atomic<bool> stopped{ false };
mutable nano::mutex mutex;
nano::condition_variable condition;
std::vector<std::thread> threads;
};
}
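As a usage note, a component would typically embed the queue as a member, bind process_batch in its constructor, and forward start/stop/add; the vote_generator changes later in this commit follow exactly this pattern. A hypothetical sketch (the class name and item type are illustrative only; the stat type and thread role are reused from the vote generator purely for the example):

class hash_processor final
{
public:
	explicit hash_processor (nano::stat & stats) :
		queue{ stats, nano::stat::type::vote_generator, nano::thread_role::name::vote_generator_queue, /* threads */ 1, /* max queue */ 32 * 1024, /* max batch */ 4 * 1024 }
	{
		// Bind the handler before start () so a batch is never dispatched to the default (asserting) callback
		queue.process_batch = [] (auto & batch) {
			for (auto const & hash : batch)
			{
				// ... per-item work; expensive shared setup (e.g. a transaction) is paid once per batch
			}
		};
	}
	void start ()
	{
		queue.start ();
	}
	void stop ()
	{
		queue.stop ();
	}
	void add (nano::block_hash const & hash)
	{
		queue.add (hash);
	}
private:
	nano::processing_queue<nano::block_hash> queue;
};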

@ -563,6 +563,15 @@ std::string nano::stat::detail_to_string (stat::detail detail)
case nano::stat::detail::all:
res = "all";
break;
case nano::stat::detail::queue:
res = "queue";
break;
case nano::stat::detail::overfill:
res = "overfill";
break;
case nano::stat::detail::batch:
res = "batch";
break;
case nano::stat::detail::bad_sender:
res = "bad_sender";
break;

@ -254,6 +254,11 @@ public:
{
all = 0,
// processing queue
queue,
overfill,
batch,
// error specific
bad_sender,
insufficient_work,

@ -99,6 +99,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::election_hinting:
thread_role_name_string = "Hinting";
break;
case nano::thread_role::name::vote_generator_queue:
thread_role_name_string = "Voting que";
break;
default:
debug_assert (false && "nano::thread_role::get_string unhandled thread role");
}

@ -44,7 +44,8 @@ namespace thread_role
election_scheduler,
unchecked,
backlog_population,
election_hinting
election_hinting,
vote_generator_queue,
};
/*

@ -42,7 +42,7 @@ void assert_internal (char const * check_expr, char const * func, char const * f
#endif
#ifdef NDEBUG
#define debug_assert(check) (void)0
#define debug_assert(...) (void)0
#else
#define debug_assert_1(check) check ? (void)0 : assert_internal (#check, BOOST_CURRENT_FUNCTION, __FILE__, __LINE__, false)
#define debug_assert_2(check, error_msg) check ? (void)0 : assert_internal (#check, BOOST_CURRENT_FUNCTION, __FILE__, __LINE__, false, error_msg)
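The NDEBUG stub becomes variadic because release builds must also accept the two-argument form (check, message) used elsewhere in this commit; for example, the processing_queue default callback calls debug_assert (false, "processing queue callback empty"), which would fail to compile under the old single-argument macro when NDEBUG is defined. Both forms still reduce to (void)0 in release builds:

debug_assert (!queue.empty ());
debug_assert (false, "processing queue callback empty");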

@ -18,8 +18,6 @@ nano::active_transactions::active_transactions (nano::node & node_a, nano::confi
scheduler{ node_a.scheduler }, // Move dependencies requiring this circular reference
confirmation_height_processor{ confirmation_height_processor_a },
node{ node_a },
generator{ node_a.config, node_a.ledger, node_a.wallets, node_a.vote_processor, node_a.history, node_a.network, node_a.stats, false },
final_generator{ node_a.config, node_a.ledger, node_a.wallets, node_a.vote_processor, node_a.history, node_a.network, node_a.stats, true },
recently_confirmed{ 65536 },
recently_cemented{ node.config.confirmation_history_size },
election_time_to_live{ node_a.network_params.network.is_dev_network () ? 0s : 2s },
@ -202,8 +200,8 @@ void nano::active_transactions::request_confirm (nano::unique_lock<nano::mutex>
nano::confirmation_solicitor solicitor (node.network, node.config);
solicitor.prepare (node.rep_crawler.principal_representatives (std::numeric_limits<std::size_t>::max ()));
nano::vote_generator_session generator_session (generator);
nano::vote_generator_session final_generator_session (generator);
nano::vote_generator_session generator_session (node.generator);
nano::vote_generator_session final_generator_session (node.final_generator);
std::size_t unconfirmed_count_l (0);
nano::timer<std::chrono::milliseconds> elapsed (nano::timer_state::started);
@ -366,8 +364,6 @@ void nano::active_transactions::stop ()
{
thread.join ();
}
generator.stop ();
final_generator.stop ();
lock.lock ();
roots.clear ();
}
@ -690,16 +686,12 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (ac
{
std::size_t roots_count;
std::size_t blocks_count;
std::size_t recently_confirmed_count;
std::size_t recently_cemented_count;
std::size_t hinted_count;
{
nano::lock_guard<nano::mutex> guard (active_transactions.mutex);
roots_count = active_transactions.roots.size ();
blocks_count = active_transactions.blocks.size ();
recently_confirmed_count = active_transactions.recently_confirmed.size ();
recently_cemented_count = active_transactions.recently_cemented.size ();
hinted_count = active_transactions.active_hinted_elections_count;
}
@ -708,7 +700,6 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (ac
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", blocks_count, sizeof (decltype (active_transactions.blocks)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "election_winner_details", active_transactions.election_winner_details_size (), sizeof (decltype (active_transactions.election_winner_details)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "hinted", hinted_count, 0 }));
composite->add_component (collect_container_info (active_transactions.generator, "generator"));
composite->add_component (active_transactions.recently_confirmed.collect_container_info ("recently_confirmed"));
composite->add_component (active_transactions.recently_cemented.collect_container_info ("recently_cemented"));

@ -203,9 +203,6 @@ public:
void add_election_winner_details (nano::block_hash const &, std::shared_ptr<nano::election> const &);
void remove_election_winner_details (nano::block_hash const &);
nano::vote_generator generator;
nano::vote_generator final_generator;
recently_confirmed_cache recently_confirmed;
recently_cemented_cache recently_cemented;

@ -32,7 +32,7 @@ nano::election::election (nano::node & node_a, std::shared_ptr<nano::block> cons
last_blocks.emplace (block_a->hash (), block_a);
if (node.config.enable_voting && node.wallets.reps ().voting > 0)
{
node.active.generator.add (root, block_a->hash ());
node.generator.add (root, block_a->hash ());
}
}
@ -322,7 +322,7 @@ void nano::election::confirm_if_quorum (nano::unique_lock<nano::mutex> & lock_a)
{
auto hash = status.winner->hash ();
lock_a.unlock ();
node.active.final_generator.add (root, hash);
node.final_generator.add (root, hash);
lock_a.lock ();
}
if (!node.ledger.cache.final_votes_confirmation_canary.load () || final_weight >= node.online_reps.delta ())
@ -488,12 +488,12 @@ void nano::election::generate_votes () const
{
auto hash = status.winner->hash ();
lock.unlock ();
node.active.final_generator.add (root, hash);
node.final_generator.add (root, hash);
lock.lock ();
}
else
{
node.active.generator.add (root, status.winner->hash ());
node.generator.add (root, status.winner->hash ());
}
}
}

@ -173,10 +173,12 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
vote_uniquer (block_uniquer),
confirmation_height_processor (ledger, write_database_queue, config.conf_height_processor_batch_min_time, config.logging, logger, node_initialized_latch, flags.confirmation_height_processor_mode),
inactive_vote_cache{ nano::nodeconfig_to_vote_cache_config (config, flags) },
generator{ config, ledger, wallets, vote_processor, history, network, stats, /* non-final */ false },
final_generator{ config, ledger, wallets, vote_processor, history, network, stats, /* final */ true },
active (*this, confirmation_height_processor),
scheduler{ *this },
hinting{ nano::nodeconfig_to_hinted_scheduler_config (config), *this, inactive_vote_cache, active, online_reps, stats },
aggregator (config, stats, active.generator, active.final_generator, history, ledger, wallets, active),
aggregator (config, stats, generator, final_generator, history, ledger, wallets, active),
wallets (wallets_store.init_error (), *this),
backlog{ nano::nodeconfig_to_backlog_population_config (config), store, scheduler },
startup_time (std::chrono::steady_clock::now ()),
@ -631,6 +633,8 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
composite->add_component (collect_container_info (node.aggregator, "request_aggregator"));
composite->add_component (node.scheduler.collect_container_info ("election_scheduler"));
composite->add_component (node.inactive_vote_cache.collect_container_info ("inactive_vote_cache"));
composite->add_component (collect_container_info (node.generator, "vote_generator"));
composite->add_component (collect_container_info (node.final_generator, "vote_generator_final"));
return composite;
}
@ -740,6 +744,8 @@ void nano::node::start ()
port_mapping.start ();
}
wallets.start ();
generator.start ();
final_generator.start ();
backlog.start ();
hinting.start ();
}
@ -759,6 +765,8 @@ void nano::node::stop ()
scheduler.stop ();
hinting.stop ();
active.stop ();
generator.stop ();
final_generator.stop ();
confirmation_height_processor.stop ();
network.stop ();
telemetry->stop ();

@ -172,6 +172,8 @@ public:
nano::vote_uniquer vote_uniquer;
nano::confirmation_height_processor confirmation_height_processor;
nano::vote_cache inactive_vote_cache;
nano::vote_generator generator;
nano::vote_generator final_generator;
nano::active_transactions active;
nano::election_scheduler scheduler;
nano::hinted_scheduler hinting;

@ -171,14 +171,20 @@ nano::vote_generator::vote_generator (nano::node_config const & config_a, nano::
spacing{ config_a.network_params.voting.delay },
network (network_a),
stats (stats_a),
thread ([this] () { run (); }),
is_final (is_final_a)
is_final (is_final_a),
vote_generation_queue{ stats, nano::stat::type::vote_generator, nano::thread_role::name::vote_generator_queue, /* single threaded */ 1, /* max queue size */ 1024 * 32, /* max batch size */ 1024 * 4 }
{
nano::unique_lock<nano::mutex> lock (mutex);
condition.wait (lock, [&started = started] { return started; });
vote_generation_queue.process_batch = [this] (auto & batch) {
process_batch (batch);
};
}
void nano::vote_generator::add (nano::root const & root_a, nano::block_hash const & hash_a)
nano::vote_generator::~vote_generator ()
{
stop ();
}
void nano::vote_generator::process (nano::write_transaction const & transaction, nano::root const & root_a, nano::block_hash const & hash_a)
{
auto cached_votes (history.votes (root_a, hash_a, is_final));
if (!cached_votes.empty ())
@ -193,14 +199,12 @@ void nano::vote_generator::add (nano::root const & root_a, nano::block_hash cons
auto should_vote (false);
if (is_final)
{
auto transaction (ledger.store.tx_begin_write ({ tables::final_votes }));
auto block (ledger.store.block.get (transaction, hash_a));
should_vote = block != nullptr && ledger.dependents_confirmed (transaction, *block) && ledger.store.final_vote.put (transaction, block->qualified_root (), hash_a);
debug_assert (block == nullptr || root_a == block->root ());
}
else
{
auto transaction (ledger.store.tx_begin_read ());
auto block (ledger.store.block.get (transaction, hash_a));
should_vote = block != nullptr && ledger.dependents_confirmed (transaction, *block);
}
@ -217,8 +221,18 @@ void nano::vote_generator::add (nano::root const & root_a, nano::block_hash cons
}
}
void nano::vote_generator::start ()
{
debug_assert (!thread.joinable ());
thread = std::thread ([this] () { run (); });
vote_generation_queue.start ();
}
void nano::vote_generator::stop ()
{
vote_generation_queue.stop ();
nano::unique_lock<nano::mutex> lock (mutex);
stopped = true;
@ -231,6 +245,21 @@ void nano::vote_generator::stop ()
}
}
void nano::vote_generator::add (const root & root, const block_hash & hash)
{
vote_generation_queue.add (std::make_pair (root, hash));
}
void nano::vote_generator::process_batch (std::deque<queue_entry_t> & batch)
{
auto transaction = ledger.store.tx_begin_write ({ tables::final_votes });
for (auto & [root, hash] : batch)
{
process (transaction, root, hash);
}
}
std::size_t nano::vote_generator::generate (std::vector<std::shared_ptr<nano::block>> const & blocks_a, std::shared_ptr<nano::transport::channel> const & channel_a)
{
request_t::first_type req_candidates;
@ -388,10 +417,6 @@ void nano::vote_generator::run ()
{
nano::thread_role::set (nano::thread_role::name::voting);
nano::unique_lock<nano::mutex> lock (mutex);
started = true;
lock.unlock ();
condition.notify_all ();
lock.lock ();
while (!stopped)
{
if (candidates.size () >= nano::network::confirm_ack_hashes_max)
@ -453,5 +478,6 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (na
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "candidates", candidates_count, sizeof_candidate_element }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "requests", requests_count, sizeof_request_element }));
composite->add_component (vote_generator.vote_generation_queue.collect_container_info ("vote_generation_queue"));
return composite;
}
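The net effect of the vote_generator changes above: add () no longer opens its own ledger transaction per call; it merely enqueues a (root, hash) pair, and the single queue thread drains up to 4 * 1024 entries at a time through process_batch, which amortizes one write transaction (and the tables::final_votes lock) over the whole batch. Condensed from the code above:

// Caller thread: cheap, non-blocking enqueue
generator.add (root, hash); // forwards to vote_generation_queue.add (std::make_pair (root, hash))

// Queue thread: one write transaction shared by the entire batch
auto transaction = ledger.store.tx_begin_write ({ tables::final_votes });
for (auto & [root, hash] : batch)
{
	process (transaction, root, hash); // generate a vote or broadcast cached votes for each entry
}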

@ -2,6 +2,7 @@
#include <nano/lib/locks.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/lib/processing_queue.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/wallet.hpp>
#include <nano/secure/common.hpp>
@ -118,14 +119,19 @@ class vote_generator final
private:
using candidate_t = std::pair<nano::root, nano::block_hash>;
using request_t = std::pair<std::vector<candidate_t>, std::shared_ptr<nano::transport::channel>>;
using queue_entry_t = std::pair<nano::root, nano::block_hash>;
public:
vote_generator (nano::node_config const & config_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::vote_processor & vote_processor_a, nano::local_vote_history & history_a, nano::network & network_a, nano::stat & stats_a, bool is_final_a);
~vote_generator ();
/** Queue items for vote generation, or broadcast votes already in cache */
void add (nano::root const &, nano::block_hash const &);
/** Queue blocks for vote generation, returning the number of successful candidates.*/
std::size_t generate (std::vector<std::shared_ptr<nano::block>> const & blocks_a, std::shared_ptr<nano::transport::channel> const & channel_a);
void set_reply_action (std::function<void (std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> const &)>);
void start ();
void stop ();
private:
@ -134,7 +140,17 @@ private:
void reply (nano::unique_lock<nano::mutex> &, request_t &&);
void vote (std::vector<nano::block_hash> const &, std::vector<nano::root> const &, std::function<void (std::shared_ptr<nano::vote> const &)> const &);
void broadcast_action (std::shared_ptr<nano::vote> const &) const;
void process_batch (std::deque<queue_entry_t> & batch);
/**
* Checks whether the block is eligible for vote generation and, if so, generates a vote or broadcasts votes already in the cache
* @param transaction Write transaction that must hold the `tables::final_votes` lock
*/
void process (nano::write_transaction const &, nano::root const &, nano::block_hash const &);
private:
std::function<void (std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> &)> reply_action; // must only be set during initialization, via set_reply_action
private: // Dependencies
nano::node_config const & config;
nano::ledger & ledger;
nano::wallets & wallets;
@ -143,15 +159,19 @@ private:
nano::vote_spacing spacing;
nano::network & network;
nano::stat & stats;
private:
processing_queue<queue_entry_t> vote_generation_queue;
private:
const bool is_final;
mutable nano::mutex mutex;
nano::condition_variable condition;
static std::size_t constexpr max_requests{ 2048 };
std::deque<request_t> requests;
std::deque<candidate_t> candidates;
std::atomic<bool> stopped{ false };
bool started{ false };
std::thread thread;
bool is_final;
friend std::unique_ptr<container_info_component> collect_container_info (vote_generator & vote_generator, std::string const & name);
};
@ -161,7 +181,7 @@ std::unique_ptr<container_info_component> collect_container_info (vote_generator
class vote_generator_session final
{
public:
vote_generator_session (vote_generator & vote_generator_a);
explicit vote_generator_session (vote_generator & vote_generator_a);
void add (nano::root const &, nano::block_hash const &);
void flush ();

@ -1,6 +1,7 @@
#pragma once
#include <nano/lib/errors.hpp>
#include <nano/lib/stats.hpp>
#include <nano/node/node.hpp>
#include <chrono>
@ -22,6 +23,7 @@ namespace test
system ();
system (uint16_t, nano::transport::transport_type = nano::transport::transport_type::tcp, nano::node_flags = nano::node_flags ());
~system ();
void ledger_initialization_set (std::vector<nano::keypair> const & reps, nano::amount const & reserve = 0);
void generate_activity (nano::node &, std::vector<nano::account> &);
void generate_mass_activity (uint32_t, nano::node &);
@ -60,15 +62,18 @@ namespace test
*/
nano::node_config default_config ();
public:
boost::asio::io_context io_ctx;
std::vector<std::shared_ptr<nano::node>> nodes;
nano::logging logging;
nano::stat stats;
nano::work_pool work{ nano::dev::network_params.network, std::max (nano::hardware_concurrency (), 1u) };
std::chrono::time_point<std::chrono::steady_clock, std::chrono::duration<double>> deadline{ std::chrono::steady_clock::time_point::max () };
double deadline_scaling_factor{ 1.0 };
unsigned node_sequence{ 0 };
std::vector<std::shared_ptr<nano::block>> initialization_blocks;
};
std::unique_ptr<nano::state_block> upgrade_epoch (nano::work_pool &, nano::ledger &, nano::epoch);
void blocks_confirm (nano::node &, std::vector<std::shared_ptr<nano::block>> const &, bool const = false);
uint16_t get_available_port ();