Splitting manual scheduler in to its own file.

This changes the order in which elections are scheduled. Both manual and prioritized elections are scheduled in parallel. Previously, manually scheduled elections would always be started before general prioritized elections.
This commit is contained in:
Colin LeMahieu 2023-09-12 18:35:12 +01:00
commit b10ab55e69
No known key found for this signature in database
GPG key ID: 43708520C8DFB938
13 changed files with 177 additions and 50 deletions

View file

@ -1,6 +1,7 @@
#include <nano/lib/jsonconfig.hpp>
#include <nano/node/election.hpp>
#include <nano/node/scheduler/component.hpp>
#include <nano/node/scheduler/manual.hpp>
#include <nano/node/scheduler/priority.hpp>
#include <nano/node/transport/inproc.hpp>
#include <nano/test_common/chains.hpp>
@ -1393,11 +1394,11 @@ TEST (active_transactions, fifo)
ASSERT_EQ (nano::process_result::progress, node.process (*receive2).code);
// Ensure first transaction becomes active
node.scheduler.priority.manual (receive1);
node.scheduler.manual.push (receive1);
ASSERT_TIMELY (5s, node.active.election (receive1->qualified_root ()) != nullptr);
// Ensure second transaction becomes active
node.scheduler.priority.manual (receive2);
node.scheduler.manual.push (receive2);
ASSERT_TIMELY (5s, node.active.election (receive2->qualified_root ()) != nullptr);
// Ensure excess transactions get trimmed

View file

@ -1,6 +1,7 @@
#include <nano/lib/config.hpp>
#include <nano/node/election.hpp>
#include <nano/node/scheduler/component.hpp>
#include <nano/node/scheduler/manual.hpp>
#include <nano/node/scheduler/priority.hpp>
#include <nano/node/transport/fake.hpp>
#include <nano/node/transport/inproc.hpp>
@ -987,7 +988,7 @@ TEST (node, fork_open_flip)
// give block open1 to node1, manually trigger an election for open1 and ensure it is in the ledger
node1.process_active (open1);
ASSERT_TIMELY (5s, node1.block (open1->hash ()) != nullptr);
node1.scheduler.priority.manual (open1);
node1.scheduler.manual.push (open1);
ASSERT_TIMELY (5s, (election = node1.active.election (open1->qualified_root ())) != nullptr);
election->transition_active ();
@ -1000,7 +1001,7 @@ TEST (node, fork_open_flip)
// ensure open2 is in node2 ledger (and therefore has sideband) and manually trigger an election for open2
ASSERT_TIMELY (5s, node2.block (open2->hash ()) != nullptr);
node2.scheduler.priority.manual (open2);
node2.scheduler.manual.push (open2);
ASSERT_TIMELY (5s, (election = node2.active.election (open2->qualified_root ())) != nullptr);
election->transition_active ();

View file

@ -202,6 +202,8 @@ add_library(
scheduler/component.cpp
scheduler/hinted.hpp
scheduler/hinted.cpp
scheduler/manual.hpp
scheduler/manual.cpp
scheduler/optimistic.hpp
scheduler/optimistic.cpp
scheduler/priority.hpp

View file

@ -7,6 +7,7 @@
#include <nano/node/rocksdb/rocksdb.hpp>
#include <nano/node/scheduler/component.hpp>
#include <nano/node/scheduler/hinted.hpp>
#include <nano/node/scheduler/manual.hpp>
#include <nano/node/scheduler/optimistic.hpp>
#include <nano/node/scheduler/priority.hpp>
#include <nano/node/telemetry.hpp>
@ -1261,7 +1262,7 @@ void nano::node::add_initial_peers ()
void nano::node::start_election (std::shared_ptr<nano::block> const & block)
{
scheduler.priority.manual (block);
scheduler.manual.push (block);
}
bool nano::node::block_confirmed (nano::block_hash const & hash_a)

View file

@ -1,14 +1,17 @@
#include <nano/node/node.hpp>
#include <nano/node/scheduler/component.hpp>
#include <nano/node/scheduler/hinted.hpp>
#include <nano/node/scheduler/manual.hpp>
#include <nano/node/scheduler/optimistic.hpp>
#include <nano/node/scheduler/priority.hpp>
nano::scheduler::component::component (nano::node & node) :
hinted_impl{ std::make_unique<nano::scheduler::hinted> (nano::scheduler::hinted::config{ node.config }, node, node.inactive_vote_cache, node.active, node.online_reps, node.stats) },
manual_impl{ std::make_unique<nano::scheduler::manual> (node) },
optimistic_impl{ std::make_unique<nano::scheduler::optimistic> (node.config.optimistic_scheduler, node, node.ledger, node.active, node.network_params.network, node.stats) },
priority_impl{ std::make_unique<nano::scheduler::priority> (node, node.stats) },
hinted{ *hinted_impl },
manual{ *manual_impl },
optimistic{ *optimistic_impl },
priority{ *priority_impl }
{
@ -21,6 +24,7 @@ nano::scheduler::component::~component ()
void nano::scheduler::component::start ()
{
hinted.start ();
manual.start ();
optimistic.start ();
priority.start ();
}
@ -28,6 +32,7 @@ void nano::scheduler::component::start ()
void nano::scheduler::component::stop ()
{
hinted.stop ();
manual.stop ();
optimistic.stop ();
priority.stop ();
}
@ -38,6 +43,7 @@ std::unique_ptr<nano::container_info_component> nano::scheduler::component::coll
auto composite = std::make_unique<container_info_composite> (name);
//composite->add_component (hinted.collect_container_info ("hinted"));
composite->add_component (manual.collect_container_info ("manual"));
//composite->add_component (optimistic.collect_container_info ("optimistic"));
composite->add_component (priority.collect_container_info ("priority"));
return composite;

View file

@ -13,12 +13,14 @@ class node;
namespace nano::scheduler
{
class hinted;
class manual;
class optimistic;
class priority;
class component
class component final
{
std::unique_ptr<nano::scheduler::hinted> hinted_impl;
std::unique_ptr<nano::scheduler::manual> manual_impl;
std::unique_ptr<nano::scheduler::optimistic> optimistic_impl;
std::unique_ptr<nano::scheduler::priority> priority_impl;
nano::mutex mutex;
@ -35,6 +37,7 @@ public:
std::unique_ptr<container_info_component> collect_container_info (std::string const & name);
nano::scheduler::hinted & hinted;
nano::scheduler::manual & manual;
nano::scheduler::optimistic & optimistic;
nano::scheduler::priority & priority;
};

View file

@ -0,0 +1,94 @@
#include <nano/node/node.hpp>
#include <nano/node/scheduler/manual.hpp>
nano::scheduler::manual::manual (nano::node & node) :
node{ node }
{
}
nano::scheduler::manual::~manual ()
{
// Thread must be stopped before destruction
debug_assert (!thread.joinable ());
}
void nano::scheduler::manual::start ()
{
debug_assert (!thread.joinable ());
thread = std::thread{ [this] () {
nano::thread_role::set (nano::thread_role::name::election_scheduler);
run ();
} };
}
void nano::scheduler::manual::stop ()
{
{
nano::lock_guard<nano::mutex> lock{ mutex };
stopped = true;
}
notify ();
nano::join_or_pass (thread);
}
void nano::scheduler::manual::notify ()
{
condition.notify_all ();
}
void nano::scheduler::manual::push (std::shared_ptr<nano::block> const & block_a, boost::optional<nano::uint128_t> const & previous_balance_a, nano::election_behavior election_behavior_a)
{
nano::lock_guard<nano::mutex> lock{ mutex };
queue.push_back (std::make_tuple (block_a, previous_balance_a, election_behavior_a));
notify ();
}
bool nano::scheduler::manual::predicate () const
{
return !queue.empty ();
}
void nano::scheduler::manual::run ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
condition.wait (lock, [this] () {
return stopped || predicate ();
});
debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds
if (!stopped)
{
node.stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::loop);
if (predicate ())
{
auto const [block, previous_balance, election_behavior] = queue.front ();
queue.pop_front ();
lock.unlock ();
node.stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_manual);
auto result = node.active.insert (block, election_behavior);
if (result.election != nullptr)
{
result.election->transition_active ();
}
}
else
{
lock.unlock ();
}
notify ();
lock.lock ();
}
}
}
std::unique_ptr<nano::container_info_component> nano::scheduler::manual::collect_container_info (std::string const & name)
{
nano::unique_lock<nano::mutex> lock{ mutex };
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "queue", queue.size (), sizeof (decltype (queue)::value_type) }));
return composite;
}

View file

@ -0,0 +1,46 @@
#pragma once
#include <nano/lib/locks.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/node/active_transactions.hpp>
#include <boost/optional.hpp>
#include <deque>
#include <memory>
#include <mutex>
namespace nano
{
class block;
class node;
}
namespace nano::scheduler
{
class buckets;
class manual final
{
std::deque<std::tuple<std::shared_ptr<nano::block>, boost::optional<nano::uint128_t>, nano::election_behavior>> queue;
nano::node & node;
nano::mutex mutex;
nano::condition_variable condition;
bool stopped{ false };
std::thread thread;
void notify ();
bool predicate () const;
void run ();
public:
manual (nano::node & node);
~manual ();
void start ();
void stop ();
// Manualy start an election for a block
// Call action with confirmed block, may be different than what we started with
void push (std::shared_ptr<nano::block> const &, boost::optional<nano::uint128_t> const & = boost::none, nano::election_behavior = nano::election_behavior::normal);
std::unique_ptr<container_info_component> collect_container_info (std::string const & name);
}; // class manual
} // nano::scheduler

View file

@ -35,13 +35,6 @@ void nano::scheduler::priority::stop ()
nano::join_or_pass (thread);
}
void nano::scheduler::priority::manual (std::shared_ptr<nano::block> const & block_a, boost::optional<nano::uint128_t> const & previous_balance_a, nano::election_behavior election_behavior_a)
{
nano::lock_guard<nano::mutex> lock{ mutex };
manual_queue.push_back (std::make_tuple (block_a, previous_balance_a, election_behavior_a));
notify ();
}
bool nano::scheduler::priority::activate (nano::account const & account_a, nano::transaction const & transaction)
{
debug_assert (!account_a.is_zero ());
@ -79,12 +72,12 @@ void nano::scheduler::priority::notify ()
std::size_t nano::scheduler::priority::size () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
return buckets->size () + manual_queue.size ();
return buckets->size ();
}
bool nano::scheduler::priority::empty_locked () const
{
return buckets->empty () && manual_queue.empty ();
return buckets->empty ();
}
bool nano::scheduler::priority::empty () const
@ -93,42 +86,25 @@ bool nano::scheduler::priority::empty () const
return empty_locked ();
}
bool nano::scheduler::priority::priority_queue_predicate () const
bool nano::scheduler::priority::predicate () const
{
return node.active.vacancy () > 0 && !buckets->empty ();
}
bool nano::scheduler::priority::manual_queue_predicate () const
{
return !manual_queue.empty ();
}
void nano::scheduler::priority::run ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
condition.wait (lock, [this] () {
return stopped || priority_queue_predicate () || manual_queue_predicate ();
return stopped || predicate ();
});
debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds
if (!stopped)
{
stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::loop);
if (manual_queue_predicate ())
{
auto const [block, previous_balance, election_behavior] = manual_queue.front ();
manual_queue.pop_front ();
lock.unlock ();
stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_manual);
auto result = node.active.insert (block, election_behavior);
if (result.election != nullptr)
{
result.election->transition_active ();
}
}
else if (priority_queue_predicate ())
if (predicate ())
{
auto block = buckets->top ();
buckets->pop ();
@ -159,7 +135,6 @@ std::unique_ptr<nano::container_info_component> nano::scheduler::priority::colle
nano::unique_lock<nano::mutex> lock{ mutex };
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "manual_queue", manual_queue.size (), sizeof (decltype (manual_queue)::value_type) }));
composite->add_component (buckets->collect_container_info ("buckets"));
return composite;
}

View file

@ -32,9 +32,6 @@ public:
priority (nano::node &, nano::stats &);
~priority ();
// Manualy start an election for a block
// Call action with confirmed block, may be different than what we started with
void manual (std::shared_ptr<nano::block> const &, boost::optional<nano::uint128_t> const & = boost::none, nano::election_behavior = nano::election_behavior::normal);
/**
* Activates the first unconfirmed block of \p account_a
* @return true if account was activated
@ -51,12 +48,10 @@ private: // Dependencies
private:
void run ();
bool empty_locked () const;
bool priority_queue_predicate () const;
bool manual_queue_predicate () const;
bool predicate () const;
std::unique_ptr<nano::scheduler::buckets> buckets;
std::deque<std::tuple<std::shared_ptr<nano::block>, boost::optional<nano::uint128_t>, nano::election_behavior>> manual_queue;
bool stopped{ false };
nano::condition_variable condition;
mutable nano::mutex mutex;

View file

@ -6,6 +6,7 @@
#include <nano/node/json_handler.hpp>
#include <nano/node/node_rpc_config.hpp>
#include <nano/node/scheduler/component.hpp>
#include <nano/node/scheduler/manual.hpp>
#include <nano/node/scheduler/priority.hpp>
#include <nano/rpc/rpc.hpp>
#include <nano/rpc/rpc_request_processor.hpp>
@ -1556,7 +1557,7 @@ TEST (rpc, process_subtype_open)
ASSERT_EQ (nano::process_result::progress, node1->process (*send).code);
ASSERT_EQ (nano::process_result::progress, node2.process (*send).code);
auto const rpc_ctx = add_rpc (system, node1);
node1->scheduler.priority.manual (send);
node1->scheduler.manual.push (send);
auto open = builder
.state ()
.account (key.pub)
@ -1605,7 +1606,7 @@ TEST (rpc, process_subtype_receive)
ASSERT_EQ (nano::process_result::progress, node1->process (*send).code);
ASSERT_EQ (nano::process_result::progress, node2.process (*send).code);
auto const rpc_ctx = add_rpc (system, node1);
node1->scheduler.priority.manual (send);
node1->scheduler.manual.push (send);
auto receive = builder
.state ()
.account (nano::dev::genesis_key.pub)

View file

@ -2,6 +2,7 @@
#include <nano/lib/threading.hpp>
#include <nano/node/election.hpp>
#include <nano/node/scheduler/component.hpp>
#include <nano/node/scheduler/manual.hpp>
#include <nano/node/scheduler/priority.hpp>
#include <nano/node/transport/inproc.hpp>
#include <nano/node/unchecked_map.hpp>
@ -677,7 +678,7 @@ TEST (confirmation_height, many_accounts_single_confirmation)
{
auto block = node->block (last_open_hash);
ASSERT_NE (nullptr, block);
node->scheduler.priority.manual (block);
node->scheduler.manual.push (block);
std::shared_ptr<nano::election> election;
ASSERT_TIMELY (10s, (election = node->active.election (block->qualified_root ())) != nullptr);
election->force_confirm ();
@ -760,7 +761,7 @@ TEST (confirmation_height, many_accounts_many_confirmations)
// Confirm all of the accounts
for (auto & open_block : open_blocks)
{
node->scheduler.priority.manual (open_block);
node->scheduler.manual.push (open_block);
std::shared_ptr<nano::election> election;
ASSERT_TIMELY (10s, (election = node->active.election (open_block->qualified_root ())) != nullptr);
election->force_confirm ();
@ -900,7 +901,7 @@ TEST (confirmation_height, long_chains)
// Call block confirm on the existing receive block on the genesis account which will confirm everything underneath on both accounts
{
node->scheduler.priority.manual (receive1);
node->scheduler.manual.push (receive1);
std::shared_ptr<nano::election> election;
ASSERT_TIMELY (10s, (election = node->active.election (receive1->qualified_root ())) != nullptr);
election->force_confirm ();
@ -2225,7 +2226,7 @@ TEST (node, wallet_create_block_confirm_conflicts)
// Call block confirm on the top level send block which will confirm everything underneath on both accounts.
{
auto block = node->store.block.get (node->store.tx_begin_read (), latest);
node->scheduler.priority.manual (block);
node->scheduler.manual.push (block);
std::shared_ptr<nano::election> election;
ASSERT_TIMELY (10s, (election = node->active.election (block->qualified_root ())) != nullptr);
election->force_confirm ();

View file

@ -1,5 +1,6 @@
#include <nano/crypto_lib/random_pool.hpp>
#include <nano/node/scheduler/component.hpp>
#include <nano/node/scheduler/manual.hpp>
#include <nano/node/scheduler/priority.hpp>
#include <nano/node/transport/fake.hpp>
#include <nano/test_common/system.hpp>
@ -124,7 +125,7 @@ bool nano::test::activate (nano::node & node, std::vector<nano::block_hash> hash
// Block does not exist in the ledger yet
return false;
}
node.scheduler.priority.manual (disk_block);
node.scheduler.manual.push (disk_block);
}
return true;
}
@ -205,7 +206,7 @@ std::shared_ptr<nano::election> nano::test::start_election (nano::test::system &
block_l = node_a.block (hash_a);
}
node_a.scheduler.priority.manual (block_l);
node_a.scheduler.manual.push (block_l);
// wait for the election to appear
std::shared_ptr<nano::election> election = node_a.active.election (block_l->qualified_root ());