diff --git a/nano/core_test/active_elections.cpp b/nano/core_test/active_elections.cpp index ac6e95ba3..cc3d4d06a 100644 --- a/nano/core_test/active_elections.cpp +++ b/nano/core_test/active_elections.cpp @@ -658,7 +658,6 @@ TEST (active_elections, vote_replays) .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) .work (*system.work.generate (nano::dev::genesis->hash ())) .build (); - ASSERT_NE (nullptr, send1); // create open block for key receing Knano_ratio raw auto open1 = builder.make_block () @@ -670,11 +669,9 @@ TEST (active_elections, vote_replays) .sign (key.prv, key.pub) .work (*system.work.generate (key.pub)) .build (); - ASSERT_NE (nullptr, open1); // wait for elections objects to appear in the AEC - node.process_active (send1); - node.process_active (open1); + nano::test::process (node, { send1, open1 }); ASSERT_TRUE (nano::test::start_elections (system, node, { send1, open1 })); ASSERT_EQ (2, node.active.size ()); @@ -709,8 +706,7 @@ TEST (active_elections, vote_replays) .sign (key.prv, key.pub) .work (*system.work.generate (open1->hash ())) .build (); - ASSERT_NE (nullptr, send2); - node.process_active (send2); + nano::test::process (node, { send2 }); ASSERT_TRUE (nano::test::start_elections (system, node, { send2 })); ASSERT_EQ (1, node.active.size ()); diff --git a/nano/core_test/election.cpp b/nano/core_test/election.cpp index 8cabb88cd..7fe943ad0 100644 --- a/nano/core_test/election.cpp +++ b/nano/core_test/election.cpp @@ -152,10 +152,12 @@ TEST (election, quorum_minimum_confirm_success) .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) .build (); node1.work_generate_blocking (*send1); - node1.process_active (send1); + + nano::test::process (node1, { send1 }); auto election = nano::test::start_election (system, node1, send1->hash ()); ASSERT_NE (nullptr, election); ASSERT_EQ (1, election->blocks ().size ()); + auto vote = nano::test::make_final_vote (nano::dev::genesis_key, { send1->hash () }); ASSERT_EQ (nano::vote_code::vote, node1.vote_router.vote (vote).at (send1->hash ())); ASSERT_NE (nullptr, node1.block (send1->hash ())); @@ -182,7 +184,7 @@ TEST (election, quorum_minimum_confirm_fail) .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) .build (); - node1.process_active (send1); + nano::test::process (node1, { send1 }); auto election = nano::test::start_election (system, node1, send1->hash ()); ASSERT_NE (nullptr, election); ASSERT_EQ (1, election->blocks ().size ()); diff --git a/nano/node/scheduler/manual.cpp b/nano/node/scheduler/manual.cpp index f2fa3d1e2..b2ac593b2 100644 --- a/nano/node/scheduler/manual.cpp +++ b/nano/node/scheduler/manual.cpp @@ -30,7 +30,7 @@ void nano::scheduler::manual::stop () nano::lock_guard lock{ mutex }; stopped = true; } - notify (); + condition.notify_all (); nano::join_or_pass (thread); } @@ -39,23 +39,43 @@ void nano::scheduler::manual::notify () condition.notify_all (); } -void nano::scheduler::manual::push (std::shared_ptr const & block_a, boost::optional const & previous_balance_a) +auto nano::scheduler::manual::push (std::shared_ptr const & block) -> std::future> { nano::lock_guard lock{ mutex }; - queue.push_back (std::make_tuple (block_a, previous_balance_a, nano::election_behavior::manual)); - notify (); + + // Check if block already exists + auto & hash_index = queue.get (); + + if (hash_index.contains (block->hash ())) + { + // Block already exists, return future that immediately resolves to nullptr + std::promise> promise; + auto future = promise.get_future (); + promise.set_value (nullptr); + return future; + } + + // Create entry and get future before inserting + entry new_entry{ block }; + auto future = new_entry.promise.get_future (); + + auto [it, inserted] = queue.push_back (std::move (new_entry)); + debug_assert (inserted); + + condition.notify_all (); + return future; } bool nano::scheduler::manual::contains (nano::block_hash const & hash) const { nano::lock_guard lock{ mutex }; - return std::any_of (queue.cbegin (), queue.cend (), [&hash] (auto const & item) { - return std::get<0> (item)->hash () == hash; - }); + auto & hash_index = queue.get (); + return hash_index.contains (hash); } bool nano::scheduler::manual::predicate () const { + debug_assert (!mutex.try_lock ()); return !queue.empty (); } @@ -67,28 +87,37 @@ void nano::scheduler::manual::run () condition.wait (lock, [this] () { return stopped || predicate (); }); - debug_assert ((std::this_thread::yield (), true)); // Introduce some random delay in debug builds - if (!stopped) + + if (stopped) + { + return; + } + + debug_assert ((std::this_thread::yield (), true)); + + if (predicate ()) { node.stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::loop); - if (predicate ()) + auto promise = std::move (queue.front ().promise); + auto block = queue.front ().block; + queue.pop_front (); + + lock.unlock (); + + auto result = node.active.insert (block, nano::election_behavior::manual); + if (result.inserted) { - auto const [block, previous_balance, election_behavior] = queue.front (); - queue.pop_front (); - lock.unlock (); node.stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_manual); - auto result = node.active.insert (block, election_behavior); - if (result.election != nullptr) - { - result.election->transition_active (); - } } - else + if (result.election != nullptr) { - lock.unlock (); + result.election->transition_active (); } - notify (); + + // Fulfill the promise + promise.set_value (result.election); + lock.lock (); } } diff --git a/nano/node/scheduler/manual.hpp b/nano/node/scheduler/manual.hpp index c62f215bd..9290edd06 100644 --- a/nano/node/scheduler/manual.hpp +++ b/nano/node/scheduler/manual.hpp @@ -4,41 +4,73 @@ #include #include -#include +#include +#include +#include +#include +#include -#include +#include +#include #include -#include +#include + +namespace mi = boost::multi_index; namespace nano::scheduler { -class buckets; - class manual final { - std::deque, boost::optional, nano::election_behavior>> queue; - nano::node & node; - mutable nano::mutex mutex; - nano::condition_variable condition; - bool stopped{ false }; - std::thread thread; - void notify (); - bool predicate () const; - void run (); - public: - explicit manual (nano::node & node); + explicit manual (nano::node &); ~manual (); void start (); void stop (); - // Manually start an election for a block - // Call action with confirmed block, may be different than what we started with - void push (std::shared_ptr const &, boost::optional const & = boost::none); + std::future> push (std::shared_ptr const & block); bool contains (nano::block_hash const &) const; nano::container_info container_info () const; + +private: + bool predicate () const; + void notify (); + void run (); + +private: // Dependencies + nano::node & node; + +private: + struct entry + { + std::shared_ptr block; + mutable std::promise> promise; + + nano::block_hash hash () const + { + return block->hash (); + } + }; + + // clang-format off + class tag_sequenced {}; + class tag_hash {}; + + using ordered_queue = boost::multi_index_container>, + mi::hashed_unique, + mi::const_mem_fun> + >>; + // clang-format on + + ordered_queue queue; + + bool stopped{ false }; + nano::condition_variable condition; + mutable nano::mutex mutex; + std::thread thread; }; } diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index b53f10448..603135eab 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -6820,8 +6820,7 @@ TEST (rpc, confirmation_active) .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) .work (*system.work.generate (send1->hash ())) .build (); - node1->process_active (send1); - node1->process_active (send2); + nano::test::process (*node1, { send1, send2 }); ASSERT_TRUE (nano::test::start_elections (system, *node1, { send1, send2 })); ASSERT_EQ (2, node1->active.size ()); auto election (node1->active.election (send1->qualified_root ())); diff --git a/nano/test_common/testutil.cpp b/nano/test_common/testutil.cpp index 9dbe4210d..6e5a33e1c 100644 --- a/nano/test_common/testutil.cpp +++ b/nano/test_common/testutil.cpp @@ -266,48 +266,34 @@ std::shared_ptr nano::test::test_channel (nano::n return channel; } -std::shared_ptr nano::test::start_election (nano::test::system & system_a, nano::node & node_a, const nano::block_hash & hash_a) +std::shared_ptr nano::test::start_election (nano::test::system & system, nano::node & node, const nano::block_hash & hash) { - system_a.deadline_set (5s); + system.deadline_set (5s); - // wait until and ensure that the block is in the ledger - auto block_l = node_a.block (hash_a); - while (!block_l) - { - if (system_a.poll ()) - { - return nullptr; - } - block_l = node_a.block (hash_a); - } + // Wait until and ensure that the block is in the ledger + auto block_l = node.block (hash); + debug_assert (block_l); - node_a.scheduler.manual.push (block_l); + auto fut = node.scheduler.manual.push (block_l); - // wait for the election to appear - std::shared_ptr election = node_a.active.election (block_l->qualified_root ()); - while (!election) - { - if (system_a.poll ()) - { - return nullptr; - } - election = node_a.active.election (block_l->qualified_root ()); - } + // Wait for the block to be scheduled + auto status = fut.wait_for (5s); + debug_assert (status == std::future_status::ready); - election->transition_active (); + auto election = fut.get (); return election; } -bool nano::test::start_elections (nano::test::system & system_a, nano::node & node_a, std::vector const & hashes_a, bool const forced_a) +bool nano::test::start_elections (nano::test::system & system, nano::node & node, std::vector const & hashes, bool const forced) { - for (auto const & hash_l : hashes_a) + for (auto const & hash_l : hashes) { - auto election = nano::test::start_election (system_a, node_a, hash_l); + auto election = start_election (system, node, hash_l); if (!election) { return false; } - if (forced_a) + if (forced) { election->force_confirm (); } @@ -315,9 +301,9 @@ bool nano::test::start_elections (nano::test::system & system_a, nano::node & no return true; } -bool nano::test::start_elections (nano::test::system & system_a, nano::node & node_a, std::vector> const & blocks_a, bool const forced_a) +bool nano::test::start_elections (nano::test::system & system, nano::node & node, std::vector> const & blocks, bool const forced) { - return nano::test::start_elections (system_a, node_a, blocks_to_hashes (blocks_a), forced_a); + return start_elections (system, node, blocks_to_hashes (blocks), forced); } nano::account_info nano::test::account_info (nano::node const & node, nano::account const & acc)