Bounded election buckets with dynamic reprioritization (#4626)

* Keep track of started elections in buckets

* Stats

* Tests

* Fix compiler warning

* add nano::scheduler::priority_bucket config

# Conflicts:
#	nano/node/scheduler/bucket.cpp

* Fix test

* Explicitly pass config to bucket

* Bucket tests

---------

Co-authored-by: gr0vity-dev <homebot@users.noreply.github.com>
Piotr Wójcik 2024-07-11 15:10:31 +02:00 committed by GitHub
commit 21abfc2ae4
18 changed files with 693 additions and 457 deletions

View file

@ -156,7 +156,8 @@ TEST (active_elections, confirm_frontier)
ASSERT_GT (election2->confirmation_request_count, 0u);
}
TEST (active_elections, keep_local)
// TODO: Adjust for new behaviour of bounded buckets
TEST (active_elections, DISABLED_keep_local)
{
nano::test::system system{};
@ -1293,35 +1294,33 @@ TEST (active_elections, list_active)
TEST (active_elections, vacancy)
{
std::atomic<bool> updated = false;
{
nano::test::system system;
nano::node_config config = system.default_config ();
config.active_elections.size = 1;
auto & node = *system.add_node (config);
nano::state_block_builder builder;
auto send = builder.make_block ()
.account (nano::dev::genesis_key.pub)
.previous (nano::dev::genesis->hash ())
.representative (nano::dev::genesis_key.pub)
.link (nano::dev::genesis_key.pub)
.balance (nano::dev::constants.genesis_amount - nano::Gxrb_ratio)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build ();
node.active.vacancy_update = [&updated] () { updated = true; };
ASSERT_EQ (nano::block_status::progress, node.process (send));
ASSERT_EQ (1, node.active.vacancy (nano::election_behavior::priority));
ASSERT_EQ (0, node.active.size ());
auto election1 = nano::test::start_election (system, node, send->hash ());
ASSERT_TIMELY (1s, updated);
updated = false;
ASSERT_EQ (0, node.active.vacancy (nano::election_behavior::priority));
ASSERT_EQ (1, node.active.size ());
election1->force_confirm ();
ASSERT_TIMELY (1s, updated);
ASSERT_EQ (1, node.active.vacancy (nano::election_behavior::priority));
ASSERT_EQ (0, node.active.size ());
}
nano::test::system system;
nano::node_config config = system.default_config ();
config.active_elections.size = 1;
auto & node = *system.add_node (config);
nano::state_block_builder builder;
auto send = builder.make_block ()
.account (nano::dev::genesis_key.pub)
.previous (nano::dev::genesis->hash ())
.representative (nano::dev::genesis_key.pub)
.link (nano::dev::genesis_key.pub)
.balance (nano::dev::constants.genesis_amount - nano::Gxrb_ratio)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build ();
node.active.vacancy_update = [&updated] () { updated = true; };
ASSERT_EQ (nano::block_status::progress, node.process (send));
ASSERT_EQ (1, node.active.vacancy (nano::election_behavior::priority));
ASSERT_EQ (0, node.active.size ());
auto election1 = nano::test::start_election (system, node, send->hash ());
ASSERT_TIMELY (1s, updated);
updated = false;
ASSERT_EQ (0, node.active.vacancy (nano::election_behavior::priority));
ASSERT_EQ (1, node.active.size ());
election1->force_confirm ();
ASSERT_TIMELY (1s, updated);
ASSERT_EQ (1, node.active.vacancy (nano::election_behavior::priority));
ASSERT_EQ (0, node.active.size ());
}
/*
@ -1389,101 +1388,3 @@ TEST (active_elections, limit_vote_hinted_elections)
// Ensure there was no overflow of elections
ASSERT_EQ (0, node.stats.count (nano::stat::type::active_elections_dropped, nano::stat::detail::priority));
}
/*
* Tests that when AEC is running at capacity from normal elections, it is still possible to schedule a limited number of hinted elections
*/
TEST (active_elections, allow_limited_overflow)
{
nano::test::system system;
nano::node_config config = system.default_config ();
const int aec_limit = 20;
config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
config.active_elections.size = aec_limit;
config.active_elections.hinted_limit_percentage = 20; // Should give us a limit of 4 hinted elections
auto & node = *system.add_node (config);
auto blocks = nano::test::setup_independent_blocks (system, node, aec_limit * 4);
// Split blocks in two halves
std::vector<std::shared_ptr<nano::block>> blocks1 (blocks.begin (), blocks.begin () + blocks.size () / 2);
std::vector<std::shared_ptr<nano::block>> blocks2 (blocks.begin () + blocks.size () / 2, blocks.end ());
// Even though automatic frontier confirmation is disabled, AEC is doing funny stuff and inserting elections, clear that
WAIT (1s);
node.active.clear ();
ASSERT_TRUE (node.active.empty ());
// Insert the first part of the blocks into normal election scheduler
for (auto const & block : blocks1)
{
node.scheduler.priority.activate (node.ledger.tx_begin_read (), block->account ());
}
// Ensure number of active elections reaches AEC limit and there is no overfill
ASSERT_TIMELY_EQ (5s, node.active.size (), node.active.limit (nano::election_behavior::priority));
// And it stays that way without increasing
ASSERT_ALWAYS (1s, node.active.size () == node.active.limit (nano::election_behavior::priority));
// Insert votes for the second part of the blocks, so that those are scheduled as hinted elections
for (auto const & block : blocks2)
{
// Non-final vote, so it stays in the AEC without getting confirmed
auto vote = nano::test::make_vote (nano::dev::genesis_key, { block });
node.vote_cache.insert (vote);
}
// Ensure active elections overfill AEC only up to normal + hinted limit
ASSERT_TIMELY_EQ (5s, node.active.size (), node.active.limit (nano::election_behavior::priority) + node.active.limit (nano::election_behavior::hinted));
// And it stays that way without increasing
ASSERT_ALWAYS (1s, node.active.size () == node.active.limit (nano::election_behavior::priority) + node.active.limit (nano::election_behavior::hinted));
}
/*
* Tests that when hinted elections are present in the AEC, normal scheduler adapts not to exceed the limit of all elections
*/
TEST (active_elections, allow_limited_overflow_adapt)
{
nano::test::system system;
nano::node_config config = system.default_config ();
const int aec_limit = 20;
config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
config.active_elections.size = aec_limit;
config.active_elections.hinted_limit_percentage = 20; // Should give us a limit of 4 hinted elections
auto & node = *system.add_node (config);
auto blocks = nano::test::setup_independent_blocks (system, node, aec_limit * 4);
// Split blocks in two halves
std::vector<std::shared_ptr<nano::block>> blocks1 (blocks.begin (), blocks.begin () + blocks.size () / 2);
std::vector<std::shared_ptr<nano::block>> blocks2 (blocks.begin () + blocks.size () / 2, blocks.end ());
// Even though automatic frontier confirmation is disabled, AEC is doing funny stuff and inserting elections, clear that
WAIT (1s);
node.active.clear ();
ASSERT_TRUE (node.active.empty ());
// Insert votes for the second part of the blocks, so that those are scheduled as hinted elections
for (auto const & block : blocks2)
{
// Non-final vote, so it stays in the AEC without getting confirmed
auto vote = nano::test::make_vote (nano::dev::genesis_key, { block });
node.vote_cache.insert (vote);
}
// Ensure hinted election amount is bounded by hinted limit
ASSERT_TIMELY_EQ (5s, node.active.size (), node.active.limit (nano::election_behavior::hinted));
// And it stays that way without increasing
ASSERT_ALWAYS (1s, node.active.size () == node.active.limit (nano::election_behavior::hinted));
// Insert the first part of the blocks into normal election scheduler
for (auto const & block : blocks1)
{
node.scheduler.priority.activate (node.ledger.tx_begin_read (), block->account ());
}
// Ensure number of active elections reaches AEC limit and there is no overfill
ASSERT_TIMELY_EQ (5s, node.active.size (), node.active.limit (nano::election_behavior::priority));
// And it stays that way without increasing
ASSERT_ALWAYS (1s, node.active.size () == node.active.limit (nano::election_behavior::priority));
}

View file

@ -13,14 +13,115 @@
using namespace std::chrono_literals;
TEST (election_scheduler, construction)
namespace
{
nano::test::system system{ 1 };
nano::keypair & keyzero ()
{
static nano::keypair result;
return result;
}
nano::keypair & key0 ()
{
static nano::keypair result;
return result;
}
nano::keypair & key1 ()
{
static nano::keypair result;
return result;
}
nano::keypair & key2 ()
{
static nano::keypair result;
return result;
}
nano::keypair & key3 ()
{
static nano::keypair result;
return result;
}
std::shared_ptr<nano::state_block> & blockzero ()
{
nano::block_builder builder;
static auto result = builder
.state ()
.account (keyzero ().pub)
.previous (0)
.representative (keyzero ().pub)
.balance (0)
.link (0)
.sign (keyzero ().prv, keyzero ().pub)
.work (0)
.build ();
return result;
}
std::shared_ptr<nano::state_block> & block0 ()
{
nano::block_builder builder;
static auto result = builder
.state ()
.account (key0 ().pub)
.previous (0)
.representative (key0 ().pub)
.balance (nano::Gxrb_ratio)
.link (0)
.sign (key0 ().prv, key0 ().pub)
.work (0)
.build ();
return result;
}
std::shared_ptr<nano::state_block> & block1 ()
{
nano::block_builder builder;
static auto result = builder
.state ()
.account (key1 ().pub)
.previous (0)
.representative (key1 ().pub)
.balance (nano::Mxrb_ratio)
.link (0)
.sign (key1 ().prv, key1 ().pub)
.work (0)
.build ();
return result;
}
std::shared_ptr<nano::state_block> & block2 ()
{
nano::block_builder builder;
static auto result = builder
.state ()
.account (key2 ().pub)
.previous (0)
.representative (key2 ().pub)
.balance (nano::Gxrb_ratio)
.link (0)
.sign (key2 ().prv, key2 ().pub)
.work (0)
.build ();
return result;
}
std::shared_ptr<nano::state_block> & block3 ()
{
nano::block_builder builder;
static auto result = builder
.state ()
.account (key3 ().pub)
.previous (0)
.representative (key3 ().pub)
.balance (nano::Mxrb_ratio)
.link (0)
.sign (key3 ().prv, key3 ().pub)
.work (0)
.build ();
return result;
}
}
TEST (election_scheduler, activate_one_timely)
{
nano::test::system system{ 1 };
nano::test::system system;
auto & node = *system.add_node ();
nano::state_block_builder builder;
auto send1 = builder.make_block ()
.account (nano::dev::genesis_key.pub)
@ -31,14 +132,16 @@ TEST (election_scheduler, activate_one_timely)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build ();
system.nodes[0]->ledger.process (system.nodes[0]->ledger.tx_begin_write (), send1);
system.nodes[0]->scheduler.priority.activate (system.nodes[0]->ledger.tx_begin_read (), nano::dev::genesis_key.pub);
ASSERT_TIMELY (5s, system.nodes[0]->active.election (send1->qualified_root ()));
node.ledger.process (node.ledger.tx_begin_write (), send1);
node.scheduler.priority.activate (node.ledger.tx_begin_read (), nano::dev::genesis_key.pub);
ASSERT_TIMELY (5s, node.active.election (send1->qualified_root ()));
}
TEST (election_scheduler, activate_one_flush)
{
nano::test::system system{ 1 };
nano::test::system system;
auto & node = *system.add_node ();
nano::state_block_builder builder;
auto send1 = builder.make_block ()
.account (nano::dev::genesis_key.pub)
@ -49,9 +152,9 @@ TEST (election_scheduler, activate_one_flush)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build ();
system.nodes[0]->ledger.process (system.nodes[0]->ledger.tx_begin_write (), send1);
system.nodes[0]->scheduler.priority.activate (system.nodes[0]->ledger.tx_begin_read (), nano::dev::genesis_key.pub);
ASSERT_TIMELY (5s, system.nodes[0]->active.election (send1->qualified_root ()));
node.ledger.process (node.ledger.tx_begin_write (), send1);
node.scheduler.priority.activate (node.ledger.tx_begin_read (), nano::dev::genesis_key.pub);
ASSERT_TIMELY (5s, node.active.election (send1->qualified_root ()));
}
/**
@ -71,13 +174,13 @@ TEST (election_scheduler, activate_one_flush)
*/
TEST (election_scheduler, no_vacancy)
{
nano::test::system system{};
nano::test::system system;
nano::node_config config = system.default_config ();
config.active_elections.size = 1;
config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto & node = *system.add_node (config);
nano::state_block_builder builder{};
nano::keypair key{};
@ -146,3 +249,85 @@ TEST (election_scheduler, no_vacancy)
ASSERT_TIMELY (5s, node.active.election (block2->qualified_root ()) != nullptr);
ASSERT_TRUE (node.scheduler.priority.empty ());
}
TEST (election_scheduler_bucket, construction)
{
nano::test::system system;
auto & node = *system.add_node ();
nano::scheduler::priority_bucket_config bucket_config;
nano::scheduler::bucket bucket{ nano::Gxrb_ratio, bucket_config, node.active, node.stats };
ASSERT_EQ (nano::Gxrb_ratio, bucket.minimum_balance);
ASSERT_TRUE (bucket.empty ());
ASSERT_EQ (0, bucket.size ());
}
TEST (election_scheduler_bucket, insert_one)
{
nano::test::system system;
auto & node = *system.add_node ();
nano::scheduler::priority_bucket_config bucket_config;
nano::scheduler::bucket bucket{ 0, bucket_config, node.active, node.stats };
ASSERT_TRUE (bucket.push (1000, block0 ()));
ASSERT_FALSE (bucket.empty ());
ASSERT_EQ (1, bucket.size ());
auto blocks = bucket.blocks ();
ASSERT_EQ (1, blocks.size ());
ASSERT_EQ (block0 (), blocks.front ());
}
TEST (election_scheduler_bucket, insert_duplicate)
{
nano::test::system system;
auto & node = *system.add_node ();
nano::scheduler::priority_bucket_config bucket_config;
nano::scheduler::bucket bucket{ 0, bucket_config, node.active, node.stats };
ASSERT_TRUE (bucket.push (1000, block0 ()));
ASSERT_FALSE (bucket.push (1000, block0 ()));
}
TEST (election_scheduler_bucket, insert_many)
{
nano::test::system system;
auto & node = *system.add_node ();
nano::scheduler::priority_bucket_config bucket_config;
nano::scheduler::bucket bucket{ 0, bucket_config, node.active, node.stats };
ASSERT_TRUE (bucket.push (2000, block0 ()));
ASSERT_TRUE (bucket.push (1001, block1 ()));
ASSERT_TRUE (bucket.push (1000, block2 ()));
ASSERT_TRUE (bucket.push (900, block3 ()));
ASSERT_FALSE (bucket.empty ());
ASSERT_EQ (4, bucket.size ());
auto blocks = bucket.blocks ();
ASSERT_EQ (4, blocks.size ());
// Ensure correct order
ASSERT_EQ (blocks[0], block3 ());
ASSERT_EQ (blocks[1], block2 ());
ASSERT_EQ (blocks[2], block1 ());
ASSERT_EQ (blocks[3], block0 ());
}
TEST (election_scheduler_bucket, max_blocks)
{
nano::test::system system;
auto & node = *system.add_node ();
nano::scheduler::priority_bucket_config bucket_config{
.max_blocks = 2
};
nano::scheduler::bucket bucket{ 0, bucket_config, node.active, node.stats };
ASSERT_TRUE (bucket.push (2000, block0 ()));
ASSERT_TRUE (bucket.push (900, block1 ()));
ASSERT_FALSE (bucket.push (3000, block2 ()));
ASSERT_TRUE (bucket.push (1001, block3 ())); // Evicts 2000
ASSERT_TRUE (bucket.push (1000, block0 ())); // Evicts 1001
ASSERT_EQ (2, bucket.size ());
auto blocks = bucket.blocks ();
ASSERT_EQ (2, blocks.size ());
// Ensure correct order
ASSERT_EQ (blocks[0], block1 ());
ASSERT_EQ (blocks[1], block0 ());
}
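// Note on the ordering above: block_entry sorts by the time value passed to push (), lowest
// first, so the oldest block sits at the front of the queue. When the bucket is at max_blocks
// capacity, a push that does not land at the back evicts the entry with the highest time,
// which is why pushing 1001 drops the 2000 entry and pushing 1000 then drops the 1001 entry.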

View file

@ -3773,7 +3773,9 @@ TEST (node, local_block_broadcast)
// Disable active elections to prevent the block from being broadcasted by the election
auto node_config = system.default_config ();
node_config.active_elections.size = 0;
node_config.priority_scheduler.enabled = false;
node_config.hinted_scheduler.enabled = false;
node_config.optimistic_scheduler.enabled = false;
node_config.local_block_broadcaster.rebroadcast_interval = 1s;
auto & node1 = *system.add_node (node_config);
auto & node2 = *system.make_disconnected_node ();

View file

@ -1,5 +1,4 @@
#include <nano/lib/blocks.hpp>
#include <nano/node/scheduler/buckets.hpp>
#include <nano/secure/common.hpp>
#include <gtest/gtest.h>
@ -107,6 +106,7 @@ std::shared_ptr<nano::state_block> & block3 ()
return result;
}
/*
TEST (buckets, construction)
{
nano::scheduler::buckets buckets;
@ -240,3 +240,4 @@ TEST (buckets, trim_even)
buckets.pop ();
ASSERT_EQ (block1 (), buckets.top ());
}
*/

View file

@ -27,6 +27,7 @@ enum class type
vote_processor_tier,
vote_processor_overfill,
election,
election_cleanup,
election_vote,
http_callback,
ipc,
@ -68,10 +69,12 @@ enum class type
active_elections_confirmed,
active_elections_dropped,
active_elections_timeout,
active_elections_cancelled,
active_elections_cemented,
backlog,
unchecked,
election_scheduler,
election_bucket,
optimistic_scheduler,
handshake,
rep_crawler,
@ -388,6 +391,7 @@ enum class detail
// active
insert,
insert_failed,
election_cleanup,
// active_elections
started,
@ -472,6 +476,7 @@ enum class detail
active,
expired_confirmed,
expired_unconfirmed,
cancelled,
// election_status_type
ongoing,
@ -479,6 +484,10 @@ enum class detail
active_confirmation_height,
inactive_confirmation_height,
// election bucket
activate_success,
cancel_lowest,
_last // Must be the last enum
};

View file

@ -145,8 +145,6 @@ add_library(
request_aggregator.cpp
scheduler/bucket.cpp
scheduler/bucket.hpp
scheduler/buckets.cpp
scheduler/buckets.hpp
scheduler/component.hpp
scheduler/component.cpp
scheduler/hinted.hpp

View file

@ -308,7 +308,11 @@ void nano::active_elections::cleanup_election (nano::unique_lock<nano::mutex> &
auto blocks_l = election->blocks ();
node.vote_router.disconnect (*election);
roots.get<tag_root> ().erase (roots.get<tag_root> ().find (election->qualified_root));
// Erase root info
auto it = roots.get<tag_root> ().find (election->qualified_root);
release_assert (it != roots.get<tag_root> ().end ());
entry entry = *it;
roots.get<tag_root> ().erase (it);
node.stats.inc (nano::stat::type::active_elections, nano::stat::detail::stopped);
node.stats.inc (nano::stat::type::active_elections, election->confirmed () ? nano::stat::detail::confirmed : nano::stat::detail::unconfirmed);
@ -327,6 +331,11 @@ void nano::active_elections::cleanup_election (nano::unique_lock<nano::mutex> &
// Track election duration
node.stats.sample (nano::stat::sample::active_election_duration, election->duration ().count (), { 0, 1000 * 60 * 10 /* 0-10 minutes range */ });
// Notify observers without holding the lock
if (entry.erased_callback)
{
entry.erased_callback (election);
}
vacancy_update ();
for (auto const & [hash, block] : blocks_l)
@ -387,7 +396,7 @@ void nano::active_elections::request_loop ()
}
}
nano::election_insertion_result nano::active_elections::insert (std::shared_ptr<nano::block> const & block_a, nano::election_behavior election_behavior_a)
nano::election_insertion_result nano::active_elections::insert (std::shared_ptr<nano::block> const & block_a, nano::election_behavior election_behavior_a, erased_callback_t erased_callback_a)
{
debug_assert (block_a);
debug_assert (block_a->has_sideband ());
@ -414,7 +423,7 @@ nano::election_insertion_result nano::active_elections::insert (std::shared_ptr<
node.online_reps.observe (rep_a);
};
result.election = nano::make_shared<nano::election> (node, block_a, nullptr, observe_rep_cb, election_behavior_a);
roots.get<tag_root> ().emplace (nano::active_elections::conflict_info{ root, result.election });
roots.get<tag_root> ().emplace (entry{ root, result.election, std::move (erased_callback_a) });
node.vote_router.connect (hash, result.election);
// Keep track of election count by election type
@ -556,10 +565,12 @@ std::size_t nano::active_elections::election_winner_details_size ()
void nano::active_elections::clear ()
{
// TODO: Call erased_callback for each election
{
nano::lock_guard<nano::mutex> guard{ mutex };
roots.clear ();
}
vacancy_update ();
}
@ -629,16 +640,14 @@ nano::stat::type nano::to_stat_type (nano::election_state state)
case election_state::expired_unconfirmed:
return nano::stat::type::active_elections_timeout;
break;
case election_state::cancelled:
return nano::stat::type::active_elections_cancelled;
break;
}
debug_assert (false);
return {};
}
nano::stat::detail nano::to_stat_detail (nano::election_state state)
{
return nano::enum_util::cast<nano::stat::detail> (state);
}
nano::stat::detail nano::to_stat_detail (nano::election_status_type type)
{
return nano::enum_util::cast<nano::stat::detail> (type);

View file

@ -72,12 +72,16 @@ public:
*/
class active_elections final
{
public:
using erased_callback_t = std::function<void (std::shared_ptr<nano::election>)>;
private: // Elections
class conflict_info final
class entry final
{
public:
nano::qualified_root root;
std::shared_ptr<nano::election> election;
erased_callback_t erased_callback;
};
friend class nano::election;
@ -90,11 +94,11 @@ private: // Elections
class tag_arrival {};
class tag_hash {};
using ordered_roots = boost::multi_index_container<conflict_info,
using ordered_roots = boost::multi_index_container<entry,
mi::indexed_by<
mi::sequenced<mi::tag<tag_sequenced>>,
mi::hashed_unique<mi::tag<tag_root>,
mi::member<conflict_info, nano::qualified_root, &conflict_info::root>>
mi::member<entry, nano::qualified_root, &entry::root>>
>>;
// clang-format on
ordered_roots roots;
@ -109,7 +113,7 @@ public:
/**
* Starts new election with a specified behavior type
*/
nano::election_insertion_result insert (std::shared_ptr<nano::block> const &, nano::election_behavior = nano::election_behavior::priority);
nano::election_insertion_result insert (std::shared_ptr<nano::block> const &, nano::election_behavior = nano::election_behavior::priority, erased_callback_t = nullptr);
// Is the root of this block in the roots container
bool active (nano::block const &) const;
bool active (nano::qualified_root const &) const;

View file

@ -80,7 +80,6 @@ void nano::election::confirm_once (nano::unique_lock<nano::mutex> & lock_a)
bool nano::election::valid_change (nano::election_state expected_a, nano::election_state desired_a) const
{
bool result = false;
switch (expected_a)
{
case nano::election_state::passive:
@ -89,8 +88,8 @@ bool nano::election::valid_change (nano::election_state expected_a, nano::electi
case nano::election_state::active:
case nano::election_state::confirmed:
case nano::election_state::expired_unconfirmed:
result = true;
break;
case nano::election_state::cancelled:
return true; // Valid
default:
break;
}
@ -100,8 +99,8 @@ bool nano::election::valid_change (nano::election_state expected_a, nano::electi
{
case nano::election_state::confirmed:
case nano::election_state::expired_unconfirmed:
result = true;
break;
case nano::election_state::cancelled:
return true; // Valid
default:
break;
}
@ -110,17 +109,18 @@ bool nano::election::valid_change (nano::election_state expected_a, nano::electi
switch (desired_a)
{
case nano::election_state::expired_confirmed:
result = true;
break;
return true; // Valid
default:
break;
}
break;
case nano::election_state::expired_unconfirmed:
case nano::election_state::expired_confirmed:
case nano::election_state::cancelled:
// No transitions are valid from these states
break;
}
return result;
return false;
}
bool nano::election::state_change (nano::election_state expected_a, nano::election_state desired_a)
@ -167,10 +167,16 @@ void nano::election::send_confirm_req (nano::confirmation_solicitor & solicitor_
void nano::election::transition_active ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
nano::lock_guard<nano::mutex> guard{ mutex };
state_change (nano::election_state::passive, nano::election_state::active);
}
void nano::election::cancel ()
{
nano::lock_guard<nano::mutex> guard{ mutex };
state_change (state_m, nano::election_state::cancelled);
}
bool nano::election::confirmed_locked () const
{
debug_assert (!mutex.try_lock ());
@ -272,6 +278,8 @@ bool nano::election::transition_time (nano::confirmation_solicitor & solicitor_a
case nano::election_state::expired_confirmed:
debug_assert (false);
break;
case nano::election_state::cancelled:
return true; // Clean up cancelled elections immediately
}
if (!confirmed_locked () && time_to_live () < std::chrono::steady_clock::now () - election_start)
@ -821,3 +829,8 @@ std::string_view nano::to_string (nano::election_state state)
{
return nano::enum_util::name (state);
}
nano::stat::detail nano::to_stat_detail (nano::election_state state)
{
return nano::enum_util::cast<nano::stat::detail> (state);
}
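Because the hunks above only show fragments of the reworked valid_change, the sketch below is a condensed, self-contained model of the transitions as they appear in the visible hunks (illustration only; the state names mirror nano/node/election.hpp, and the "_model" suffix marks it as ours, not part of the codebase). Only passive and active elections can be cancelled.

enum class election_state_model { passive, active, confirmed, expired_confirmed, expired_unconfirmed, cancelled };

bool valid_change_model (election_state_model expected, election_state_model desired)
{
    switch (expected)
    {
        case election_state_model::passive:
            return desired == election_state_model::active || desired == election_state_model::confirmed
            || desired == election_state_model::expired_unconfirmed || desired == election_state_model::cancelled;
        case election_state_model::active:
            return desired == election_state_model::confirmed || desired == election_state_model::expired_unconfirmed
            || desired == election_state_model::cancelled;
        case election_state_model::confirmed:
            return desired == election_state_model::expired_confirmed;
        default:
            return false; // expired_confirmed, expired_unconfirmed and cancelled are terminal
    }
}

Once cancelled, transition_time () returns true on the next pass, so active_elections erases the election and, via the erased_callback added in this commit, the owning bucket frees its slot.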

View file

@ -49,10 +49,12 @@ enum class election_state
active, // actively request confirmations
confirmed, // confirmed but still listening for votes
expired_confirmed,
expired_unconfirmed
expired_unconfirmed,
cancelled,
};
std::string_view to_string (election_state);
nano::stat::detail to_stat_detail (election_state);
class election final : public std::enable_shared_from_this<election>
{
@ -84,6 +86,7 @@ private: // State management
public: // State transitions
bool transition_time (nano::confirmation_solicitor &);
void transition_active ();
void cancel ();
public: // Status
bool confirmed () const;

View file

@ -209,6 +209,10 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const
optimistic_scheduler.serialize (optimistic_l);
toml.put_child ("optimistic_scheduler", optimistic_l);
nano::tomlconfig priority_bucket_l;
priority_bucket.serialize (priority_bucket_l);
toml.put_child ("priority_bucket", priority_bucket_l);
nano::tomlconfig bootstrap_ascending_l;
bootstrap_ascending.serialize (bootstrap_ascending_l);
toml.put_child ("bootstrap_ascending", bootstrap_ascending_l);
@ -310,6 +314,12 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml)
hinted_scheduler.deserialize (config_l);
}
if (toml.has_key ("priority_bucket"))
{
auto config_l = toml.get_required_child ("priority_bucket");
priority_bucket.deserialize (config_l);
}
if (toml.has_key ("bootstrap_ascending"))
{
auto config_l = toml.get_required_child ("bootstrap_ascending");

View file

@ -21,6 +21,7 @@
#include <nano/node/peer_history.hpp>
#include <nano/node/repcrawler.hpp>
#include <nano/node/request_aggregator.hpp>
#include <nano/node/scheduler/bucket.hpp>
#include <nano/node/scheduler/hinted.hpp>
#include <nano/node/scheduler/optimistic.hpp>
#include <nano/node/scheduler/priority.hpp>
@ -71,6 +72,7 @@ public:
nano::scheduler::optimistic_config optimistic_scheduler;
nano::scheduler::hinted_config hinted_scheduler;
nano::scheduler::priority_config priority_scheduler;
nano::scheduler::priority_bucket_config priority_bucket;
std::vector<std::pair<std::string, uint16_t>> work_peers;
std::vector<std::pair<std::string, uint16_t>> secondary_work_peers{ { "127.0.0.1", 8076 } }; /* Default of nano-pow-server */
std::vector<std::string> preconfigured_peers;

View file

@ -1,46 +1,135 @@
#include <nano/lib/blocks.hpp>
#include <nano/node/active_elections.hpp>
#include <nano/node/election.hpp>
#include <nano/node/node.hpp>
#include <nano/node/scheduler/bucket.hpp>
bool nano::scheduler::bucket::value_type::operator< (value_type const & other_a) const
{
return time < other_a.time || (time == other_a.time && block->hash () < other_a.block->hash ());
}
/*
* bucket
*/
bool nano::scheduler::bucket::value_type::operator== (value_type const & other_a) const
nano::scheduler::bucket::bucket (nano::uint128_t minimum_balance_a, priority_bucket_config const & config_a, nano::active_elections & active_a, nano::stats & stats_a) :
config{ config_a },
minimum_balance{ minimum_balance_a },
active{ active_a },
stats{ stats_a }
{
return time == other_a.time && block->hash () == other_a.block->hash ();
}
nano::scheduler::bucket::bucket (nano::uint128_t minimum_balance, size_t maximum) :
maximum{ maximum },
minimum_balance{ minimum_balance }
{
debug_assert (maximum > 0);
}
nano::scheduler::bucket::~bucket ()
{
}
std::shared_ptr<nano::block> nano::scheduler::bucket::top () const
bool nano::scheduler::bucket::available () const
{
debug_assert (!queue.empty ());
return queue.begin ()->block;
nano::lock_guard<nano::mutex> lock{ mutex };
if (queue.empty ())
{
return false;
}
else
{
return election_vacancy (queue.begin ()->time);
}
}
void nano::scheduler::bucket::pop ()
bool nano::scheduler::bucket::election_vacancy (priority_t candidate) const
{
debug_assert (!queue.empty ());
debug_assert (!mutex.try_lock ());
if (elections.size () < config.reserved_elections)
{
return true;
}
if (elections.size () < config.max_elections)
{
return active.vacancy (nano::election_behavior::priority) > 0;
}
if (!elections.empty ())
{
auto lowest = elections.get<tag_priority> ().begin ()->priority;
// Compare to equal to drain duplicates
if (candidate <= lowest)
{
// Bound number of reprioritizations
return elections.size () < config.max_elections * 2;
};
}
return false;
}
bool nano::scheduler::bucket::election_overfill () const
{
debug_assert (!mutex.try_lock ());
if (elections.size () < config.reserved_elections)
{
return false;
}
if (elections.size () < config.max_elections)
{
return active.vacancy (nano::election_behavior::priority) < 0;
}
return true;
}
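The two predicates above carry the core of the bounded-bucket behaviour, so a condensed stand-alone model may help when reading the scheduler changes below. The helper name and plain parameters are ours, not part of the codebase; the numbers assume the priority_bucket_config defaults (reserved_elections = 100, max_elections = 150).

#include <cstddef>

// Hypothetical, simplified model of election_vacancy () for illustration only.
// bucket_elections: elections currently tracked by this bucket
// aec_vacancy:      active_elections::vacancy (nano::election_behavior::priority)
// not_worse:        candidate priority <= lowest priority already held by the bucket
bool bucket_vacancy_model (std::size_t bucket_elections, int aec_vacancy, bool not_worse,
std::size_t reserved = 100, std::size_t max = 150)
{
    if (bucket_elections < reserved)
    {
        return true; // guaranteed slots, regardless of global AEC usage
    }
    if (bucket_elections < max)
    {
        return aec_vacancy > 0; // use global headroom while below the bucket limit
    }
    // Full bucket: reprioritize only for candidates at least as good as the current lowest,
    // and never beyond twice the bucket limit
    return not_worse && bucket_elections < max * 2;
}

// bucket_vacancy_model (42, 0, false)  -> true   (reserved slot)
// bucket_vacancy_model (120, 0, false) -> false  (no AEC vacancy left)
// bucket_vacancy_model (150, 0, true)  -> true   (reprioritization)
// bucket_vacancy_model (300, 0, true)  -> false  (reprioritization bound reached)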
bool nano::scheduler::bucket::activate ()
{
nano::lock_guard<nano::mutex> lock{ mutex };
if (queue.empty ())
{
return false; // Not activated
}
block_entry top = *queue.begin ();
queue.erase (queue.begin ());
auto block = top.block;
auto priority = top.time;
auto erase_callback = [this] (std::shared_ptr<nano::election> election) {
nano::lock_guard<nano::mutex> lock{ mutex };
elections.get<tag_root> ().erase (election->qualified_root);
};
auto result = active.insert (block, nano::election_behavior::priority, erase_callback);
if (result.inserted)
{
release_assert (result.election);
elections.get<tag_root> ().insert ({ result.election, result.election->qualified_root, priority });
stats.inc (nano::stat::type::election_bucket, nano::stat::detail::activate_success);
}
else
{
stats.inc (nano::stat::type::election_bucket, nano::stat::detail::activate_failed);
}
return result.inserted;
}
void nano::scheduler::bucket::update ()
{
nano::lock_guard<nano::mutex> lock{ mutex };
if (election_overfill ())
{
cancel_lowest_election ();
}
}
// Returns true if the block was inserted
bool nano::scheduler::bucket::push (uint64_t time, std::shared_ptr<nano::block> block)
{
nano::lock_guard<nano::mutex> lock{ mutex };
auto [it, inserted] = queue.insert ({ time, block });
release_assert (!queue.empty ());
bool was_last = (it == --queue.end ());
if (queue.size () > maximum)
if (queue.size () > config.max_blocks)
{
queue.erase (--queue.end ());
return inserted && !was_last;
@ -50,14 +139,46 @@ bool nano::scheduler::bucket::push (uint64_t time, std::shared_ptr<nano::block>
size_t nano::scheduler::bucket::size () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
return queue.size ();
}
bool nano::scheduler::bucket::empty () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
return queue.empty ();
}
size_t nano::scheduler::bucket::election_count () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
return elections.size ();
}
void nano::scheduler::bucket::cancel_lowest_election ()
{
debug_assert (!mutex.try_lock ());
if (!elections.empty ())
{
elections.get<tag_priority> ().begin ()->election->cancel ();
stats.inc (nano::stat::type::election_bucket, nano::stat::detail::cancel_lowest);
}
}
std::deque<std::shared_ptr<nano::block>> nano::scheduler::bucket::blocks () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
std::deque<std::shared_ptr<nano::block>> result;
for (auto const & item : queue)
{
result.push_back (item.block);
}
return result;
}
void nano::scheduler::bucket::dump () const
{
for (auto const & item : queue)
@ -65,3 +186,39 @@ void nano::scheduler::bucket::dump () const
std::cerr << item.time << ' ' << item.block->hash ().to_string () << '\n';
}
}
/*
* block_entry
*/
bool nano::scheduler::bucket::block_entry::operator< (block_entry const & other_a) const
{
return time < other_a.time || (time == other_a.time && block->hash () < other_a.block->hash ());
}
bool nano::scheduler::bucket::block_entry::operator== (block_entry const & other_a) const
{
return time == other_a.time && block->hash () == other_a.block->hash ();
}
/*
* priority_bucket_config
*/
nano::error nano::scheduler::priority_bucket_config::serialize (nano::tomlconfig & toml) const
{
toml.put ("max_blocks", max_blocks, "Maximum number of blocks to sort by priority per bucket. \nType: uint64");
toml.put ("reserved_elections", reserved_elections, "Number of guaranteed slots per bucket available for election activation. \nType: uint64");
toml.put ("max_elections", max_elections, "Maximum number of slots per bucket available for election activation if the active election count is below the configured limit. \nType: uint64");
return toml.get_error ();
}
nano::error nano::scheduler::priority_bucket_config::deserialize (nano::tomlconfig & toml)
{
toml.get ("max_blocks", max_blocks);
toml.get ("reserved_elections", reserved_elections);
toml.get ("max_elections", max_elections);
return toml.get_error ();
}
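For reference, the keys serialized above map to the defaults declared in bucket.hpp, so an operator tuning them in config-node.toml would end up with something like the snippet below (placing the section under [node] follows the pattern of the other scheduler sections and is our assumption):

[node.priority_bucket]
# Maximum number of blocks to sort by priority per bucket (default)
max_blocks = 8192
# Guaranteed election slots per bucket (default)
reserved_elections = 100
# Election slots per bucket while the global active election count is below its limit (default)
max_elections = 150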

View file

@ -1,42 +1,124 @@
#pragma once
#include <nano/node/fwd.hpp>
#include <nano/secure/common.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <boost/multi_index_container.hpp>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <set>
namespace mi = boost::multi_index;
namespace nano
{
class election;
class active_elections;
class block;
}
namespace nano::scheduler
{
/** A class which holds an ordered set of blocks to be scheduled, ordered by their block arrival time
class priority_bucket_config final
{
public:
nano::error deserialize (nano::tomlconfig & toml);
nano::error serialize (nano::tomlconfig & toml) const;
public:
// Maximum number of blocks to sort by priority per bucket.
std::size_t max_blocks{ 1024 * 8 };
// Number of guaranteed slots per bucket available for election activation.
std::size_t reserved_elections{ 100 };
// Maximum number of slots per bucket available for election activation if the active election count is below the configured limit. (node.active_elections.size)
std::size_t max_elections{ 150 };
};
/**
* A class which holds an ordered set of blocks to be scheduled, ordered by their block arrival time
* TODO: This combines both block ordering and election management, which makes the class harder to test. The functionality should be split.
*/
class bucket final
{
class value_type
{
public:
uint64_t time;
std::shared_ptr<nano::block> block;
bool operator< (value_type const & other_a) const;
bool operator== (value_type const & other_a) const;
};
std::set<value_type> queue;
size_t const maximum;
public:
using priority_t = uint64_t;
public:
bucket (nano::uint128_t minimum_balance, size_t maximum);
bucket (nano::uint128_t minimum_balance, priority_bucket_config const &, nano::active_elections &, nano::stats &);
~bucket ();
nano::uint128_t const minimum_balance;
std::shared_ptr<nano::block> top () const;
void pop ();
bool available () const;
bool activate ();
void update ();
bool push (uint64_t time, std::shared_ptr<nano::block> block);
size_t size () const;
size_t election_count () const;
bool empty () const;
std::deque<std::shared_ptr<nano::block>> blocks () const;
void dump () const;
private:
bool election_vacancy (priority_t candidate) const;
bool election_overfill () const;
void cancel_lowest_election ();
private: // Dependencies
priority_bucket_config const & config;
nano::active_elections & active;
nano::stats & stats;
private: // Blocks
struct block_entry
{
uint64_t time;
std::shared_ptr<nano::block> block;
bool operator< (block_entry const & other_a) const;
bool operator== (block_entry const & other_a) const;
};
std::set<block_entry> queue;
private: // Elections
struct election_entry
{
std::shared_ptr<nano::election> election;
nano::qualified_root root;
priority_t priority;
};
// clang-format off
class tag_sequenced {};
class tag_root {};
class tag_priority {};
using ordered_elections = boost::multi_index_container<election_entry,
mi::indexed_by<
mi::sequenced<mi::tag<tag_sequenced>>,
mi::hashed_unique<mi::tag<tag_root>,
mi::member<election_entry, nano::qualified_root, &election_entry::root>>,
mi::ordered_non_unique<mi::tag<tag_priority>,
mi::member<election_entry, priority_t, &election_entry::priority>>
>>;
// clang-format on
ordered_elections elections;
private:
mutable nano::mutex mutex;
};
} // namespace nano::scheduler

View file

@ -1,159 +0,0 @@
#include <nano/lib/blocks.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/scheduler/bucket.hpp>
#include <nano/node/scheduler/buckets.hpp>
#include <string>
/** Moves the bucket pointer to the next bucket */
void nano::scheduler::buckets::next ()
{
++current;
if (current == buckets_m.end ())
{
current = buckets_m.begin ();
}
}
/** Seek to the next non-empty bucket, if one exists */
void nano::scheduler::buckets::seek ()
{
next ();
for (std::size_t i = 0, n = buckets_m.size (); (*current)->empty () && i < n; ++i)
{
next ();
}
}
void nano::scheduler::buckets::setup_buckets (uint64_t maximum)
{
auto const size_expected = 63;
auto bucket_max = std::max<size_t> (1u, maximum / size_expected);
auto build_region = [&] (uint128_t const & begin, uint128_t const & end, size_t count) {
auto width = (end - begin) / count;
for (auto i = 0; i < count; ++i)
{
buckets_m.push_back (std::make_unique<scheduler::bucket> (begin + i * width, bucket_max));
}
};
build_region (0, uint128_t{ 1 } << 79, 1);
build_region (uint128_t{ 1 } << 79, uint128_t{ 1 } << 88, 1);
build_region (uint128_t{ 1 } << 88, uint128_t{ 1 } << 92, 2);
build_region (uint128_t{ 1 } << 92, uint128_t{ 1 } << 96, 4);
build_region (uint128_t{ 1 } << 96, uint128_t{ 1 } << 100, 8);
build_region (uint128_t{ 1 } << 100, uint128_t{ 1 } << 104, 16);
build_region (uint128_t{ 1 } << 104, uint128_t{ 1 } << 108, 16);
build_region (uint128_t{ 1 } << 108, uint128_t{ 1 } << 112, 8);
build_region (uint128_t{ 1 } << 112, uint128_t{ 1 } << 116, 4);
build_region (uint128_t{ 1 } << 116, uint128_t{ 1 } << 120, 2);
build_region (uint128_t{ 1 } << 120, uint128_t{ 1 } << 127, 1);
debug_assert (buckets_m.size () == size_expected);
}
/**
* Prioritization constructor, construct a container containing approximately 'maximum' number of blocks.
* @param maximum number of blocks that this container can hold, this is a soft and approximate limit.
*/
nano::scheduler::buckets::buckets (uint64_t maximum) :
maximum{ maximum }
{
setup_buckets (maximum);
current = buckets_m.begin ();
}
nano::scheduler::buckets::~buckets ()
{
}
/**
* Push a block and its associated time into the prioritization container.
* The time is given here because sideband might not exist in the case of state blocks.
*/
bool nano::scheduler::buckets::push (uint64_t time, std::shared_ptr<nano::block> block, nano::amount const & priority)
{
auto was_empty = empty ();
auto & bucket = find_bucket (priority.number ());
bool added = bucket.push (time, block);
if (was_empty)
{
seek ();
}
return added;
}
/** Return the highest priority block of the current bucket */
std::shared_ptr<nano::block> nano::scheduler::buckets::top () const
{
debug_assert (!empty ());
auto result = (*current)->top ();
return result;
}
/** Pop the current block from the container and seek to the next block, if it exists */
void nano::scheduler::buckets::pop ()
{
debug_assert (!empty ());
auto & bucket = *current;
bucket->pop ();
seek ();
}
/** Returns the total number of blocks in buckets */
std::size_t nano::scheduler::buckets::size () const
{
std::size_t result{ 0 };
for (auto const & bucket : buckets_m)
{
result += bucket->size ();
}
return result;
}
/** Returns number of buckets, 62 by default */
std::size_t nano::scheduler::buckets::bucket_count () const
{
return buckets_m.size ();
}
/** Returns number of items in bucket with index 'index' */
std::size_t nano::scheduler::buckets::bucket_size (std::size_t index) const
{
return buckets_m[index]->size ();
}
/** Returns true if all buckets are empty */
bool nano::scheduler::buckets::empty () const
{
return std::all_of (buckets_m.begin (), buckets_m.end (), [] (auto const & bucket) { return bucket->empty (); });
}
/** Print the state of the class in stderr */
void nano::scheduler::buckets::dump () const
{
for (auto const & bucket : buckets_m)
{
bucket->dump ();
}
std::cerr << "current: " << current - buckets_m.begin () << '\n';
}
auto nano::scheduler::buckets::find_bucket (nano::uint128_t priority) -> bucket &
{
auto it = std::upper_bound (buckets_m.begin (), buckets_m.end (), priority, [] (nano::uint128_t const & priority, std::unique_ptr<bucket> const & bucket) {
return priority < bucket->minimum_balance;
});
release_assert (it != buckets_m.begin ()); // There should always be a bucket with a minimum_balance of 0
return **std::prev (it);
}
std::unique_ptr<nano::container_info_component> nano::scheduler::buckets::collect_container_info (std::string const & name)
{
auto composite = std::make_unique<container_info_composite> (name);
for (auto i = 0; i < buckets_m.size (); ++i)
{
auto const & bucket = buckets_m[i];
composite->add_component (std::make_unique<container_info_leaf> (container_info{ std::to_string (i), bucket->size (), 0 }));
}
return composite;
}

View file

@ -1,58 +0,0 @@
#pragma once
#include <nano/lib/numbers.hpp>
#include <nano/lib/utility.hpp>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
namespace nano
{
class block;
}
namespace nano::scheduler
{
class bucket;
/** A container for holding blocks and their arrival/creation time.
*
* The container consists of a number of buckets. Each bucket holds an ordered set of 'value_type' items.
* The buckets are accessed in a round robin fashion. The index 'current' holds the index of the bucket to access next.
* When a block is inserted, the bucket to go into is determined by the account balance and the priority inside that
* bucket is determined by its creation/arrival time.
*
* The arrival/creation time is only an approximation and it could even be wildly wrong,
* for example, in the event of bootstrapped blocks.
*/
class buckets final
{
/** container for the buckets to be read in round robin fashion */
std::vector<std::unique_ptr<bucket>> buckets_m;
/** index of bucket to read next */
decltype (buckets_m)::const_iterator current;
/** maximum number of blocks in whole container, each bucket's maximum is maximum / bucket_number */
uint64_t const maximum;
void next ();
void seek ();
void setup_buckets (uint64_t maximum);
public:
buckets (uint64_t maximum = 250000u);
~buckets ();
// Returns true if the block was inserted
bool push (uint64_t time, std::shared_ptr<nano::block> block, nano::amount const & priority);
std::shared_ptr<nano::block> top () const;
void pop ();
std::size_t size () const;
std::size_t bucket_count () const;
std::size_t bucket_size (std::size_t index) const;
bool empty () const;
void dump () const;
bucket & find_bucket (nano::uint128_t priority);
std::unique_ptr<nano::container_info_component> collect_container_info (std::string const &);
};
} // namespace nano::scheduler

View file

@ -2,7 +2,6 @@
#include <nano/node/active_elections.hpp>
#include <nano/node/election.hpp>
#include <nano/node/node.hpp>
#include <nano/node/scheduler/buckets.hpp>
#include <nano/node/scheduler/priority.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/secure/ledger_set_any.hpp>
@ -11,20 +10,49 @@
nano::scheduler::priority::priority (nano::node & node_a, nano::stats & stats_a) :
config{ node_a.config.priority_scheduler },
node{ node_a },
stats{ stats_a },
buckets{ std::make_unique<scheduler::buckets> () }
stats{ stats_a }
{
std::vector<nano::uint128_t> minimums;
auto build_region = [&minimums] (uint128_t const & begin, uint128_t const & end, size_t count) {
auto width = (end - begin) / count;
for (auto i = 0; i < count; ++i)
{
minimums.push_back (begin + i * width);
}
};
minimums.push_back (uint128_t{ 0 });
build_region (uint128_t{ 1 } << 88, uint128_t{ 1 } << 92, 2);
build_region (uint128_t{ 1 } << 92, uint128_t{ 1 } << 96, 4);
build_region (uint128_t{ 1 } << 96, uint128_t{ 1 } << 100, 8);
build_region (uint128_t{ 1 } << 100, uint128_t{ 1 } << 104, 16);
build_region (uint128_t{ 1 } << 104, uint128_t{ 1 } << 108, 16);
build_region (uint128_t{ 1 } << 108, uint128_t{ 1 } << 112, 8);
build_region (uint128_t{ 1 } << 112, uint128_t{ 1 } << 116, 4);
build_region (uint128_t{ 1 } << 116, uint128_t{ 1 } << 120, 2);
minimums.push_back (uint128_t{ 1 } << 120);
node.logger.info (nano::log::type::election_scheduler, "Number of buckets: {}", minimums.size ());
for (size_t i = 0u, n = minimums.size (); i < n; ++i)
{
auto bucket = std::make_unique<scheduler::bucket> (minimums[i], node.config.priority_bucket, node.active, stats);
buckets.emplace_back (std::move (bucket));
}
}
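// Worked out: 1 zero-minimum bucket + (2 + 4 + 8 + 16 + 16 + 8 + 4 + 2) region buckets
// + 1 bucket at 1 << 120 = 62 buckets in total, which is what the log line above reports.
// The removed nano::scheduler::buckets container kept 63; everything below 1 << 88 raw is
// now funnelled into the single zero-minimum bucket.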
nano::scheduler::priority::~priority ()
{
// Thread must be stopped before destruction
debug_assert (!thread.joinable ());
debug_assert (!cleanup_thread.joinable ());
}
void nano::scheduler::priority::start ()
{
debug_assert (!thread.joinable ());
debug_assert (!cleanup_thread.joinable ());
if (!config.enabled)
{
@ -35,6 +63,11 @@ void nano::scheduler::priority::start ()
nano::thread_role::set (nano::thread_role::name::scheduler_priority);
run ();
} };
cleanup_thread = std::thread{ [this] () {
nano::thread_role::set (nano::thread_role::name::scheduler_priority);
run_cleanup ();
} };
}
void nano::scheduler::priority::stop ()
@ -43,8 +76,9 @@ void nano::scheduler::priority::stop ()
nano::lock_guard<nano::mutex> lock{ mutex };
stopped = true;
}
notify ();
nano::join_or_pass (thread);
condition.notify_all ();
join_or_pass (thread);
join_or_pass (cleanup_thread);
}
bool nano::scheduler::priority::activate (secure::transaction const & transaction, nano::account const & account)
@ -80,8 +114,8 @@ bool nano::scheduler::priority::activate (secure::transaction const & transactio
bool added = false;
{
nano::lock_guard<nano::mutex> lock{ mutex };
added = buckets->push (account_info.modified, block, balance_priority);
auto & bucket = find_bucket (balance_priority);
added = bucket.push (account_info.modified, block);
}
if (added)
{
@ -113,24 +147,23 @@ void nano::scheduler::priority::notify ()
std::size_t nano::scheduler::priority::size () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
return buckets->size ();
}
bool nano::scheduler::priority::empty_locked () const
{
return buckets->empty ();
return std::accumulate (buckets.begin (), buckets.end (), std::size_t{ 0 }, [] (auto const & sum, auto const & bucket) {
return sum + bucket->size ();
});
}
bool nano::scheduler::priority::empty () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
return empty_locked ();
return std::all_of (buckets.begin (), buckets.end (), [] (auto const & bucket) {
return bucket->empty ();
});
}
bool nano::scheduler::priority::predicate () const
{
return node.active.vacancy (nano::election_behavior::priority) > 0 && !buckets->empty ();
return std::any_of (buckets.begin (), buckets.end (), [] (auto const & bucket) {
return bucket->available ();
});
}
void nano::scheduler::priority::run ()
@ -146,37 +179,79 @@ void nano::scheduler::priority::run ()
{
stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::loop);
if (predicate ())
lock.unlock ();
for (auto & bucket : buckets)
{
auto block = buckets->top ();
buckets->pop ();
lock.unlock ();
stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_priority);
auto result = node.active.insert (block);
if (result.inserted)
if (bucket->available ())
{
stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_priority_success);
}
if (result.election != nullptr)
{
result.election->transition_active ();
bucket->activate ();
}
}
else
{
lock.unlock ();
}
notify ();
lock.lock ();
}
}
}
std::unique_ptr<nano::container_info_component> nano::scheduler::priority::collect_container_info (std::string const & name)
void nano::scheduler::priority::run_cleanup ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
condition.wait_for (lock, 1s, [this] () {
return stopped;
});
if (!stopped)
{
stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::cleanup);
lock.unlock ();
for (auto & bucket : buckets)
{
bucket->update ();
}
lock.lock ();
}
}
}
auto nano::scheduler::priority::find_bucket (nano::uint128_t priority) -> bucket &
{
auto it = std::upper_bound (buckets.begin (), buckets.end (), priority, [] (nano::uint128_t const & priority, std::unique_ptr<bucket> const & bucket) {
return priority < bucket->minimum_balance;
});
release_assert (it != buckets.begin ()); // There should always be a bucket with a minimum_balance of 0
it = std::prev (it);
return **it;
}
std::unique_ptr<nano::container_info_component> nano::scheduler::priority::collect_container_info (std::string const & name) const
{
auto collect_blocks = [&] () {
auto composite = std::make_unique<container_info_composite> ("blocks");
for (auto i = 0; i < buckets.size (); ++i)
{
auto const & bucket = buckets[i];
composite->add_component (std::make_unique<container_info_leaf> (container_info{ std::to_string (i), bucket->size (), 0 }));
}
return composite;
};
auto collect_elections = [&] () {
auto composite = std::make_unique<container_info_composite> ("elections");
for (auto i = 0; i < buckets.size (); ++i)
{
auto const & bucket = buckets[i];
composite->add_component (std::make_unique<container_info_leaf> (container_info{ std::to_string (i), bucket->election_count (), 0 }));
}
return composite;
};
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (buckets->collect_container_info ("buckets"));
composite->add_component (collect_blocks ());
composite->add_component (collect_elections ());
return composite;
}

View file

@ -1,8 +1,7 @@
#pragma once
#include <nano/lib/numbers.hpp>
#include <boost/optional.hpp>
#include <nano/node/scheduler/bucket.hpp>
#include <condition_variable>
#include <deque>
@ -56,7 +55,7 @@ public:
std::size_t size () const;
bool empty () const;
std::unique_ptr<container_info_component> collect_container_info (std::string const & name);
std::unique_ptr<container_info_component> collect_container_info (std::string const & name) const;
private: // Dependencies
priority_config const & config;
@ -65,14 +64,17 @@ private: // Dependencies
private:
void run ();
bool empty_locked () const;
void run_cleanup ();
bool predicate () const;
bucket & find_bucket (nano::uint128_t priority);
std::unique_ptr<nano::scheduler::buckets> buckets;
private:
std::vector<std::unique_ptr<bucket>> buckets;
bool stopped{ false };
nano::condition_variable condition;
mutable nano::mutex mutex;
std::thread thread;
std::thread cleanup_thread;
};
}