diff --git a/nano/core_test/distributed_work.cpp b/nano/core_test/distributed_work.cpp index ff4456b4..26c5e8c7 100644 --- a/nano/core_test/distributed_work.cpp +++ b/nano/core_test/distributed_work.cpp @@ -5,6 +5,13 @@ using namespace std::chrono_literals; +TEST (distributed_work, stopped) +{ + nano::system system (24000, 1); + system.nodes[0]->distributed_work.stop (); + ASSERT_TRUE (system.nodes[0]->distributed_work.make (nano::block_hash (), {}, {}, nano::network_constants::publish_test_threshold)); +} + TEST (distributed_work, no_peers) { nano::system system (24000, 1); @@ -17,7 +24,7 @@ TEST (distributed_work, no_peers) work = work_a; done = true; }; - node->distributed_work.make (hash, node->config.work_peers, callback, node->network_params.network.publish_threshold, nano::account ()); + ASSERT_FALSE (node->distributed_work.make (hash, node->config.work_peers, callback, node->network_params.network.publish_threshold, nano::account ())); system.deadline_set (5s); while (!done) { @@ -39,16 +46,7 @@ TEST (distributed_work, no_peers_disabled) nano::node_config node_config (24000, system.logging); node_config.work_threads = 0; auto & node = *system.add_node (node_config); - bool done{ false }; - auto callback_failure = [&done](boost::optional work_a) { - ASSERT_FALSE (work_a.is_initialized ()); - done = true; - }; - node.distributed_work.make (nano::block_hash (), node.config.work_peers, callback_failure, nano::network_constants::publish_test_threshold); - while (!done) - { - ASSERT_NO_ERROR (system.poll ()); - } + ASSERT_TRUE (node.distributed_work.make (nano::block_hash (), node.config.work_peers, {}, nano::network_constants::publish_test_threshold)); } TEST (distributed_work, no_peers_cancel) @@ -64,7 +62,7 @@ TEST (distributed_work, no_peers_cancel) ASSERT_FALSE (work_a.is_initialized ()); done = true; }; - node.distributed_work.make (hash, node.config.work_peers, callback_to_cancel, nano::difficulty::from_multiplier (1e6, node.network_params.network.publish_threshold)); + ASSERT_FALSE (node.distributed_work.make (hash, node.config.work_peers, callback_to_cancel, nano::difficulty::from_multiplier (1e6, node.network_params.network.publish_threshold))); ASSERT_EQ (1, node.distributed_work.items.size ()); // cleanup should not cancel or remove an ongoing work node.distributed_work.cleanup_finished (); @@ -80,7 +78,7 @@ TEST (distributed_work, no_peers_cancel) // now using observer done = false; - node.distributed_work.make (hash, node.config.work_peers, callback_to_cancel, nano::difficulty::from_multiplier (1000000, node.network_params.network.publish_threshold)); + ASSERT_FALSE (node.distributed_work.make (hash, node.config.work_peers, callback_to_cancel, nano::difficulty::from_multiplier (1000000, node.network_params.network.publish_threshold))); ASSERT_EQ (1, node.distributed_work.items.size ()); node.observers.work_cancel.notify (hash); system.deadline_set (20s); @@ -104,7 +102,7 @@ TEST (distributed_work, no_peers_multi) // Test many works for the same root for (unsigned i{ 0 }; i < total; ++i) { - node->distributed_work.make (hash, node->config.work_peers, callback, nano::difficulty::from_multiplier (10, node->network_params.network.publish_threshold)); + ASSERT_FALSE (node->distributed_work.make (hash, node->config.work_peers, callback, nano::difficulty::from_multiplier (10, node->network_params.network.publish_threshold))); } // 1 root, and _total_ requests for that root are expected, but some may have already finished ASSERT_EQ (1, node->distributed_work.items.size ()); @@ -129,7 +127,7 @@ TEST (distributed_work, no_peers_multi) for (unsigned i{ 0 }; i < total; ++i) { nano::block_hash hash_i (i + 1); - node->distributed_work.make (hash_i, node->config.work_peers, callback, node->network_params.network.publish_threshold); + ASSERT_FALSE (node->distributed_work.make (hash_i, node->config.work_peers, callback, node->network_params.network.publish_threshold)); } // 10 roots expected with 1 work each, but some may have completed so test for some ASSERT_GT (node->distributed_work.items.size (), 5); diff --git a/nano/lib/work.cpp b/nano/lib/work.cpp index 7e4a3c8a..d78e2aec 100644 --- a/nano/lib/work.cpp +++ b/nano/lib/work.cpp @@ -167,7 +167,7 @@ void nano::work_pool::cancel (nano::root const & root_a) } } pending.remove_if ([&root_a](decltype (pending)::value_type const & item_a) { - bool result; + bool result{ false }; if (item_a.item == root_a) { if (item_a.callback) @@ -176,10 +176,6 @@ void nano::work_pool::cancel (nano::root const & root_a) } result = true; } - else - { - result = false; - } return result; }); } diff --git a/nano/node/distributed_work.cpp b/nano/node/distributed_work.cpp index cf159812..1425581c 100644 --- a/nano/node/distributed_work.cpp +++ b/nano/node/distributed_work.cpp @@ -30,7 +30,7 @@ elapsed (nano::timer_state::started, "distributed work generation timer") nano::distributed_work::~distributed_work () { - if (node.websocket_server && node.websocket_server->any_subscriber (nano::websocket::topic::work)) + if (!node.stopped && node.websocket_server && node.websocket_server->any_subscriber (nano::websocket::topic::work)) { nano::websocket::message_builder builder; if (completed) @@ -348,11 +348,12 @@ void nano::distributed_work::handle_failure (bool const last_a) auto next_backoff (std::min (backoff * 2, (unsigned int)60 * 5)); // clang-format off node.alarm.add (now + std::chrono::seconds (backoff), [ node_w, root_l = root, peers_l = peers, callback_l = callback, next_backoff, difficulty = difficulty, account_l = account ] { + bool error_l {true}; if (auto node_l = node_w.lock ()) { - node_l->distributed_work.make (next_backoff, root_l, peers_l, callback_l, difficulty, account_l); + error_l = node_l->distributed_work.make (next_backoff, root_l, peers_l, callback_l, difficulty, account_l); } - else if (callback_l) + if (error_l && callback_l) { callback_l (boost::none); } @@ -384,46 +385,51 @@ node (node_a) { } -void nano::distributed_work_factory::make (nano::root const & root_a, std::vector> const & peers_a, std::function)> const & callback_a, uint64_t difficulty_a, boost::optional const & account_a) +nano::distributed_work_factory::~distributed_work_factory () { - make (1, root_a, peers_a, callback_a, difficulty_a, account_a); + stop (); } -void nano::distributed_work_factory::make (unsigned int backoff_a, nano::root const & root_a, std::vector> const & peers_a, std::function)> const & callback_a, uint64_t difficulty_a, boost::optional const & account_a) +bool nano::distributed_work_factory::make (nano::root const & root_a, std::vector> const & peers_a, std::function)> const & callback_a, uint64_t difficulty_a, boost::optional const & account_a) { - cleanup_finished (); - if (node.work_generation_enabled ()) + return make (1, root_a, peers_a, callback_a, difficulty_a, account_a); +} + +bool nano::distributed_work_factory::make (unsigned int backoff_a, nano::root const & root_a, std::vector> const & peers_a, std::function)> const & callback_a, uint64_t difficulty_a, boost::optional const & account_a) +{ + bool error_l{ true }; + if (!stopped) { - auto distributed (std::make_shared (node, root_a, peers_a, backoff_a, callback_a, difficulty_a, account_a)); + cleanup_finished (); + if (node.work_generation_enabled ()) { - nano::lock_guard guard (mutex); - items[root_a].emplace_back (distributed); + auto distributed (std::make_shared (node, root_a, peers_a, backoff_a, callback_a, difficulty_a, account_a)); + { + nano::lock_guard guard (mutex); + items[root_a].emplace_back (distributed); + } + distributed->start (); + error_l = false; } - distributed->start (); - } - else if (callback_a) - { - callback_a (boost::none); } + return error_l; } void nano::distributed_work_factory::cancel (nano::root const & root_a, bool const local_stop) { + nano::lock_guard guard_l (mutex); + auto existing_l (items.find (root_a)); + if (existing_l != items.end ()) { - nano::lock_guard guard (mutex); - auto existing_l (items.find (root_a)); - if (existing_l != items.end ()) + for (auto & distributed_w : existing_l->second) { - for (auto & distributed_w : existing_l->second) + if (auto distributed_l = distributed_w.lock ()) { - if (auto distributed_l = distributed_w.lock ()) - { - // Send work_cancel to work peers and stop local work generation - distributed_l->cancel_once (); - } + // Send work_cancel to work peers and stop local work generation + distributed_l->cancel_once (); } - items.erase (existing_l); } + items.erase (existing_l); } } @@ -448,6 +454,27 @@ void nano::distributed_work_factory::cleanup_finished () } } +void nano::distributed_work_factory::stop () +{ + if (!stopped.exchange (true)) + { + // Cancel any ongoing work + std::unordered_set roots_l; + nano::unique_lock lock_l (mutex); + for (auto const & item_l : items) + { + roots_l.insert (item_l.first); + } + lock_l.unlock (); + for (auto const & root_l : roots_l) + { + cancel (root_l, true); + } + lock_l.lock (); + items.clear (); + } +} + namespace nano { std::unique_ptr collect_seq_con_info (distributed_work_factory & distributed_work, const std::string & name) diff --git a/nano/node/distributed_work.hpp b/nano/node/distributed_work.hpp index 1dd476f3..d7feb993 100644 --- a/nano/node/distributed_work.hpp +++ b/nano/node/distributed_work.hpp @@ -77,14 +77,17 @@ class distributed_work_factory final { public: distributed_work_factory (nano::node &); - void make (nano::root const &, std::vector> const &, std::function)> const &, uint64_t, boost::optional const & = boost::none); - void make (unsigned int, nano::root const &, std::vector> const &, std::function)> const &, uint64_t, boost::optional const & = boost::none); + ~distributed_work_factory (); + bool make (nano::root const &, std::vector> const &, std::function)> const &, uint64_t, boost::optional const & = boost::none); + bool make (unsigned int, nano::root const &, std::vector> const &, std::function)> const &, uint64_t, boost::optional const & = boost::none); void cancel (nano::root const &, bool const local_stop = false); void cleanup_finished (); + void stop (); nano::node & node; std::unordered_map>> items; std::mutex mutex; + std::atomic stopped{ false }; }; class seq_con_info_component; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index d0b2969f..b8af783d 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -712,6 +712,7 @@ void nano::node::stop () wallets.stop (); stats.stop (); worker.stop (); + distributed_work.stop (); // work pool is not stopped on purpose due to testing setup } } @@ -999,7 +1000,11 @@ void nano::node::work_generate (nano::root const & root_a, std::function)> callback_a, uint64_t difficulty_a, boost::optional const & account_a, bool secondary_work_peers_a) { auto const & peers_l (secondary_work_peers_a ? config.secondary_work_peers : config.work_peers); - distributed_work.make (root_a, peers_l, callback_a, difficulty_a, account_a); + if (distributed_work.make (root_a, peers_l, callback_a, difficulty_a, account_a)) + { + // Error in creating the job (either stopped or work generation is not possible) + callback_a (boost::none); + } } boost::optional nano::node::work_generate_blocking (nano::root const & root_a, boost::optional const & account_a) @@ -1009,15 +1014,14 @@ boost::optional nano::node::work_generate_blocking (nano::root const & boost::optional nano::node::work_generate_blocking (nano::root const & root_a, uint64_t difficulty_a, boost::optional const & account_a) { - std::promise promise; - std::future future = promise.get_future (); + std::promise> promise; // clang-format off - work_generate (root_a, [&promise](boost::optional work_a) { - promise.set_value (work_a.value_or (0)); + work_generate (root_a, [&promise](boost::optional opt_work_a) { + promise.set_value (opt_work_a); }, difficulty_a, account_a); // clang-format on - return future.get (); + return promise.get_future ().get (); } void nano::node::add_initial_peers () diff --git a/nano/node/wallet.cpp b/nano/node/wallet.cpp index 6f321d66..781c3bc9 100644 --- a/nano/node/wallet.cpp +++ b/nano/node/wallet.cpp @@ -1376,7 +1376,7 @@ void nano::wallet::work_cache_blocking (nano::account const & account_a, nano::r work_update (transaction_l, account_a, root_a, *opt_work_l); } } - else + else if (!wallets.node.stopped) { wallets.node.logger.try_log (boost::str (boost::format ("Could not precache work for root %1 due to work generation failure") % root_a.to_string ())); }