diff --git a/nano/core_test/active_transactions.cpp b/nano/core_test/active_transactions.cpp index 9842f8bf..066fa04e 100644 --- a/nano/core_test/active_transactions.cpp +++ b/nano/core_test/active_transactions.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -1393,11 +1394,11 @@ TEST (active_transactions, fifo) ASSERT_EQ (nano::process_result::progress, node.process (*receive2).code); // Ensure first transaction becomes active - node.scheduler.priority.manual (receive1); + node.scheduler.manual.push (receive1); ASSERT_TIMELY (5s, node.active.election (receive1->qualified_root ()) != nullptr); // Ensure second transaction becomes active - node.scheduler.priority.manual (receive2); + node.scheduler.manual.push (receive2); ASSERT_TIMELY (5s, node.active.election (receive2->qualified_root ()) != nullptr); // Ensure excess transactions get trimmed diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 7b8468bf..ab4408a5 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -987,7 +988,7 @@ TEST (node, fork_open_flip) // give block open1 to node1, manually trigger an election for open1 and ensure it is in the ledger node1.process_active (open1); ASSERT_TIMELY (5s, node1.block (open1->hash ()) != nullptr); - node1.scheduler.priority.manual (open1); + node1.scheduler.manual.push (open1); ASSERT_TIMELY (5s, (election = node1.active.election (open1->qualified_root ())) != nullptr); election->transition_active (); @@ -1000,7 +1001,7 @@ TEST (node, fork_open_flip) // ensure open2 is in node2 ledger (and therefore has sideband) and manually trigger an election for open2 ASSERT_TIMELY (5s, node2.block (open2->hash ()) != nullptr); - node2.scheduler.priority.manual (open2); + node2.scheduler.manual.push (open2); ASSERT_TIMELY (5s, (election = node2.active.election (open2->qualified_root ())) != nullptr); election->transition_active (); diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index ccb4d4fe..4e1f29cc 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -202,6 +202,8 @@ add_library( scheduler/component.cpp scheduler/hinted.hpp scheduler/hinted.cpp + scheduler/manual.hpp + scheduler/manual.cpp scheduler/optimistic.hpp scheduler/optimistic.cpp scheduler/priority.hpp diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 95e99719..59180367 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -1261,7 +1262,7 @@ void nano::node::add_initial_peers () void nano::node::start_election (std::shared_ptr const & block) { - scheduler.priority.manual (block); + scheduler.manual.push (block); } bool nano::node::block_confirmed (nano::block_hash const & hash_a) diff --git a/nano/node/scheduler/component.cpp b/nano/node/scheduler/component.cpp index 33fa7dc1..f1462f38 100644 --- a/nano/node/scheduler/component.cpp +++ b/nano/node/scheduler/component.cpp @@ -1,14 +1,17 @@ #include #include #include +#include #include #include nano::scheduler::component::component (nano::node & node) : hinted_impl{ std::make_unique (nano::scheduler::hinted::config{ node.config }, node, node.inactive_vote_cache, node.active, node.online_reps, node.stats) }, + manual_impl{ std::make_unique (node) }, optimistic_impl{ std::make_unique (node.config.optimistic_scheduler, node, node.ledger, node.active, node.network_params.network, node.stats) }, priority_impl{ std::make_unique (node, node.stats) }, hinted{ *hinted_impl }, + manual{ *manual_impl }, optimistic{ *optimistic_impl }, priority{ *priority_impl } { @@ -21,6 +24,7 @@ nano::scheduler::component::~component () void nano::scheduler::component::start () { hinted.start (); + manual.start (); optimistic.start (); priority.start (); } @@ -28,6 +32,7 @@ void nano::scheduler::component::start () void nano::scheduler::component::stop () { hinted.stop (); + manual.stop (); optimistic.stop (); priority.stop (); } @@ -38,6 +43,7 @@ std::unique_ptr nano::scheduler::component::coll auto composite = std::make_unique (name); //composite->add_component (hinted.collect_container_info ("hinted")); + composite->add_component (manual.collect_container_info ("manual")); //composite->add_component (optimistic.collect_container_info ("optimistic")); composite->add_component (priority.collect_container_info ("priority")); return composite; diff --git a/nano/node/scheduler/component.hpp b/nano/node/scheduler/component.hpp index 7768fba3..25790243 100644 --- a/nano/node/scheduler/component.hpp +++ b/nano/node/scheduler/component.hpp @@ -13,12 +13,14 @@ class node; namespace nano::scheduler { class hinted; +class manual; class optimistic; class priority; -class component +class component final { std::unique_ptr hinted_impl; + std::unique_ptr manual_impl; std::unique_ptr optimistic_impl; std::unique_ptr priority_impl; nano::mutex mutex; @@ -35,6 +37,7 @@ public: std::unique_ptr collect_container_info (std::string const & name); nano::scheduler::hinted & hinted; + nano::scheduler::manual & manual; nano::scheduler::optimistic & optimistic; nano::scheduler::priority & priority; }; diff --git a/nano/node/scheduler/manual.cpp b/nano/node/scheduler/manual.cpp new file mode 100644 index 00000000..82ce43de --- /dev/null +++ b/nano/node/scheduler/manual.cpp @@ -0,0 +1,94 @@ +#include +#include + +nano::scheduler::manual::manual (nano::node & node) : + node{ node } +{ +} + +nano::scheduler::manual::~manual () +{ + // Thread must be stopped before destruction + debug_assert (!thread.joinable ()); +} + +void nano::scheduler::manual::start () +{ + debug_assert (!thread.joinable ()); + + thread = std::thread{ [this] () { + nano::thread_role::set (nano::thread_role::name::election_scheduler); + run (); + } }; +} + +void nano::scheduler::manual::stop () +{ + { + nano::lock_guard lock{ mutex }; + stopped = true; + } + notify (); + nano::join_or_pass (thread); +} + +void nano::scheduler::manual::notify () +{ + condition.notify_all (); +} + +void nano::scheduler::manual::push (std::shared_ptr const & block_a, boost::optional const & previous_balance_a, nano::election_behavior election_behavior_a) +{ + nano::lock_guard lock{ mutex }; + queue.push_back (std::make_tuple (block_a, previous_balance_a, election_behavior_a)); + notify (); +} + +bool nano::scheduler::manual::predicate () const +{ + return !queue.empty (); +} + +void nano::scheduler::manual::run () +{ + nano::unique_lock lock{ mutex }; + while (!stopped) + { + condition.wait (lock, [this] () { + return stopped || predicate (); + }); + debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds + if (!stopped) + { + node.stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::loop); + + if (predicate ()) + { + auto const [block, previous_balance, election_behavior] = queue.front (); + queue.pop_front (); + lock.unlock (); + node.stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_manual); + auto result = node.active.insert (block, election_behavior); + if (result.election != nullptr) + { + result.election->transition_active (); + } + } + else + { + lock.unlock (); + } + notify (); + lock.lock (); + } + } +} + +std::unique_ptr nano::scheduler::manual::collect_container_info (std::string const & name) +{ + nano::unique_lock lock{ mutex }; + + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "queue", queue.size (), sizeof (decltype (queue)::value_type) })); + return composite; +} diff --git a/nano/node/scheduler/manual.hpp b/nano/node/scheduler/manual.hpp new file mode 100644 index 00000000..3edfa1dc --- /dev/null +++ b/nano/node/scheduler/manual.hpp @@ -0,0 +1,46 @@ +#pragma once +#include +#include +#include + +#include + +#include +#include +#include + +namespace nano +{ +class block; +class node; +} + +namespace nano::scheduler +{ +class buckets; +class manual final +{ + std::deque, boost::optional, nano::election_behavior>> queue; + nano::node & node; + nano::mutex mutex; + nano::condition_variable condition; + bool stopped{ false }; + std::thread thread; + void notify (); + bool predicate () const; + void run (); + +public: + manual (nano::node & node); + ~manual (); + + void start (); + void stop (); + + // Manualy start an election for a block + // Call action with confirmed block, may be different than what we started with + void push (std::shared_ptr const &, boost::optional const & = boost::none, nano::election_behavior = nano::election_behavior::normal); + + std::unique_ptr collect_container_info (std::string const & name); +}; // class manual +} // nano::scheduler diff --git a/nano/node/scheduler/priority.cpp b/nano/node/scheduler/priority.cpp index c8e9e0f2..323a18c5 100644 --- a/nano/node/scheduler/priority.cpp +++ b/nano/node/scheduler/priority.cpp @@ -35,13 +35,6 @@ void nano::scheduler::priority::stop () nano::join_or_pass (thread); } -void nano::scheduler::priority::manual (std::shared_ptr const & block_a, boost::optional const & previous_balance_a, nano::election_behavior election_behavior_a) -{ - nano::lock_guard lock{ mutex }; - manual_queue.push_back (std::make_tuple (block_a, previous_balance_a, election_behavior_a)); - notify (); -} - bool nano::scheduler::priority::activate (nano::account const & account_a, nano::transaction const & transaction) { debug_assert (!account_a.is_zero ()); @@ -79,12 +72,12 @@ void nano::scheduler::priority::notify () std::size_t nano::scheduler::priority::size () const { nano::lock_guard lock{ mutex }; - return buckets->size () + manual_queue.size (); + return buckets->size (); } bool nano::scheduler::priority::empty_locked () const { - return buckets->empty () && manual_queue.empty (); + return buckets->empty (); } bool nano::scheduler::priority::empty () const @@ -93,42 +86,25 @@ bool nano::scheduler::priority::empty () const return empty_locked (); } -bool nano::scheduler::priority::priority_queue_predicate () const +bool nano::scheduler::priority::predicate () const { return node.active.vacancy () > 0 && !buckets->empty (); } -bool nano::scheduler::priority::manual_queue_predicate () const -{ - return !manual_queue.empty (); -} - void nano::scheduler::priority::run () { nano::unique_lock lock{ mutex }; while (!stopped) { condition.wait (lock, [this] () { - return stopped || priority_queue_predicate () || manual_queue_predicate (); + return stopped || predicate (); }); debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds if (!stopped) { stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::loop); - if (manual_queue_predicate ()) - { - auto const [block, previous_balance, election_behavior] = manual_queue.front (); - manual_queue.pop_front (); - lock.unlock (); - stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_manual); - auto result = node.active.insert (block, election_behavior); - if (result.election != nullptr) - { - result.election->transition_active (); - } - } - else if (priority_queue_predicate ()) + if (predicate ()) { auto block = buckets->top (); buckets->pop (); @@ -159,7 +135,6 @@ std::unique_ptr nano::scheduler::priority::colle nano::unique_lock lock{ mutex }; auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "manual_queue", manual_queue.size (), sizeof (decltype (manual_queue)::value_type) })); composite->add_component (buckets->collect_container_info ("buckets")); return composite; } diff --git a/nano/node/scheduler/priority.hpp b/nano/node/scheduler/priority.hpp index b798f7a4..13b32b95 100644 --- a/nano/node/scheduler/priority.hpp +++ b/nano/node/scheduler/priority.hpp @@ -32,9 +32,6 @@ public: priority (nano::node &, nano::stats &); ~priority (); - // Manualy start an election for a block - // Call action with confirmed block, may be different than what we started with - void manual (std::shared_ptr const &, boost::optional const & = boost::none, nano::election_behavior = nano::election_behavior::normal); /** * Activates the first unconfirmed block of \p account_a * @return true if account was activated @@ -51,12 +48,10 @@ private: // Dependencies private: void run (); bool empty_locked () const; - bool priority_queue_predicate () const; - bool manual_queue_predicate () const; + bool predicate () const; std::unique_ptr buckets; - std::deque, boost::optional, nano::election_behavior>> manual_queue; bool stopped{ false }; nano::condition_variable condition; mutable nano::mutex mutex; diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 8468ffb7..a48fe545 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -1556,7 +1557,7 @@ TEST (rpc, process_subtype_open) ASSERT_EQ (nano::process_result::progress, node1->process (*send).code); ASSERT_EQ (nano::process_result::progress, node2.process (*send).code); auto const rpc_ctx = add_rpc (system, node1); - node1->scheduler.priority.manual (send); + node1->scheduler.manual.push (send); auto open = builder .state () .account (key.pub) @@ -1605,7 +1606,7 @@ TEST (rpc, process_subtype_receive) ASSERT_EQ (nano::process_result::progress, node1->process (*send).code); ASSERT_EQ (nano::process_result::progress, node2.process (*send).code); auto const rpc_ctx = add_rpc (system, node1); - node1->scheduler.priority.manual (send); + node1->scheduler.manual.push (send); auto receive = builder .state () .account (nano::dev::genesis_key.pub) diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index c9dd6c09..d63b2ac1 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -677,7 +678,7 @@ TEST (confirmation_height, many_accounts_single_confirmation) { auto block = node->block (last_open_hash); ASSERT_NE (nullptr, block); - node->scheduler.priority.manual (block); + node->scheduler.manual.push (block); std::shared_ptr election; ASSERT_TIMELY (10s, (election = node->active.election (block->qualified_root ())) != nullptr); election->force_confirm (); @@ -760,7 +761,7 @@ TEST (confirmation_height, many_accounts_many_confirmations) // Confirm all of the accounts for (auto & open_block : open_blocks) { - node->scheduler.priority.manual (open_block); + node->scheduler.manual.push (open_block); std::shared_ptr election; ASSERT_TIMELY (10s, (election = node->active.election (open_block->qualified_root ())) != nullptr); election->force_confirm (); @@ -900,7 +901,7 @@ TEST (confirmation_height, long_chains) // Call block confirm on the existing receive block on the genesis account which will confirm everything underneath on both accounts { - node->scheduler.priority.manual (receive1); + node->scheduler.manual.push (receive1); std::shared_ptr election; ASSERT_TIMELY (10s, (election = node->active.election (receive1->qualified_root ())) != nullptr); election->force_confirm (); @@ -2225,7 +2226,7 @@ TEST (node, wallet_create_block_confirm_conflicts) // Call block confirm on the top level send block which will confirm everything underneath on both accounts. { auto block = node->store.block.get (node->store.tx_begin_read (), latest); - node->scheduler.priority.manual (block); + node->scheduler.manual.push (block); std::shared_ptr election; ASSERT_TIMELY (10s, (election = node->active.election (block->qualified_root ())) != nullptr); election->force_confirm (); diff --git a/nano/test_common/testutil.cpp b/nano/test_common/testutil.cpp index f3aba3d3..05a6db68 100644 --- a/nano/test_common/testutil.cpp +++ b/nano/test_common/testutil.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -124,7 +125,7 @@ bool nano::test::activate (nano::node & node, std::vector hash // Block does not exist in the ledger yet return false; } - node.scheduler.priority.manual (disk_block); + node.scheduler.manual.push (disk_block); } return true; } @@ -205,7 +206,7 @@ std::shared_ptr nano::test::start_election (nano::test::system & block_l = node_a.block (hash_a); } - node_a.scheduler.priority.manual (block_l); + node_a.scheduler.manual.push (block_l); // wait for the election to appear std::shared_ptr election = node_a.active.election (block_l->qualified_root ());