Merge pull request #4281 from clemahieu/manual_scheduler
Separate manual scheduler in to its own class
This commit is contained in:
commit
9bbbc089f5
19 changed files with 262 additions and 95 deletions
|
@ -1,6 +1,7 @@
|
|||
#include <nano/lib/jsonconfig.hpp>
|
||||
#include <nano/node/election.hpp>
|
||||
#include <nano/node/scheduler/component.hpp>
|
||||
#include <nano/node/scheduler/manual.hpp>
|
||||
#include <nano/node/scheduler/priority.hpp>
|
||||
#include <nano/node/transport/inproc.hpp>
|
||||
#include <nano/test_common/chains.hpp>
|
||||
|
@ -1393,11 +1394,11 @@ TEST (active_transactions, fifo)
|
|||
ASSERT_EQ (nano::process_result::progress, node.process (*receive2).code);
|
||||
|
||||
// Ensure first transaction becomes active
|
||||
node.scheduler.priority.manual (receive1);
|
||||
node.scheduler.manual.push (receive1);
|
||||
ASSERT_TIMELY (5s, node.active.election (receive1->qualified_root ()) != nullptr);
|
||||
|
||||
// Ensure second transaction becomes active
|
||||
node.scheduler.priority.manual (receive2);
|
||||
node.scheduler.manual.push (receive2);
|
||||
ASSERT_TIMELY (5s, node.active.election (receive2->qualified_root ()) != nullptr);
|
||||
|
||||
// Ensure excess transactions get trimmed
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
#include <nano/lib/config.hpp>
|
||||
#include <nano/node/election.hpp>
|
||||
#include <nano/node/scheduler/component.hpp>
|
||||
#include <nano/node/scheduler/manual.hpp>
|
||||
#include <nano/node/scheduler/priority.hpp>
|
||||
#include <nano/node/transport/fake.hpp>
|
||||
#include <nano/node/transport/inproc.hpp>
|
||||
|
@ -987,7 +988,7 @@ TEST (node, fork_open_flip)
|
|||
// give block open1 to node1, manually trigger an election for open1 and ensure it is in the ledger
|
||||
node1.process_active (open1);
|
||||
ASSERT_TIMELY (5s, node1.block (open1->hash ()) != nullptr);
|
||||
node1.scheduler.priority.manual (open1);
|
||||
node1.scheduler.manual.push (open1);
|
||||
ASSERT_TIMELY (5s, (election = node1.active.election (open1->qualified_root ())) != nullptr);
|
||||
election->transition_active ();
|
||||
|
||||
|
@ -1000,7 +1001,7 @@ TEST (node, fork_open_flip)
|
|||
|
||||
// ensure open2 is in node2 ledger (and therefore has sideband) and manually trigger an election for open2
|
||||
ASSERT_TIMELY (5s, node2.block (open2->hash ()) != nullptr);
|
||||
node2.scheduler.priority.manual (open2);
|
||||
node2.scheduler.manual.push (open2);
|
||||
ASSERT_TIMELY (5s, (election = node2.active.election (open2->qualified_root ())) != nullptr);
|
||||
election->transition_active ();
|
||||
|
||||
|
|
|
@ -70,18 +70,12 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
|
|||
case nano::thread_role::name::db_parallel_traversal:
|
||||
thread_role_name_string = "DB par traversl";
|
||||
break;
|
||||
case nano::thread_role::name::election_scheduler:
|
||||
thread_role_name_string = "Election Sched";
|
||||
break;
|
||||
case nano::thread_role::name::unchecked:
|
||||
thread_role_name_string = "Unchecked";
|
||||
break;
|
||||
case nano::thread_role::name::backlog_population:
|
||||
thread_role_name_string = "Backlog";
|
||||
break;
|
||||
case nano::thread_role::name::election_hinting:
|
||||
thread_role_name_string = "Hinting";
|
||||
break;
|
||||
case nano::thread_role::name::vote_generator_queue:
|
||||
thread_role_name_string = "Voting que";
|
||||
break;
|
||||
|
@ -94,8 +88,17 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
|
|||
case nano::thread_role::name::telemetry:
|
||||
thread_role_name_string = "Telemetry";
|
||||
break;
|
||||
case nano::thread_role::name::optimistic_scheduler:
|
||||
thread_role_name_string = "Optimistic";
|
||||
case nano::thread_role::name::scheduler_hinted:
|
||||
thread_role_name_string = "Sched Hinted";
|
||||
break;
|
||||
case nano::thread_role::name::scheduler_manual:
|
||||
thread_role_name_string = "Sched Manual";
|
||||
break;
|
||||
case nano::thread_role::name::scheduler_optimistic:
|
||||
thread_role_name_string = "Sched Opt";
|
||||
break;
|
||||
case nano::thread_role::name::scheduler_priority:
|
||||
thread_role_name_string = "Sched Priority";
|
||||
break;
|
||||
default:
|
||||
debug_assert (false && "nano::thread_role::get_string unhandled thread role");
|
||||
|
@ -133,4 +136,4 @@ void nano::thread_role::set (nano::thread_role::name role)
|
|||
nano::thread_role::set_os_name (thread_role_name_string);
|
||||
|
||||
current_thread_role = role;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,17 +30,18 @@ enum class name
|
|||
state_block_signature_verification,
|
||||
epoch_upgrader,
|
||||
db_parallel_traversal,
|
||||
election_scheduler,
|
||||
unchecked,
|
||||
backlog_population,
|
||||
election_hinting,
|
||||
vote_generator_queue,
|
||||
bootstrap_server,
|
||||
telemetry,
|
||||
optimistic_scheduler,
|
||||
ascending_bootstrap,
|
||||
bootstrap_server_requests,
|
||||
bootstrap_server_responses,
|
||||
scheduler_hinted,
|
||||
scheduler_manual,
|
||||
scheduler_optimistic,
|
||||
scheduler_priority,
|
||||
};
|
||||
|
||||
/*
|
||||
|
@ -63,4 +64,4 @@ std::string get_string ();
|
|||
* Internal only, should not be called directly
|
||||
*/
|
||||
void set_os_name (std::string const &);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -202,6 +202,8 @@ add_library(
|
|||
scheduler/component.cpp
|
||||
scheduler/hinted.hpp
|
||||
scheduler/hinted.cpp
|
||||
scheduler/manual.hpp
|
||||
scheduler/manual.cpp
|
||||
scheduler/optimistic.hpp
|
||||
scheduler/optimistic.cpp
|
||||
scheduler/priority.hpp
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
#include <nano/node/rocksdb/rocksdb.hpp>
|
||||
#include <nano/node/scheduler/component.hpp>
|
||||
#include <nano/node/scheduler/hinted.hpp>
|
||||
#include <nano/node/scheduler/manual.hpp>
|
||||
#include <nano/node/scheduler/optimistic.hpp>
|
||||
#include <nano/node/scheduler/priority.hpp>
|
||||
#include <nano/node/telemetry.hpp>
|
||||
|
@ -576,7 +577,7 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
|
|||
composite->add_component (collect_container_info (node.confirmation_height_processor, "confirmation_height_processor"));
|
||||
composite->add_component (collect_container_info (node.distributed_work, "distributed_work"));
|
||||
composite->add_component (collect_container_info (node.aggregator, "request_aggregator"));
|
||||
composite->add_component (node.scheduler.priority.collect_container_info ("priority_scheduler"));
|
||||
composite->add_component (node.scheduler.collect_container_info ("scheduler"));
|
||||
composite->add_component (node.inactive_vote_cache.collect_container_info ("inactive_vote_cache"));
|
||||
composite->add_component (collect_container_info (node.generator, "vote_generator"));
|
||||
composite->add_component (collect_container_info (node.final_generator, "vote_generator_final"));
|
||||
|
@ -688,10 +689,8 @@ void nano::node::start ()
|
|||
active.start ();
|
||||
generator.start ();
|
||||
final_generator.start ();
|
||||
scheduler.optimistic.start ();
|
||||
scheduler.priority.start ();
|
||||
scheduler.start ();
|
||||
backlog.start ();
|
||||
scheduler.hinted.start ();
|
||||
bootstrap_server.start ();
|
||||
if (!flags.disable_ascending_bootstrap)
|
||||
{
|
||||
|
@ -723,9 +722,7 @@ void nano::node::stop ()
|
|||
block_processor.stop ();
|
||||
aggregator.stop ();
|
||||
vote_processor.stop ();
|
||||
scheduler.priority.stop ();
|
||||
scheduler.optimistic.stop ();
|
||||
scheduler.hinted.stop ();
|
||||
scheduler.stop ();
|
||||
active.stop ();
|
||||
generator.stop ();
|
||||
final_generator.stop ();
|
||||
|
@ -1265,7 +1262,7 @@ void nano::node::add_initial_peers ()
|
|||
|
||||
void nano::node::start_election (std::shared_ptr<nano::block> const & block)
|
||||
{
|
||||
scheduler.priority.manual (block);
|
||||
scheduler.manual.push (block);
|
||||
}
|
||||
|
||||
bool nano::node::block_confirmed (nano::block_hash const & hash_a)
|
||||
|
|
|
@ -1,15 +1,50 @@
|
|||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/scheduler/component.hpp>
|
||||
#include <nano/node/scheduler/hinted.hpp>
|
||||
#include <nano/node/scheduler/manual.hpp>
|
||||
#include <nano/node/scheduler/optimistic.hpp>
|
||||
#include <nano/node/scheduler/priority.hpp>
|
||||
|
||||
nano::scheduler::component::component (nano::node & node) :
|
||||
hinted_impl{ std::make_unique<nano::scheduler::hinted> (nano::scheduler::hinted::config{ node.config }, node, node.inactive_vote_cache, node.active, node.online_reps, node.stats) },
|
||||
manual_impl{ std::make_unique<nano::scheduler::manual> (node) },
|
||||
optimistic_impl{ std::make_unique<nano::scheduler::optimistic> (node.config.optimistic_scheduler, node, node.ledger, node.active, node.network_params.network, node.stats) },
|
||||
priority_impl{ std::make_unique<nano::scheduler::priority> (node, node.stats) },
|
||||
hinted_impl{ std::make_unique<nano::scheduler::hinted> (nano::scheduler::hinted::config{ node.config }, node, node.inactive_vote_cache, node.active, node.online_reps, node.stats) },
|
||||
priority{ *priority_impl },
|
||||
hinted{ *hinted_impl },
|
||||
optimistic{ *optimistic_impl }
|
||||
manual{ *manual_impl },
|
||||
optimistic{ *optimistic_impl },
|
||||
priority{ *priority_impl }
|
||||
{
|
||||
}
|
||||
|
||||
nano::scheduler::component::~component ()
|
||||
{
|
||||
}
|
||||
|
||||
void nano::scheduler::component::start ()
|
||||
{
|
||||
hinted.start ();
|
||||
manual.start ();
|
||||
optimistic.start ();
|
||||
priority.start ();
|
||||
}
|
||||
|
||||
void nano::scheduler::component::stop ()
|
||||
{
|
||||
hinted.stop ();
|
||||
manual.stop ();
|
||||
optimistic.stop ();
|
||||
priority.stop ();
|
||||
}
|
||||
|
||||
std::unique_ptr<nano::container_info_component> nano::scheduler::component::collect_container_info (std::string const & name)
|
||||
{
|
||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||
|
||||
auto composite = std::make_unique<container_info_composite> (name);
|
||||
//composite->add_component (hinted.collect_container_info ("hinted"));
|
||||
composite->add_component (manual.collect_container_info ("manual"));
|
||||
//composite->add_component (optimistic.collect_container_info ("optimistic"));
|
||||
composite->add_component (priority.collect_container_info ("priority"));
|
||||
return composite;
|
||||
}
|
||||
|
|
|
@ -1,28 +1,44 @@
|
|||
#pragma once
|
||||
|
||||
#include <nano/lib/locks.hpp>
|
||||
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
namespace nano
|
||||
{
|
||||
class container_info_component;
|
||||
class node;
|
||||
}
|
||||
namespace nano::scheduler
|
||||
{
|
||||
class hinted;
|
||||
class manual;
|
||||
class optimistic;
|
||||
class priority;
|
||||
|
||||
class component
|
||||
class component final
|
||||
{
|
||||
std::unique_ptr<nano::scheduler::hinted> hinted_impl;
|
||||
std::unique_ptr<nano::scheduler::manual> manual_impl;
|
||||
std::unique_ptr<nano::scheduler::optimistic> optimistic_impl;
|
||||
std::unique_ptr<nano::scheduler::priority> priority_impl;
|
||||
std::unique_ptr<nano::scheduler::hinted> hinted_impl;
|
||||
nano::mutex mutex;
|
||||
|
||||
public:
|
||||
explicit component (nano::node & node);
|
||||
~component ();
|
||||
|
||||
// Starts all schedulers
|
||||
void start ();
|
||||
// Stops all schedulers
|
||||
void stop ();
|
||||
|
||||
std::unique_ptr<container_info_component> collect_container_info (std::string const & name);
|
||||
|
||||
nano::scheduler::priority & priority;
|
||||
nano::scheduler::hinted & hinted;
|
||||
nano::scheduler::manual & manual;
|
||||
nano::scheduler::optimistic & optimistic;
|
||||
nano::scheduler::priority & priority;
|
||||
};
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ void nano::scheduler::hinted::start ()
|
|||
debug_assert (!thread.joinable ());
|
||||
|
||||
thread = std::thread{ [this] () {
|
||||
nano::thread_role::set (nano::thread_role::name::election_hinting);
|
||||
nano::thread_role::set (nano::thread_role::name::scheduler_hinted);
|
||||
run ();
|
||||
} };
|
||||
}
|
||||
|
|
|
@ -22,6 +22,10 @@ namespace nano::scheduler
|
|||
*/
|
||||
class hinted final
|
||||
{
|
||||
friend class component;
|
||||
void start ();
|
||||
void stop ();
|
||||
|
||||
public: // Config
|
||||
struct config final
|
||||
{
|
||||
|
@ -34,9 +38,6 @@ public:
|
|||
hinted (config const &, nano::node &, nano::vote_cache &, nano::active_transactions &, nano::online_reps &, nano::stats &);
|
||||
~hinted ();
|
||||
|
||||
void start ();
|
||||
void stop ();
|
||||
|
||||
/*
|
||||
* Notify about changes in AEC vacancy
|
||||
*/
|
||||
|
|
94
nano/node/scheduler/manual.cpp
Normal file
94
nano/node/scheduler/manual.cpp
Normal file
|
@ -0,0 +1,94 @@
|
|||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/scheduler/manual.hpp>
|
||||
|
||||
nano::scheduler::manual::manual (nano::node & node) :
|
||||
node{ node }
|
||||
{
|
||||
}
|
||||
|
||||
nano::scheduler::manual::~manual ()
|
||||
{
|
||||
// Thread must be stopped before destruction
|
||||
debug_assert (!thread.joinable ());
|
||||
}
|
||||
|
||||
void nano::scheduler::manual::start ()
|
||||
{
|
||||
debug_assert (!thread.joinable ());
|
||||
|
||||
thread = std::thread{ [this] () {
|
||||
nano::thread_role::set (nano::thread_role::name::scheduler_manual);
|
||||
run ();
|
||||
} };
|
||||
}
|
||||
|
||||
void nano::scheduler::manual::stop ()
|
||||
{
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
stopped = true;
|
||||
}
|
||||
notify ();
|
||||
nano::join_or_pass (thread);
|
||||
}
|
||||
|
||||
void nano::scheduler::manual::notify ()
|
||||
{
|
||||
condition.notify_all ();
|
||||
}
|
||||
|
||||
void nano::scheduler::manual::push (std::shared_ptr<nano::block> const & block_a, boost::optional<nano::uint128_t> const & previous_balance_a, nano::election_behavior election_behavior_a)
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
queue.push_back (std::make_tuple (block_a, previous_balance_a, election_behavior_a));
|
||||
notify ();
|
||||
}
|
||||
|
||||
bool nano::scheduler::manual::predicate () const
|
||||
{
|
||||
return !queue.empty ();
|
||||
}
|
||||
|
||||
void nano::scheduler::manual::run ()
|
||||
{
|
||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||
while (!stopped)
|
||||
{
|
||||
condition.wait (lock, [this] () {
|
||||
return stopped || predicate ();
|
||||
});
|
||||
debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds
|
||||
if (!stopped)
|
||||
{
|
||||
node.stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::loop);
|
||||
|
||||
if (predicate ())
|
||||
{
|
||||
auto const [block, previous_balance, election_behavior] = queue.front ();
|
||||
queue.pop_front ();
|
||||
lock.unlock ();
|
||||
node.stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_manual);
|
||||
auto result = node.active.insert (block, election_behavior);
|
||||
if (result.election != nullptr)
|
||||
{
|
||||
result.election->transition_active ();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
lock.unlock ();
|
||||
}
|
||||
notify ();
|
||||
lock.lock ();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<nano::container_info_component> nano::scheduler::manual::collect_container_info (std::string const & name)
|
||||
{
|
||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||
|
||||
auto composite = std::make_unique<container_info_composite> (name);
|
||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "queue", queue.size (), sizeof (decltype (queue)::value_type) }));
|
||||
return composite;
|
||||
}
|
46
nano/node/scheduler/manual.hpp
Normal file
46
nano/node/scheduler/manual.hpp
Normal file
|
@ -0,0 +1,46 @@
|
|||
#pragma once
|
||||
#include <nano/lib/locks.hpp>
|
||||
#include <nano/lib/numbers.hpp>
|
||||
#include <nano/node/active_transactions.hpp>
|
||||
|
||||
#include <boost/optional.hpp>
|
||||
|
||||
#include <deque>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
|
||||
namespace nano
|
||||
{
|
||||
class block;
|
||||
class node;
|
||||
}
|
||||
|
||||
namespace nano::scheduler
|
||||
{
|
||||
class buckets;
|
||||
class manual final
|
||||
{
|
||||
std::deque<std::tuple<std::shared_ptr<nano::block>, boost::optional<nano::uint128_t>, nano::election_behavior>> queue;
|
||||
nano::node & node;
|
||||
nano::mutex mutex;
|
||||
nano::condition_variable condition;
|
||||
bool stopped{ false };
|
||||
std::thread thread;
|
||||
void notify ();
|
||||
bool predicate () const;
|
||||
void run ();
|
||||
|
||||
public:
|
||||
manual (nano::node & node);
|
||||
~manual ();
|
||||
|
||||
void start ();
|
||||
void stop ();
|
||||
|
||||
// Manualy start an election for a block
|
||||
// Call action with confirmed block, may be different than what we started with
|
||||
void push (std::shared_ptr<nano::block> const &, boost::optional<nano::uint128_t> const & = boost::none, nano::election_behavior = nano::election_behavior::normal);
|
||||
|
||||
std::unique_ptr<container_info_component> collect_container_info (std::string const & name);
|
||||
}; // class manual
|
||||
} // nano::scheduler
|
|
@ -29,7 +29,7 @@ void nano::scheduler::optimistic::start ()
|
|||
debug_assert (!thread.joinable ());
|
||||
|
||||
thread = std::thread{ [this] () {
|
||||
nano::thread_role::set (nano::thread_role::name::optimistic_scheduler);
|
||||
nano::thread_role::set (nano::thread_role::name::scheduler_optimistic);
|
||||
run ();
|
||||
} };
|
||||
}
|
||||
|
|
|
@ -46,15 +46,15 @@ public:
|
|||
};
|
||||
class optimistic final
|
||||
{
|
||||
friend class component;
|
||||
void start ();
|
||||
void stop ();
|
||||
struct entry;
|
||||
|
||||
public:
|
||||
optimistic (optimistic_config const &, nano::node &, nano::ledger &, nano::active_transactions &, nano::network_constants const & network_constants, nano::stats &);
|
||||
~optimistic ();
|
||||
|
||||
void start ();
|
||||
void stop ();
|
||||
|
||||
/**
|
||||
* Called from backlog population to process accounts with unconfirmed blocks
|
||||
*/
|
||||
|
|
|
@ -20,7 +20,7 @@ void nano::scheduler::priority::start ()
|
|||
debug_assert (!thread.joinable ());
|
||||
|
||||
thread = std::thread{ [this] () {
|
||||
nano::thread_role::set (nano::thread_role::name::election_scheduler);
|
||||
nano::thread_role::set (nano::thread_role::name::scheduler_priority);
|
||||
run ();
|
||||
} };
|
||||
}
|
||||
|
@ -35,13 +35,6 @@ void nano::scheduler::priority::stop ()
|
|||
nano::join_or_pass (thread);
|
||||
}
|
||||
|
||||
void nano::scheduler::priority::manual (std::shared_ptr<nano::block> const & block_a, boost::optional<nano::uint128_t> const & previous_balance_a, nano::election_behavior election_behavior_a)
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
manual_queue.push_back (std::make_tuple (block_a, previous_balance_a, election_behavior_a));
|
||||
notify ();
|
||||
}
|
||||
|
||||
bool nano::scheduler::priority::activate (nano::account const & account_a, nano::transaction const & transaction)
|
||||
{
|
||||
debug_assert (!account_a.is_zero ());
|
||||
|
@ -79,12 +72,12 @@ void nano::scheduler::priority::notify ()
|
|||
std::size_t nano::scheduler::priority::size () const
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
return buckets->size () + manual_queue.size ();
|
||||
return buckets->size ();
|
||||
}
|
||||
|
||||
bool nano::scheduler::priority::empty_locked () const
|
||||
{
|
||||
return buckets->empty () && manual_queue.empty ();
|
||||
return buckets->empty ();
|
||||
}
|
||||
|
||||
bool nano::scheduler::priority::empty () const
|
||||
|
@ -93,47 +86,25 @@ bool nano::scheduler::priority::empty () const
|
|||
return empty_locked ();
|
||||
}
|
||||
|
||||
std::size_t nano::scheduler::priority::priority_queue_size () const
|
||||
{
|
||||
return buckets->size ();
|
||||
}
|
||||
|
||||
bool nano::scheduler::priority::priority_queue_predicate () const
|
||||
bool nano::scheduler::priority::predicate () const
|
||||
{
|
||||
return node.active.vacancy () > 0 && !buckets->empty ();
|
||||
}
|
||||
|
||||
bool nano::scheduler::priority::manual_queue_predicate () const
|
||||
{
|
||||
return !manual_queue.empty ();
|
||||
}
|
||||
|
||||
void nano::scheduler::priority::run ()
|
||||
{
|
||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||
while (!stopped)
|
||||
{
|
||||
condition.wait (lock, [this] () {
|
||||
return stopped || priority_queue_predicate () || manual_queue_predicate ();
|
||||
return stopped || predicate ();
|
||||
});
|
||||
debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds
|
||||
if (!stopped)
|
||||
{
|
||||
stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::loop);
|
||||
|
||||
if (manual_queue_predicate ())
|
||||
{
|
||||
auto const [block, previous_balance, election_behavior] = manual_queue.front ();
|
||||
manual_queue.pop_front ();
|
||||
lock.unlock ();
|
||||
stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_manual);
|
||||
auto result = node.active.insert (block, election_behavior);
|
||||
if (result.election != nullptr)
|
||||
{
|
||||
result.election->transition_active ();
|
||||
}
|
||||
}
|
||||
else if (priority_queue_predicate ())
|
||||
if (predicate ())
|
||||
{
|
||||
auto block = buckets->top ();
|
||||
buckets->pop ();
|
||||
|
@ -164,7 +135,6 @@ std::unique_ptr<nano::container_info_component> nano::scheduler::priority::colle
|
|||
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||
|
||||
auto composite = std::make_unique<container_info_composite> (name);
|
||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "manual_queue", manual_queue.size (), sizeof (decltype (manual_queue)::value_type) }));
|
||||
composite->add_component (buckets->collect_container_info ("buckets"));
|
||||
return composite;
|
||||
}
|
||||
|
|
|
@ -8,11 +8,13 @@
|
|||
#include <condition_variable>
|
||||
#include <deque>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
namespace nano
|
||||
{
|
||||
class block;
|
||||
class container_info_component;
|
||||
class node;
|
||||
}
|
||||
|
||||
|
@ -21,16 +23,15 @@ namespace nano::scheduler
|
|||
class buckets;
|
||||
class priority final
|
||||
{
|
||||
friend class component;
|
||||
void start ();
|
||||
void stop ();
|
||||
std::unique_ptr<container_info_component> collect_container_info (std::string const & name);
|
||||
|
||||
public:
|
||||
priority (nano::node &, nano::stats &);
|
||||
~priority ();
|
||||
|
||||
void start ();
|
||||
void stop ();
|
||||
|
||||
// Manualy start an election for a block
|
||||
// Call action with confirmed block, may be different than what we started with
|
||||
void manual (std::shared_ptr<nano::block> const &, boost::optional<nano::uint128_t> const & = boost::none, nano::election_behavior = nano::election_behavior::normal);
|
||||
/**
|
||||
* Activates the first unconfirmed block of \p account_a
|
||||
* @return true if account was activated
|
||||
|
@ -39,8 +40,6 @@ public:
|
|||
void notify ();
|
||||
std::size_t size () const;
|
||||
bool empty () const;
|
||||
std::size_t priority_queue_size () const;
|
||||
std::unique_ptr<container_info_component> collect_container_info (std::string const &);
|
||||
|
||||
private: // Dependencies
|
||||
nano::node & node;
|
||||
|
@ -49,12 +48,10 @@ private: // Dependencies
|
|||
private:
|
||||
void run ();
|
||||
bool empty_locked () const;
|
||||
bool priority_queue_predicate () const;
|
||||
bool manual_queue_predicate () const;
|
||||
bool predicate () const;
|
||||
|
||||
std::unique_ptr<nano::scheduler::buckets> buckets;
|
||||
|
||||
std::deque<std::tuple<std::shared_ptr<nano::block>, boost::optional<nano::uint128_t>, nano::election_behavior>> manual_queue;
|
||||
bool stopped{ false };
|
||||
nano::condition_variable condition;
|
||||
mutable nano::mutex mutex;
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
#include <nano/node/json_handler.hpp>
|
||||
#include <nano/node/node_rpc_config.hpp>
|
||||
#include <nano/node/scheduler/component.hpp>
|
||||
#include <nano/node/scheduler/manual.hpp>
|
||||
#include <nano/node/scheduler/priority.hpp>
|
||||
#include <nano/rpc/rpc.hpp>
|
||||
#include <nano/rpc/rpc_request_processor.hpp>
|
||||
|
@ -1556,7 +1557,7 @@ TEST (rpc, process_subtype_open)
|
|||
ASSERT_EQ (nano::process_result::progress, node1->process (*send).code);
|
||||
ASSERT_EQ (nano::process_result::progress, node2.process (*send).code);
|
||||
auto const rpc_ctx = add_rpc (system, node1);
|
||||
node1->scheduler.priority.manual (send);
|
||||
node1->scheduler.manual.push (send);
|
||||
auto open = builder
|
||||
.state ()
|
||||
.account (key.pub)
|
||||
|
@ -1605,7 +1606,7 @@ TEST (rpc, process_subtype_receive)
|
|||
ASSERT_EQ (nano::process_result::progress, node1->process (*send).code);
|
||||
ASSERT_EQ (nano::process_result::progress, node2.process (*send).code);
|
||||
auto const rpc_ctx = add_rpc (system, node1);
|
||||
node1->scheduler.priority.manual (send);
|
||||
node1->scheduler.manual.push (send);
|
||||
auto receive = builder
|
||||
.state ()
|
||||
.account (nano::dev::genesis_key.pub)
|
||||
|
@ -6968,5 +6969,5 @@ TEST (node, election_scheduler_container_info)
|
|||
request.put ("action", "stats");
|
||||
request.put ("type", "objects");
|
||||
auto response = wait_response (system, rpc_ctx, request);
|
||||
auto es = response.get_child ("node").get_child ("priority_scheduler");
|
||||
auto es = response.get_child ("node").get_child ("scheduler");
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
#include <nano/lib/threading.hpp>
|
||||
#include <nano/node/election.hpp>
|
||||
#include <nano/node/scheduler/component.hpp>
|
||||
#include <nano/node/scheduler/manual.hpp>
|
||||
#include <nano/node/scheduler/priority.hpp>
|
||||
#include <nano/node/transport/inproc.hpp>
|
||||
#include <nano/node/unchecked_map.hpp>
|
||||
|
@ -677,7 +678,7 @@ TEST (confirmation_height, many_accounts_single_confirmation)
|
|||
{
|
||||
auto block = node->block (last_open_hash);
|
||||
ASSERT_NE (nullptr, block);
|
||||
node->scheduler.priority.manual (block);
|
||||
node->scheduler.manual.push (block);
|
||||
std::shared_ptr<nano::election> election;
|
||||
ASSERT_TIMELY (10s, (election = node->active.election (block->qualified_root ())) != nullptr);
|
||||
election->force_confirm ();
|
||||
|
@ -760,7 +761,7 @@ TEST (confirmation_height, many_accounts_many_confirmations)
|
|||
// Confirm all of the accounts
|
||||
for (auto & open_block : open_blocks)
|
||||
{
|
||||
node->scheduler.priority.manual (open_block);
|
||||
node->scheduler.manual.push (open_block);
|
||||
std::shared_ptr<nano::election> election;
|
||||
ASSERT_TIMELY (10s, (election = node->active.election (open_block->qualified_root ())) != nullptr);
|
||||
election->force_confirm ();
|
||||
|
@ -900,7 +901,7 @@ TEST (confirmation_height, long_chains)
|
|||
|
||||
// Call block confirm on the existing receive block on the genesis account which will confirm everything underneath on both accounts
|
||||
{
|
||||
node->scheduler.priority.manual (receive1);
|
||||
node->scheduler.manual.push (receive1);
|
||||
std::shared_ptr<nano::election> election;
|
||||
ASSERT_TIMELY (10s, (election = node->active.election (receive1->qualified_root ())) != nullptr);
|
||||
election->force_confirm ();
|
||||
|
@ -2225,7 +2226,7 @@ TEST (node, wallet_create_block_confirm_conflicts)
|
|||
// Call block confirm on the top level send block which will confirm everything underneath on both accounts.
|
||||
{
|
||||
auto block = node->store.block.get (node->store.tx_begin_read (), latest);
|
||||
node->scheduler.priority.manual (block);
|
||||
node->scheduler.manual.push (block);
|
||||
std::shared_ptr<nano::election> election;
|
||||
ASSERT_TIMELY (10s, (election = node->active.election (block->qualified_root ())) != nullptr);
|
||||
election->force_confirm ();
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
#include <nano/crypto_lib/random_pool.hpp>
|
||||
#include <nano/node/scheduler/component.hpp>
|
||||
#include <nano/node/scheduler/manual.hpp>
|
||||
#include <nano/node/scheduler/priority.hpp>
|
||||
#include <nano/node/transport/fake.hpp>
|
||||
#include <nano/test_common/system.hpp>
|
||||
|
@ -124,7 +125,7 @@ bool nano::test::activate (nano::node & node, std::vector<nano::block_hash> hash
|
|||
// Block does not exist in the ledger yet
|
||||
return false;
|
||||
}
|
||||
node.scheduler.priority.manual (disk_block);
|
||||
node.scheduler.manual.push (disk_block);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -205,7 +206,7 @@ std::shared_ptr<nano::election> nano::test::start_election (nano::test::system &
|
|||
block_l = node_a.block (hash_a);
|
||||
}
|
||||
|
||||
node_a.scheduler.priority.manual (block_l);
|
||||
node_a.scheduler.manual.push (block_l);
|
||||
|
||||
// wait for the election to appear
|
||||
std::shared_ptr<nano::election> election = node_a.active.election (block_l->qualified_root ());
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue