Hinted election scheduler (#3944)

* Modify `node::block_confirmed_or_being_confirmed`

* Add `node::bootstrap_block` helper function

* Add `hinted_scheduler` class

* Use dedicated config pattern

* Allow for limited AEC overflow

* Add `active_transactions.allow_limited_overflow` unit test

* Simplify `active_transactions.limit_vote_hinted_elections` unit test

* Improve `active_transactions::collect_container_info`

* Comments

* Stats

* Comment

Co-authored-by: Piotr Wójcik <3044353+fikumikudev@users.noreply.github.com>
This commit is contained in:
Piotr Wójcik 2022-09-13 15:11:13 +02:00 committed by GitHub
commit 2dcf1ad0aa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 651 additions and 228 deletions

View file

@ -507,7 +507,7 @@ TEST (active_transactions, inactive_votes_cache_election_start)
// An election is started for send6 but does not confirm
ASSERT_TIMELY (5s, 1 == node.active.size ());
node.vote_processor.flush ();
ASSERT_FALSE (node.block_confirmed_or_being_confirmed (node.store.tx_begin_read (), send3->hash ()));
ASSERT_FALSE (node.block_confirmed_or_being_confirmed (send3->hash ()));
// send7 cannot be voted on but an election should be started from inactive votes
ASSERT_FALSE (node.ledger.dependents_confirmed (node.store.tx_begin_read (), *send4));
node.process_active (send4);
@ -1230,7 +1230,7 @@ TEST (active_transactions, activate_inactive)
ASSERT_EQ (0, node.stats.count (nano::stat::type::confirmation_observer, nano::stat::detail::active_conf_height, nano::stat::dir::out));
// The first block was not active so no activation takes place
ASSERT_FALSE (node.active.active (open->qualified_root ()) || node.block_confirmed_or_being_confirmed (node.store.tx_begin_read (), open->hash ()));
ASSERT_FALSE (node.active.active (open->qualified_root ()) || node.block_confirmed_or_being_confirmed (open->hash ()));
}
TEST (active_transactions, list_active)
@ -1404,165 +1404,253 @@ TEST (active_transactions, fifo)
ASSERT_TIMELY (1s, node.active.election (receive2->qualified_root ()) != nullptr);
}
// Ensures we limit the number of vote hinted elections in AEC
namespace
{
/*
* Sends `amount` raw from genesis chain into a new account and makes it a representative
*/
nano::keypair setup_rep (nano::test::system & system, nano::node & node, nano::uint128_t const amount)
{
auto latest = node.latest (nano::dev::genesis_key.pub);
auto balance = node.balance (nano::dev::genesis_key.pub);
nano::keypair key;
nano::block_builder builder;
auto send = builder
.send ()
.previous (latest)
.destination (key.pub)
.balance (balance - amount)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (latest))
.build_shared ();
auto open = builder
.open ()
.source (send->hash ())
.representative (key.pub)
.account (key.pub)
.sign (key.prv, key.pub)
.work (*system.work.generate (key.pub))
.build_shared ();
EXPECT_TRUE (nano::test::process (node, { send, open }));
EXPECT_TRUE (nano::test::confirm (node, { send, open }));
EXPECT_TIMELY (5s, nano::test::confirmed (node, { send, open }));
return key;
}
/*
* Creates `count` 1 raw sends from genesis to unique accounts and corresponding open blocks.
* The genesis chain is then confirmed, but leaves open blocks unconfirmed.
*/
std::vector<std::shared_ptr<nano::block>> setup_independent_blocks (nano::test::system & system, nano::node & node, int count)
{
std::vector<std::shared_ptr<nano::block>> blocks;
auto latest = node.latest (nano::dev::genesis_key.pub);
auto balance = node.balance (nano::dev::genesis_key.pub);
for (int n = 0; n < count; ++n)
{
nano::keypair key;
nano::block_builder builder;
balance -= 1;
auto send = builder
.send ()
.previous (latest)
.destination (key.pub)
.balance (balance)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (latest))
.build_shared ();
latest = send->hash ();
auto open = builder
.open ()
.source (send->hash ())
.representative (key.pub)
.account (key.pub)
.sign (key.prv, key.pub)
.work (*system.work.generate (key.pub))
.build_shared ();
EXPECT_TRUE (nano::test::process (node, { send, open }));
EXPECT_TIMELY (5s, nano::test::exists (node, { send, open })); // Ensure blocks are in the ledger
blocks.push_back (open);
}
// Confirm whole genesis chain at once
EXPECT_TRUE (nano::test::confirm (node, { latest }));
EXPECT_TIMELY (5s, nano::test::confirmed (node, { latest }));
return blocks;
}
}
/*
* Ensures we limit the number of vote hinted elections in AEC
*/
TEST (active_transactions, limit_vote_hinted_elections)
{
nano::test::system system;
nano::node_config config{ nano::test::get_available_port (), system.logging };
nano::node_config config = system.default_config ();
const int aec_limit = 10;
config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
config.active_elections_size = 10;
config.active_elections_size = aec_limit;
config.active_elections_hinted_limit_percentage = 10; // Should give us a limit of 1 hinted election
auto & node = *system.add_node (config);
// Setup representatives
nano::keypair rep1, rep2;
{
nano::block_hash latest = node.latest (nano::dev::genesis_key.pub);
nano::keypair key1, key2;
nano::send_block_builder send_block_builder;
nano::state_block_builder state_block_builder;
// Enough weight to trigger election hinting but not enough to confirm block on its own
auto amount = ((node.online_reps.trended () / 100) * node.config.election_hint_weight_percent) / 2 + 1000 * nano::Gxrb_ratio;
auto send1 = send_block_builder.make_block ()
.previous (latest)
.destination (key1.pub)
.balance (nano::dev::constants.genesis_amount - amount)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (latest))
.build_shared ();
auto send2 = send_block_builder.make_block ()
.previous (send1->hash ())
.destination (key2.pub)
.balance (nano::dev::constants.genesis_amount - 2 * amount)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (send1->hash ()))
.build_shared ();
auto open1 = state_block_builder.make_block ()
.account (key1.pub)
.previous (0)
.representative (key1.pub)
.balance (amount)
.link (send1->hash ())
.sign (key1.prv, key1.pub)
.work (*system.work.generate (key1.pub))
.build_shared ();
auto open2 = state_block_builder.make_block ()
.account (key2.pub)
.previous (0)
.representative (key2.pub)
.balance (amount)
.link (send2->hash ())
.sign (key2.prv, key2.pub)
.work (*system.work.generate (key2.pub))
.build_shared ();
ASSERT_EQ (nano::process_result::progress, node.process (*send1).code);
ASSERT_EQ (nano::process_result::progress, node.process (*send2).code);
ASSERT_EQ (nano::process_result::progress, node.process (*open1).code);
ASSERT_EQ (nano::process_result::progress, node.process (*open2).code);
nano::test::blocks_confirm (node, { send1, send2, open1, open2 }, true);
ASSERT_TIMELY (1s, node.block_confirmed (send1->hash ()));
ASSERT_TIMELY (1s, node.block_confirmed (send2->hash ()));
ASSERT_TIMELY (1s, node.block_confirmed (open1->hash ()));
ASSERT_TIMELY (1s, node.block_confirmed (open2->hash ()));
ASSERT_TIMELY (1s, node.active.empty ());
rep1 = key1;
rep2 = key2;
}
// Test vote hinting behavior
{
auto latest_balance = node.balance (nano::dev::genesis_key.pub);
auto latest = node.latest (nano::dev::genesis_key.pub);
nano::keypair key0, key1;
nano::state_block_builder builder;
// Construct two pending entries that can be received simultaneously
auto send0 = builder.make_block ()
.account (nano::dev::genesis_key.pub)
.previous (latest)
.representative (nano::dev::genesis_key.pub)
.link (key0.pub)
.balance (latest_balance - 1)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (latest))
.build_shared ();
ASSERT_EQ (nano::process_result::progress, node.process (*send0).code);
nano::test::blocks_confirm (node, { send0 }, true);
ASSERT_TIMELY (1s, node.block_confirmed (send0->hash ()));
ASSERT_TIMELY (1s, node.active.empty ());
auto send1 = builder.make_block ()
.account (nano::dev::genesis_key.pub)
.previous (send0->hash ())
.representative (nano::dev::genesis_key.pub)
.link (key1.pub)
.balance (latest_balance - 2)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (send0->hash ()))
.build_shared ();
ASSERT_EQ (nano::process_result::progress, node.process (*send1).code);
nano::test::blocks_confirm (node, { send1 }, true);
ASSERT_TIMELY (1s, node.block_confirmed (send1->hash ()));
ASSERT_TIMELY (1s, node.active.empty ());
// Enough weight to trigger election hinting but not enough to confirm block on its own
const auto amount = ((node.online_reps.trended () / 100) * node.config.election_hint_weight_percent) + 1000 * nano::Gxrb_ratio;
nano::keypair rep1 = setup_rep (system, node, amount / 2);
nano::keypair rep2 = setup_rep (system, node, amount / 2);
auto receive0 = builder.make_block ()
.account (key0.pub)
.previous (0)
.representative (nano::dev::genesis_key.pub)
.link (send0->hash ())
.balance (1)
.sign (key0.prv, key0.pub)
.work (*system.work.generate (key0.pub))
.build_shared ();
ASSERT_EQ (nano::process_result::progress, node.process (*receive0).code);
auto receive1 = builder.make_block ()
.account (key1.pub)
.previous (0)
.representative (nano::dev::genesis_key.pub)
.link (send1->hash ())
.balance (1)
.sign (key1.prv, key1.pub)
.work (*system.work.generate (key1.pub))
.build_shared ();
ASSERT_EQ (nano::process_result::progress, node.process (*receive1).code);
ASSERT_TRUE (node.active.empty ());
ASSERT_EQ (7, node.ledger.cache.cemented_count);
auto blocks = setup_independent_blocks (system, node, 2);
auto open0 = blocks[0];
auto open1 = blocks[1];
// Inactive vote
auto vote1 (std::make_shared<nano::vote> (rep1.pub, rep1.prv, 0, 0, std::vector<nano::block_hash>{ receive0->hash (), receive1->hash () }));
node.vote_processor.vote (vote1, std::make_shared<nano::transport::inproc::channel> (node, node));
ASSERT_TIMELY (1s, node.inactive_vote_cache.cache_size () == 2);
ASSERT_TRUE (node.active.empty ());
ASSERT_EQ (7, node.ledger.cache.cemented_count);
// Even though automatic frontier confirmation is disabled, AEC is doing funny stuff and inserting elections, clear that
WAIT (1s);
node.active.clear ();
ASSERT_TRUE (node.active.empty ());
// This vote should trigger election hinting for first receive block
auto vote2 (std::make_shared<nano::vote> (rep2.pub, rep2.prv, 0, 0, std::vector<nano::block_hash>{ receive0->hash () }));
node.vote_processor.vote (vote2, std::make_shared<nano::transport::inproc::channel> (node, node));
ASSERT_TIMELY (1s, 1 == node.active.size ());
// Ensure first transaction becomes active
ASSERT_TIMELY (1s, node.active.election (receive0->qualified_root ()) != nullptr);
// Inactive vote
auto vote1 = nano::test::make_vote (rep1, { open0, open1 });
node.vote_processor.vote (vote1, nano::test::fake_channel (node));
// Ensure new inactive vote cache entries were created
ASSERT_TIMELY (5s, node.inactive_vote_cache.cache_size () == 2);
// And no elections are getting started yet
ASSERT_ALWAYS (1s, node.active.empty ());
// And nothing got confirmed yet
ASSERT_FALSE (nano::test::confirmed (node, { open0, open1 }));
// This vote should trigger election hinting but not become active due to limit of active hinted elections
auto vote3 (std::make_shared<nano::vote> (rep2.pub, rep2.prv, 0, 0, std::vector<nano::block_hash>{ receive1->hash () }));
node.vote_processor.vote (vote3, std::make_shared<nano::transport::inproc::channel> (node, node));
ASSERT_TIMELY (1s, node.stats.count (nano::stat::type::election, nano::stat::detail::election_hinted_overflow) == 1);
ASSERT_TIMELY (1s, 1 == node.active.size ());
// Ensure second transaction does not become active
ASSERT_TIMELY (1s, node.active.election (receive1->qualified_root ()) == nullptr);
// This vote should trigger election hinting for first receive block
auto vote2 = nano::test::make_vote (rep2, { open0 });
node.vote_processor.vote (vote2, nano::test::fake_channel (node));
// Ensure an election got started for open0 block
ASSERT_TIMELY (5s, node.active.size () == 1);
ASSERT_TIMELY (5s, nano::test::active (node, { open0 }));
// This final vote should confirm the first receive block
auto vote4 = (std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, std::vector<nano::block_hash>{ receive0->hash () }));
node.vote_processor.vote (vote4, std::make_shared<nano::transport::inproc::channel> (node, node));
ASSERT_TIMELY (1s, node.active.empty ());
ASSERT_EQ (8, node.ledger.cache.cemented_count);
ASSERT_TIMELY (1s, node.inactive_vote_cache.cache_size () == 1);
// This vote should trigger election hinting but not become active due to limit of active hinted elections
auto vote3 = nano::test::make_vote (rep2, { open1 });
node.vote_processor.vote (vote3, nano::test::fake_channel (node));
// Ensure no new election are getting started
ASSERT_NEVER (1s, nano::test::active (node, { open1 }));
ASSERT_EQ (node.active.size (), 1);
// Now it should be possible to vote hint second block
auto vote5 = (std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, 0, 0, std::vector<nano::block_hash>{ receive1->hash () }));
node.vote_processor.vote (vote5, std::make_shared<nano::transport::inproc::channel> (node, node));
ASSERT_TIMELY (1s, node.stats.count (nano::stat::type::election, nano::stat::detail::election_hinted_overflow) == 1);
ASSERT_TIMELY (1s, 1 == node.active.size ());
ASSERT_EQ (8, node.ledger.cache.cemented_count);
ASSERT_TIMELY (1s, node.inactive_vote_cache.cache_size () == 1);
// This final vote should confirm the first receive block
auto vote4 = nano::test::make_final_vote (nano::dev::genesis_key, { open0 });
node.vote_processor.vote (vote4, nano::test::fake_channel (node));
// Ensure election for open0 block got confirmed
ASSERT_TIMELY (5s, nano::test::confirmed (node, { open0 }));
// Ensure there was no overflow
ASSERT_EQ (0, node.stats.count (nano::stat::type::election, nano::stat::detail::election_drop_overflow));
}
// Now a second block should get vote hinted
ASSERT_TIMELY (5s, nano::test::active (node, { open1 }));
// Ensure there was no overflow of elections
ASSERT_EQ (0, node.stats.count (nano::stat::type::election, nano::stat::detail::election_drop_overflow));
}
/*
* Tests that when AEC is running at capacity from normal elections, it is still possible to schedule a limited number of hinted elections
*/
TEST (active_transactions, allow_limited_overflow)
{
nano::test::system system;
nano::node_config config = system.default_config ();
const int aec_limit = 20;
config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
config.active_elections_size = aec_limit;
config.active_elections_hinted_limit_percentage = 20; // Should give us a limit of 4 hinted elections
auto & node = *system.add_node (config);
auto blocks = setup_independent_blocks (system, node, aec_limit * 4);
// Split blocks in two halves
std::vector<std::shared_ptr<nano::block>> blocks1 (blocks.begin (), blocks.begin () + blocks.size () / 2);
std::vector<std::shared_ptr<nano::block>> blocks2 (blocks.begin () + blocks.size () / 2, blocks.end ());
// Even though automatic frontier confirmation is disabled, AEC is doing funny stuff and inserting elections, clear that
WAIT (1s);
node.active.clear ();
ASSERT_TRUE (node.active.empty ());
// Insert the first part of the blocks into normal election scheduler
for (auto const & block : blocks1)
{
node.scheduler.activate (block->account (), node.store.tx_begin_read ());
}
// Ensure number of active elections reaches AEC limit and there is no overfill
ASSERT_TIMELY_EQ (5s, node.active.size (), node.active.limit ());
// And it stays that way without increasing
ASSERT_ALWAYS (1s, node.active.size () == node.active.limit ());
// Insert votes for the second part of the blocks, so that those are scheduled as hinted elections
for (auto const & block : blocks2)
{
// Non-final vote, so it stays in the AEC without getting confirmed
auto vote = nano::test::make_vote (nano::dev::genesis_key, { block });
node.inactive_vote_cache.vote (block->hash (), vote);
}
// Ensure active elections overfill AEC only up to normal + hinted limit
ASSERT_TIMELY_EQ (5s, node.active.size (), node.active.limit () + node.active.hinted_limit ());
// And it stays that way without increasing
ASSERT_ALWAYS (1s, node.active.size () == node.active.limit () + node.active.hinted_limit ());
}
/*
* Tests that when hinted elections are present in the AEC, normal scheduler adapts not to exceed the limit of all elections
*/
TEST (active_transactions, allow_limited_overflow_adapt)
{
nano::test::system system;
nano::node_config config = system.default_config ();
const int aec_limit = 20;
config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
config.active_elections_size = aec_limit;
config.active_elections_hinted_limit_percentage = 20; // Should give us a limit of 4 hinted elections
auto & node = *system.add_node (config);
auto blocks = setup_independent_blocks (system, node, aec_limit * 4);
// Split blocks in two halves
std::vector<std::shared_ptr<nano::block>> blocks1 (blocks.begin (), blocks.begin () + blocks.size () / 2);
std::vector<std::shared_ptr<nano::block>> blocks2 (blocks.begin () + blocks.size () / 2, blocks.end ());
// Even though automatic frontier confirmation is disabled, AEC is doing funny stuff and inserting elections, clear that
WAIT (1s);
node.active.clear ();
ASSERT_TRUE (node.active.empty ());
// Insert votes for the second part of the blocks, so that those are scheduled as hinted elections
for (auto const & block : blocks2)
{
// Non-final vote, so it stays in the AEC without getting confirmed
auto vote = nano::test::make_vote (nano::dev::genesis_key, { block });
node.inactive_vote_cache.vote (block->hash (), vote);
}
// Ensure hinted election amount is bounded by hinted limit
ASSERT_TIMELY_EQ (5s, node.active.size (), node.active.hinted_limit ());
// And it stays that way without increasing
ASSERT_ALWAYS (1s, node.active.size () == node.active.hinted_limit ());
// Insert the first part of the blocks into normal election scheduler
for (auto const & block : blocks1)
{
node.scheduler.activate (block->account (), node.store.tx_begin_read ());
}
// Ensure number of active elections reaches AEC limit and there is no overfill
ASSERT_TIMELY_EQ (5s, node.active.size (), node.active.limit ());
// And it stays that way without increasing
ASSERT_ALWAYS (1s, node.active.size () == node.active.limit ());
}

View file

@ -545,6 +545,9 @@ std::string nano::stat::type_to_string (stat::type type)
case nano::stat::type::vote_cache:
res = "vote_cache";
break;
case nano::stat::type::hinting:
res = "hinting";
break;
}
return res;
}
@ -937,6 +940,15 @@ std::string nano::stat::detail_to_string (stat::detail detail)
case nano::stat::detail::invalid_network:
res = "invalid_network";
break;
case nano::stat::detail::hinted:
res = "hinted";
break;
case nano::stat::detail::insert_failed:
res = "insert_failed";
break;
case nano::stat::detail::missing_block:
res = "missing_block";
break;
}
return res;
}

View file

@ -244,7 +244,8 @@ public:
filter,
telemetry,
vote_generator,
vote_cache
vote_cache,
hinting
};
/** Optional detail type */
@ -413,7 +414,12 @@ public:
generator_broadcasts,
generator_replies,
generator_replies_discarded,
generator_spacing
generator_spacing,
// hinting
hinted,
insert_failed,
missing_block,
};
/** Direction of the stat. If the direction is irrelevant, use in */

View file

@ -96,6 +96,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::backlog_population:
thread_role_name_string = "Backlog";
break;
case nano::thread_role::name::election_hinting:
thread_role_name_string = "Hinting";
break;
default:
debug_assert (false && "nano::thread_role::get_string unhandled thread role");
}

View file

@ -43,7 +43,8 @@ namespace thread_role
db_parallel_traversal,
election_scheduler,
unchecked,
backlog_population
backlog_population,
election_hinting
};
/*

View file

@ -68,6 +68,8 @@ add_library(
election_scheduler.cpp
gap_cache.hpp
gap_cache.cpp
hinted_scheduler.hpp
hinted_scheduler.cpp
inactive_cache_information.hpp
inactive_cache_information.cpp
inactive_cache_status.hpp

View file

@ -166,10 +166,28 @@ void nano::active_transactions::block_already_cemented_callback (nano::block_has
remove_election_winner_details (hash_a);
}
int64_t nano::active_transactions::limit () const
{
return static_cast<int64_t> (node.config.active_elections_size);
}
int64_t nano::active_transactions::hinted_limit () const
{
const uint64_t limit = node.config.active_elections_hinted_limit_percentage * node.config.active_elections_size / 100;
return static_cast<int64_t> (limit);
}
int64_t nano::active_transactions::vacancy () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
auto result = static_cast<int64_t> (node.config.active_elections_size) - static_cast<int64_t> (roots.size ());
auto result = limit () - static_cast<int64_t> (roots.size ());
return result;
}
int64_t nano::active_transactions::vacancy_hinted () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
auto result = hinted_limit () - active_hinted_elections_count;
return result;
}
@ -422,15 +440,10 @@ nano::election_insertion_result nano::active_transactions::insert_impl (nano::un
nano::election_insertion_result nano::active_transactions::insert_hinted (std::shared_ptr<nano::block> const & block_a)
{
nano::unique_lock<nano::mutex> lock (mutex);
debug_assert (block_a != nullptr);
debug_assert (vacancy_hinted () > 0); // Should only be called when there are free hinted election slots
const std::size_t limit = node.config.active_elections_hinted_limit_percentage * node.config.active_elections_size / 100;
if (active_hinted_elections_count >= limit)
{
// Reached maximum number of hinted elections, drop new ones
node.stats.inc (nano::stat::type::election, nano::stat::detail::election_hinted_overflow);
return {};
}
nano::unique_lock<nano::mutex> lock{ mutex };
auto result = insert_impl (lock, block_a, nano::election_behavior::hinted);
if (result.inserted)
@ -446,7 +459,10 @@ nano::vote_code nano::active_transactions::vote (std::shared_ptr<nano::vote> con
nano::vote_code result{ nano::vote_code::indeterminate };
// If all hashes were recently confirmed then it is a replay
unsigned recently_confirmed_counter (0);
std::vector<std::pair<std::shared_ptr<nano::election>, nano::block_hash>> process;
std::vector<nano::block_hash> inactive; // Hashes that should be added to inactive vote cache
{
nano::unique_lock<nano::mutex> lock (mutex);
for (auto const & hash : vote_a->hashes)
@ -458,10 +474,7 @@ nano::vote_code nano::active_transactions::vote (std::shared_ptr<nano::vote> con
}
else if (!recently_confirmed.exists (hash))
{
lock.unlock ();
add_inactive_vote_cache (hash, vote_a);
check_inactive_vote_cache (hash);
lock.lock ();
inactive.emplace_back (hash);
}
else
{
@ -470,6 +483,12 @@ nano::vote_code nano::active_transactions::vote (std::shared_ptr<nano::vote> con
}
}
// Process inactive votes outside of the critical section
for (auto & hash : inactive)
{
add_inactive_vote_cache (hash, vote_a);
}
if (!process.empty ())
{
bool replay (false);
@ -663,48 +682,6 @@ void nano::active_transactions::add_inactive_vote_cache (nano::block_hash const
}
}
void nano::active_transactions::check_inactive_vote_cache (nano::block_hash const & hash)
{
if (auto entry = node.inactive_vote_cache.find (hash); entry)
{
const auto min_tally = (node.online_reps.trended () / 100) * node.config.election_hint_weight_percent;
// Check that we passed minimum voting weight threshold to start a hinted election
if (entry->tally > min_tally)
{
auto transaction (node.store.tx_begin_read ());
auto block = node.store.block.get (transaction, hash);
// Check if we have the block in ledger
if (block)
{
// We have the block, check that it's not yet confirmed
if (!node.block_confirmed_or_being_confirmed (transaction, hash))
{
insert_hinted (block);
}
}
else
{
// We don't have the block yet, try to bootstrap it
// TODO: Details of bootstraping a block are not `active_transactions` concern, encapsulate somewhere
if (!node.ledger.pruning || !node.store.pruned.exists (transaction, hash))
{
node.gap_cache.bootstrap_start (hash);
}
}
}
}
}
/*
* This is called when a new block is received from live network
* We check if maybe we already have enough inactive votes stored for it to start an election
*/
void nano::active_transactions::trigger_inactive_votes_cache_election (std::shared_ptr<nano::block> const & block_a)
{
check_inactive_vote_cache (block_a->hash ());
}
std::size_t nano::active_transactions::election_winner_details_size ()
{
nano::lock_guard<nano::mutex> guard (election_winner_details_mutex);
@ -724,6 +701,7 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (ac
std::size_t blocks_count;
std::size_t recently_confirmed_count;
std::size_t recently_cemented_count;
std::size_t hinted_count;
{
nano::lock_guard<nano::mutex> guard (active_transactions.mutex);
@ -731,15 +709,19 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (ac
blocks_count = active_transactions.blocks.size ();
recently_confirmed_count = active_transactions.recently_confirmed.size ();
recently_cemented_count = active_transactions.recently_cemented.size ();
hinted_count = active_transactions.active_hinted_elections_count;
}
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "roots", roots_count, sizeof (decltype (active_transactions.roots)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", blocks_count, sizeof (decltype (active_transactions.blocks)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "election_winner_details", active_transactions.election_winner_details_size (), sizeof (decltype (active_transactions.election_winner_details)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "recently_confirmed", recently_confirmed_count, sizeof (decltype (active_transactions.recently_confirmed.confirmed)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "recently_cemented", recently_cemented_count, sizeof (decltype (active_transactions.recently_cemented.cemented)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "hinted", hinted_count, 0 }));
composite->add_component (collect_container_info (active_transactions.generator, "generator"));
composite->add_component (active_transactions.recently_confirmed.collect_container_info ("recently_confirmed"));
composite->add_component (active_transactions.recently_cemented.collect_container_info ("recently_cemented"));
return composite;
}
@ -798,6 +780,15 @@ nano::recently_confirmed_cache::entry_t nano::recently_confirmed_cache::back ()
return confirmed.back ();
}
std::unique_ptr<nano::container_info_component> nano::recently_confirmed_cache::collect_container_info (const std::string & name)
{
nano::unique_lock<nano::mutex> lock{ mutex };
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "confirmed", confirmed.size (), sizeof (decltype (confirmed)::value_type) }));
return composite;
}
/*
* class recently_cemented
*/
@ -828,3 +819,12 @@ std::size_t nano::recently_cemented_cache::size () const
nano::lock_guard<nano::mutex> guard{ mutex };
return cemented.size ();
}
std::unique_ptr<nano::container_info_component> nano::recently_cemented_cache::collect_container_info (const std::string & name)
{
nano::unique_lock<nano::mutex> lock{ mutex };
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "cemented", cemented.size (), sizeof (decltype (cemented)::value_type) }));
return composite;
}

View file

@ -76,7 +76,8 @@ private:
mutable nano::mutex mutex;
friend std::unique_ptr<container_info_component> collect_container_info (active_transactions &, std::string const &);
public: // Container info
std::unique_ptr<container_info_component> collect_container_info (std::string const &);
};
/*
@ -99,7 +100,8 @@ private:
mutable nano::mutex mutex;
friend std::unique_ptr<container_info_component> collect_container_info (active_transactions &, std::string const &);
public: // Container info
std::unique_ptr<container_info_component> collect_container_info (std::string const &);
};
class election_insertion_result final
@ -146,6 +148,12 @@ public:
explicit active_transactions (nano::node &, nano::confirmation_height_processor &);
~active_transactions ();
/*
* Starts new election with hinted behavior type
* Hinted elections have shorter timespan and only can take up limited space inside active elections container
*/
nano::election_insertion_result insert_hinted (std::shared_ptr<nano::block> const & block_a);
// Distinguishes replay votes, cannot be determined if the block is not in any election
nano::vote_code vote (std::shared_ptr<nano::vote> const &);
// Is the root of this block in the roots container
@ -170,13 +178,25 @@ public:
void block_cemented_callback (std::shared_ptr<nano::block> const &);
void block_already_cemented_callback (nano::block_hash const &);
/*
* Maximum number of all elections that should be present in this container.
* This is only a soft limit, it is possible for this container to exceed this count.
*/
int64_t limit () const;
/*
* Maximum number of hinted elections that should be present in this container.
*/
int64_t hinted_limit () const;
int64_t vacancy () const;
/*
* How many election slots are available for hinted elections.
* The limit of AEC taken up by hinted elections is controlled by `node_config::active_elections_hinted_limit_percentage`
*/
int64_t vacancy_hinted () const;
std::function<void ()> vacancy_update{ [] () {} };
std::unordered_map<nano::block_hash, std::shared_ptr<nano::election>> blocks;
// Inserts an election if conditions are met
void trigger_inactive_votes_cache_election (std::shared_ptr<nano::block> const &);
nano::election_scheduler & scheduler;
nano::confirmation_height_processor & confirmation_height_processor;
nano::node & node;
@ -197,10 +217,7 @@ private:
std::unordered_map<nano::block_hash, std::shared_ptr<nano::election>> election_winner_details;
// Call action with confirmed block, may be different than what we started with
// clang-format off
nano::election_insertion_result insert_impl (nano::unique_lock<nano::mutex> &, std::shared_ptr<nano::block> const&, nano::election_behavior = nano::election_behavior::normal, std::function<void(std::shared_ptr<nano::block>const&)> const & = nullptr);
// clang-format on
nano::election_insertion_result insert_hinted (std::shared_ptr<nano::block> const & block_a);
nano::election_insertion_result insert_impl (nano::unique_lock<nano::mutex> &, std::shared_ptr<nano::block> const &, nano::election_behavior = nano::election_behavior::normal, std::function<void (std::shared_ptr<nano::block> const &)> const & = nullptr);
void request_loop ();
void request_confirm (nano::unique_lock<nano::mutex> &);
void erase (nano::qualified_root const &);
@ -209,8 +226,11 @@ private:
// Returns a list of elections sorted by difficulty, mutex must be locked
std::vector<std::shared_ptr<nano::election>> list_active_impl (std::size_t) const;
/*
* Checks if vote passes minimum representative weight threshold and adds it to inactive vote cache
* TODO: Should be moved to `vote_cache` class
*/
void add_inactive_vote_cache (nano::block_hash const & hash, std::shared_ptr<nano::vote> const vote);
void check_inactive_vote_cache (nano::block_hash const & hash);
nano::condition_variable condition;
bool started{ false };

View file

@ -320,10 +320,9 @@ void nano::block_processor::process_live (nano::transaction const & transaction_
auto account = block_a->account ().is_zero () ? block_a->sideband ().account : block_a->account ();
node.scheduler.activate (account, transaction_a);
}
else
{
node.active.trigger_inactive_votes_cache_election (block_a);
}
// Notify inactive vote cache about a new live block
node.inactive_vote_cache.trigger (block_a->hash ());
// Announce block contents to the network
if (origin_a == nano::block_origin::local)

View file

@ -99,7 +99,12 @@ bool nano::election_scheduler::manual_queue_predicate () const
bool nano::election_scheduler::overfill_predicate () const
{
return node.active.vacancy () < 0;
/*
* Both normal and hinted election schedulers are well-behaved, meaning they first check for AEC vacancy before inserting new elections.
* However, it is possible that AEC will be temporarily overfilled in case it's running at full capacity and election hinting or manual queue kicks in.
* That case will lead to unwanted churning of elections, so this allows for AEC to be overfilled to 125% until erasing of elections happens.
*/
return node.active.vacancy () < -(node.active.limit () / 4);
}
void nano::election_scheduler::run ()

View file

@ -0,0 +1,129 @@
#include <nano/lib/stats.hpp>
#include <nano/node/hinted_scheduler.hpp>
#include <nano/node/node.hpp>
nano::hinted_scheduler::hinted_scheduler (config const & config_a, nano::node & node_a, nano::vote_cache & inactive_vote_cache_a, nano::active_transactions & active_a, nano::online_reps & online_reps_a, nano::stat & stats_a) :
config_m{ config_a },
node{ node_a },
inactive_vote_cache{ inactive_vote_cache_a },
active{ active_a },
online_reps{ online_reps_a },
stats{ stats_a },
stopped{ false }
{
}
nano::hinted_scheduler::~hinted_scheduler ()
{
stop ();
if (thread.joinable ()) // Ensure thread was started
{
thread.join ();
}
}
void nano::hinted_scheduler::start ()
{
debug_assert (!thread.joinable ());
thread = std::thread{
[this] () { run (); }
};
}
void nano::hinted_scheduler::stop ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
stopped = true;
notify ();
}
void nano::hinted_scheduler::notify ()
{
condition.notify_all ();
}
bool nano::hinted_scheduler::predicate (nano::uint128_t const & minimum_tally) const
{
// Check if there is space inside AEC for a new hinted election
if (active.vacancy_hinted () > 0)
{
// Check if there is any vote cache entry surpassing our minimum vote tally threshold
if (inactive_vote_cache.peek (minimum_tally))
{
return true;
}
}
return false;
}
bool nano::hinted_scheduler::run_one (nano::uint128_t const & minimum_tally)
{
if (auto top = inactive_vote_cache.pop (minimum_tally); top)
{
const auto hash = top->hash; // Hash of block we want to hint
// Check if block exists
auto block = node.block (hash);
if (block != nullptr)
{
// Ensure block is not already confirmed
if (!node.block_confirmed_or_being_confirmed (hash))
{
// Try to insert it into AEC as hinted election
// We check for AEC vacancy inside our predicate
auto result = node.active.insert_hinted (block);
stats.inc (nano::stat::type::hinting, result.inserted ? nano::stat::detail::hinted : nano::stat::detail::insert_failed);
return result.inserted; // Return whether block was inserted
}
}
else
{
// Missing block in ledger to start an election
node.bootstrap_block (hash);
stats.inc (nano::stat::type::hinting, nano::stat::detail::missing_block);
}
}
return false;
}
void nano::hinted_scheduler::run ()
{
nano::thread_role::set (nano::thread_role::name::election_hinting);
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
// It is possible that if we are waiting long enough this tally becomes outdated due to changes in trended online weight
// However this is only used here for hinting, election does independent tally calculation, so there is no need to ensure it's always up-to-date
const auto minimum_tally = tally_threshold ();
// Periodically wakeup for condition checking
// We are not notified every time new vote arrives in inactive vote cache as that happens too often
condition.wait_for (lock, std::chrono::milliseconds (config_m.vote_cache_check_interval_ms), [this, minimum_tally] () {
return stopped || predicate (minimum_tally);
});
debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds
if (!stopped)
{
// We don't need the lock when running main loop
lock.unlock ();
if (predicate (minimum_tally))
{
run_one (minimum_tally);
}
lock.lock ();
}
}
}
nano::uint128_t nano::hinted_scheduler::tally_threshold () const
{
const auto min_tally = (online_reps.trended () / 100) * node.config.election_hint_weight_percent;
return min_tally;
}

View file

@ -0,0 +1,67 @@
#pragma once
#include <nano/lib/locks.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/lib/timer.hpp>
#include <nano/lib/utility.hpp>
#include <nano/secure/common.hpp>
#include <condition_variable>
#include <memory>
#include <queue>
#include <thread>
#include <vector>
namespace nano
{
class node;
class active_transactions;
class vote_cache;
class online_reps;
/*
* Monitors inactive vote cache and schedules elections with the highest observed vote tally.
*/
class hinted_scheduler final
{
public: // Config
struct config final
{
// Interval of wakeup to check inactive vote cache when idle
uint64_t vote_cache_check_interval_ms;
};
public:
explicit hinted_scheduler (config const &, nano::node &, nano::vote_cache &, nano::active_transactions &, nano::online_reps &, nano::stat &);
~hinted_scheduler ();
void start ();
void stop ();
/*
* Notify about changes in AEC vacancy
*/
void notify ();
private:
bool predicate (nano::uint128_t const & minimum_tally) const;
void run ();
bool run_one (nano::uint128_t const & minimum_tally);
nano::uint128_t tally_threshold () const;
private: // Dependencies
nano::node & node;
nano::vote_cache & inactive_vote_cache;
nano::active_transactions & active;
nano::online_reps & online_reps;
nano::stat & stats;
private:
config const config_m;
bool stopped;
nano::condition_variable condition;
mutable nano::mutex mutex;
std::thread thread;
};
}

View file

@ -46,6 +46,13 @@ nano::vote_cache::config nano::nodeconfig_to_vote_cache_config (node_config cons
return cfg;
}
nano::hinted_scheduler::config nano::nodeconfig_to_hinted_scheduler_config (const nano::node_config & config)
{
hinted_scheduler::config cfg;
cfg.vote_cache_check_interval_ms = config.network_params.network.is_dev_network () ? 100u : 1000u;
return cfg;
}
void nano::node::keepalive (std::string const & address_a, uint16_t port_a)
{
auto node_l (shared_from_this ());
@ -165,9 +172,10 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
history{ config.network_params.voting },
vote_uniquer (block_uniquer),
confirmation_height_processor (ledger, write_database_queue, config.conf_height_processor_batch_min_time, config.logging, logger, node_initialized_latch, flags.confirmation_height_processor_mode),
inactive_vote_cache{ nodeconfig_to_vote_cache_config (config, flags) },
inactive_vote_cache{ nano::nodeconfig_to_vote_cache_config (config, flags) },
active (*this, confirmation_height_processor),
scheduler{ *this },
hinting{ nano::nodeconfig_to_hinted_scheduler_config (config), *this, inactive_vote_cache, active, online_reps, stats },
aggregator (config, stats, active.generator, active.final_generator, history, ledger, wallets, active),
wallets (wallets_store.init_error (), *this),
backlog{ nano::nodeconfig_to_backlog_population_config (config), store, scheduler },
@ -187,7 +195,11 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
{
telemetry->start ();
active.vacancy_update = [this] () { scheduler.notify (); };
// Notify election schedulers when AEC frees election slot
active.vacancy_update = [this] () {
scheduler.notify ();
hinting.notify ();
};
if (config.websocket_config.enabled)
{
@ -725,6 +737,7 @@ void nano::node::start ()
}
wallets.start ();
backlog.start ();
hinting.start ();
}
void nano::node::stop ()
@ -740,6 +753,7 @@ void nano::node::stop ()
aggregator.stop ();
vote_processor.stop ();
scheduler.stop ();
hinting.stop ();
active.stop ();
confirmation_height_processor.stop ();
network.stop ();
@ -1303,9 +1317,9 @@ bool nano::node::block_confirmed (nano::block_hash const & hash_a)
return ledger.block_confirmed (transaction, hash_a);
}
bool nano::node::block_confirmed_or_being_confirmed (nano::transaction const & transaction_a, nano::block_hash const & hash_a)
bool nano::node::block_confirmed_or_being_confirmed (nano::block_hash const & hash_a)
{
return confirmation_height_processor.is_processing_block (hash_a) || ledger.block_confirmed (transaction_a, hash_a);
return confirmation_height_processor.is_processing_block (hash_a) || ledger.block_confirmed (store.tx_begin_read (), hash_a);
}
void nano::node::ongoing_online_weight_calculation_queue ()
@ -1765,6 +1779,16 @@ std::pair<uint64_t, decltype (nano::ledger::bootstrap_weights)> nano::node::get_
return { max_blocks, weights };
}
void nano::node::bootstrap_block (const nano::block_hash & hash)
{
// If we are running pruning node check if block was not already pruned
if (!ledger.pruning || !store.pruned.exists (store.tx_begin_read (), hash))
{
// We don't have the block, try to bootstrap it
gap_cache.bootstrap_start (hash);
}
}
/** Convenience function to easily return the confirmation height of an account. */
uint64_t nano::node::get_confirmation_height (nano::transaction const & transaction_a, nano::account & account_a)
{

View file

@ -15,6 +15,7 @@
#include <nano/node/election.hpp>
#include <nano/node/election_scheduler.hpp>
#include <nano/node/gap_cache.hpp>
#include <nano/node/hinted_scheduler.hpp>
#include <nano/node/network.hpp>
#include <nano/node/node_observers.hpp>
#include <nano/node/nodeconfig.hpp>
@ -55,8 +56,9 @@ class work_pool;
std::unique_ptr<container_info_component> collect_container_info (rep_crawler & rep_crawler, std::string const & name);
// Configs
backlog_population::config nodeconfig_to_backlog_population_config (node_config const & config);
backlog_population::config nodeconfig_to_backlog_population_config (node_config const &);
vote_cache::config nodeconfig_to_vote_cache_config (node_config const &, node_flags const &);
hinted_scheduler::config nodeconfig_to_hinted_scheduler_config (node_config const &);
class node final : public std::enable_shared_from_this<nano::node>
{
@ -116,7 +118,7 @@ public:
*/
std::shared_ptr<nano::election> block_confirm (std::shared_ptr<nano::block> const &);
bool block_confirmed (nano::block_hash const &);
bool block_confirmed_or_being_confirmed (nano::transaction const &, nano::block_hash const &);
bool block_confirmed_or_being_confirmed (nano::block_hash const &);
void do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const &, uint16_t, std::shared_ptr<std::string> const &, std::shared_ptr<std::string> const &, std::shared_ptr<boost::asio::ip::tcp::resolver> const &);
void ongoing_online_weight_calculation ();
void ongoing_online_weight_calculation_queue ();
@ -126,6 +128,10 @@ public:
void set_bandwidth_params (std::size_t limit, double ratio);
std::pair<uint64_t, decltype (nano::ledger::bootstrap_weights)> get_bootstrap_weights () const;
uint64_t get_confirmation_height (nano::transaction const &, nano::account &);
/*
* Attempts to bootstrap block. This is the best effort, there is no guarantee that the block will be bootstrapped.
*/
void bootstrap_block (nano::block_hash const &);
nano::write_database_queue write_database_queue;
boost::asio::io_context & io_ctx;
boost::latch node_initialized_latch;
@ -168,6 +174,7 @@ public:
nano::vote_cache inactive_vote_cache;
nano::active_transactions active;
nano::election_scheduler scheduler;
nano::hinted_scheduler hinting;
nano::request_aggregator aggregator;
nano::wallets wallets;
nano::backlog_population backlog;

View file

@ -1,4 +1,5 @@
#include <nano/crypto_lib/random_pool.hpp>
#include <nano/node/transport/fake.hpp>
#include <nano/test_common/system.hpp>
#include <nano/test_common/testutil.hpp>
@ -174,9 +175,24 @@ std::shared_ptr<nano::vote> nano::test::make_vote (nano::keypair key, std::vecto
return make_vote (key, hashes, timestamp, duration);
}
std::shared_ptr<nano::vote> nano::test::make_final_vote (nano::keypair key, std::vector<nano::block_hash> hashes)
{
return make_vote (key, hashes, nano::vote::timestamp_max, nano::vote::duration_max);
}
std::shared_ptr<nano::vote> nano::test::make_final_vote (nano::keypair key, std::vector<std::shared_ptr<nano::block>> blocks)
{
return make_vote (key, blocks, nano::vote::timestamp_max, nano::vote::duration_max);
}
std::vector<nano::block_hash> nano::test::blocks_to_hashes (std::vector<std::shared_ptr<nano::block>> blocks)
{
std::vector<nano::block_hash> hashes;
std::transform (blocks.begin (), blocks.end (), std::back_inserter (hashes), [] (auto & block) { return block->hash (); });
return hashes;
}
std::shared_ptr<nano::transport::channel> nano::test::fake_channel (nano::node & node)
{
return std::make_shared<nano::transport::fake::channel> (node);
}

View file

@ -3,6 +3,7 @@
#include <nano/lib/errors.hpp>
#include <nano/lib/locks.hpp>
#include <nano/lib/timer.hpp>
#include <nano/node/transport/transport.hpp>
#include <gtest/gtest.h>
@ -63,6 +64,17 @@
} \
EXPECT_NO_ERROR (ec);
/*
* Asserts that the `val1 == val2` condition becomes true within the deadline
* Condition must hold for at least 2 consecutive reads
*/
#define ASSERT_TIMELY_EQ(time, val1, val2) \
system.deadline_set (time); \
while (!((val1) == (val2)) && !system.poll ()) \
{ \
} \
ASSERT_EQ (val1, val2);
/*
* Waits specified number of time while keeping system running.
* Useful for asserting conditions that should still hold after some delay of time
@ -73,6 +85,26 @@
{ \
}
/*
* Asserts that condition is always true during the specified amount of time
*/
#define ASSERT_ALWAYS(time, condition) \
system.deadline_set (time); \
while (!system.poll ()) \
{ \
ASSERT_TRUE (condition); \
}
/*
* Asserts that condition is never true during the specified amount of time
*/
#define ASSERT_NEVER(time, condition) \
system.deadline_set (time); \
while (!system.poll ()) \
{ \
ASSERT_FALSE (condition); \
}
/* Convenience globals for gtest projects */
namespace nano
{
@ -311,9 +343,21 @@ namespace test
* Convenience function to create a new vote from list of block hashes
*/
std::shared_ptr<nano::vote> make_vote (nano::keypair key, std::vector<nano::block_hash> hashes, uint64_t timestamp = 0, uint8_t duration = 0);
/*
* Convenience function to create a new final vote from list of blocks
*/
std::shared_ptr<nano::vote> make_final_vote (nano::keypair key, std::vector<std::shared_ptr<nano::block>> blocks);
/*
* Convenience function to create a new final vote from list of block hashes
*/
std::shared_ptr<nano::vote> make_final_vote (nano::keypair key, std::vector<nano::block_hash> hashes);
/*
* Converts list of blocks to list of hashes
*/
std::vector<nano::block_hash> blocks_to_hashes (std::vector<std::shared_ptr<nano::block>> blocks);
/*
* Creates a new fake channel associated with `node`
*/
std::shared_ptr<nano::transport::channel> fake_channel (nano::node & node);
}
}