Improve distributed_work stopping with ongoing worker tasks (#2369)
* Improve distributed_work stopping with ongoing worker tasks * Another const * Fix work_generate_blocking replacing cancelled work with zero-filled work * Make sure items cannot be canceled twice * Simplifying * Protect stopped * Fix occasionally stuck tests * Return true/false for errors in distributed_work::make(), leaving the decision to do the callback for the caller * Add a comment to clarify
This commit is contained in:
parent
a7417b35b8
commit
2ad00efc2c
6 changed files with 83 additions and 55 deletions
|
@ -5,6 +5,13 @@
|
|||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
// Verifies that a stopped distributed_work factory refuses new requests:
// after stop () sets the stopped flag, make () skips job creation and
// returns true, which is its error indication.
TEST (distributed_work, stopped)
|
||||
{
|
||||
nano::system system (24000, 1);
|
||||
system.nodes[0]->distributed_work.stop ();
|
||||
// Empty peer list and empty callback: only the returned error flag is checked here
ASSERT_TRUE (system.nodes[0]->distributed_work.make (nano::block_hash (), {}, {}, nano::network_constants::publish_test_threshold));
|
||||
}
|
||||
|
||||
TEST (distributed_work, no_peers)
|
||||
{
|
||||
nano::system system (24000, 1);
|
||||
|
@ -17,7 +24,7 @@ TEST (distributed_work, no_peers)
|
|||
work = work_a;
|
||||
done = true;
|
||||
};
|
||||
node->distributed_work.make (hash, node->config.work_peers, callback, node->network_params.network.publish_threshold, nano::account ());
|
||||
ASSERT_FALSE (node->distributed_work.make (hash, node->config.work_peers, callback, node->network_params.network.publish_threshold, nano::account ()));
|
||||
system.deadline_set (5s);
|
||||
while (!done)
|
||||
{
|
||||
|
@ -39,16 +46,7 @@ TEST (distributed_work, no_peers_disabled)
|
|||
nano::node_config node_config (24000, system.logging);
|
||||
node_config.work_threads = 0;
|
||||
auto & node = *system.add_node (node_config);
|
||||
bool done{ false };
|
||||
auto callback_failure = [&done](boost::optional<uint64_t> work_a) {
|
||||
ASSERT_FALSE (work_a.is_initialized ());
|
||||
done = true;
|
||||
};
|
||||
node.distributed_work.make (nano::block_hash (), node.config.work_peers, callback_failure, nano::network_constants::publish_test_threshold);
|
||||
while (!done)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
ASSERT_TRUE (node.distributed_work.make (nano::block_hash (), node.config.work_peers, {}, nano::network_constants::publish_test_threshold));
|
||||
}
|
||||
|
||||
TEST (distributed_work, no_peers_cancel)
|
||||
|
@ -64,7 +62,7 @@ TEST (distributed_work, no_peers_cancel)
|
|||
ASSERT_FALSE (work_a.is_initialized ());
|
||||
done = true;
|
||||
};
|
||||
node.distributed_work.make (hash, node.config.work_peers, callback_to_cancel, nano::difficulty::from_multiplier (1e6, node.network_params.network.publish_threshold));
|
||||
ASSERT_FALSE (node.distributed_work.make (hash, node.config.work_peers, callback_to_cancel, nano::difficulty::from_multiplier (1e6, node.network_params.network.publish_threshold)));
|
||||
ASSERT_EQ (1, node.distributed_work.items.size ());
|
||||
// cleanup should not cancel or remove an ongoing work
|
||||
node.distributed_work.cleanup_finished ();
|
||||
|
@ -80,7 +78,7 @@ TEST (distributed_work, no_peers_cancel)
|
|||
|
||||
// now using observer
|
||||
done = false;
|
||||
node.distributed_work.make (hash, node.config.work_peers, callback_to_cancel, nano::difficulty::from_multiplier (1000000, node.network_params.network.publish_threshold));
|
||||
ASSERT_FALSE (node.distributed_work.make (hash, node.config.work_peers, callback_to_cancel, nano::difficulty::from_multiplier (1000000, node.network_params.network.publish_threshold)));
|
||||
ASSERT_EQ (1, node.distributed_work.items.size ());
|
||||
node.observers.work_cancel.notify (hash);
|
||||
system.deadline_set (20s);
|
||||
|
@ -104,7 +102,7 @@ TEST (distributed_work, no_peers_multi)
|
|||
// Test many works for the same root
|
||||
for (unsigned i{ 0 }; i < total; ++i)
|
||||
{
|
||||
node->distributed_work.make (hash, node->config.work_peers, callback, nano::difficulty::from_multiplier (10, node->network_params.network.publish_threshold));
|
||||
ASSERT_FALSE (node->distributed_work.make (hash, node->config.work_peers, callback, nano::difficulty::from_multiplier (10, node->network_params.network.publish_threshold)));
|
||||
}
|
||||
// 1 root, and _total_ requests for that root are expected, but some may have already finished
|
||||
ASSERT_EQ (1, node->distributed_work.items.size ());
|
||||
|
@ -129,7 +127,7 @@ TEST (distributed_work, no_peers_multi)
|
|||
for (unsigned i{ 0 }; i < total; ++i)
|
||||
{
|
||||
nano::block_hash hash_i (i + 1);
|
||||
node->distributed_work.make (hash_i, node->config.work_peers, callback, node->network_params.network.publish_threshold);
|
||||
ASSERT_FALSE (node->distributed_work.make (hash_i, node->config.work_peers, callback, node->network_params.network.publish_threshold));
|
||||
}
|
||||
// 10 roots expected with 1 work each, but some may have completed so test for some
|
||||
ASSERT_GT (node->distributed_work.items.size (), 5);
|
||||
|
|
|
@ -167,7 +167,7 @@ void nano::work_pool::cancel (nano::root const & root_a)
|
|||
}
|
||||
}
|
||||
pending.remove_if ([&root_a](decltype (pending)::value_type const & item_a) {
|
||||
bool result;
|
||||
bool result{ false };
|
||||
if (item_a.item == root_a)
|
||||
{
|
||||
if (item_a.callback)
|
||||
|
@ -176,10 +176,6 @@ void nano::work_pool::cancel (nano::root const & root_a)
|
|||
}
|
||||
result = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
result = false;
|
||||
}
|
||||
return result;
|
||||
});
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ elapsed (nano::timer_state::started, "distributed work generation timer")
|
|||
|
||||
nano::distributed_work::~distributed_work ()
|
||||
{
|
||||
if (node.websocket_server && node.websocket_server->any_subscriber (nano::websocket::topic::work))
|
||||
if (!node.stopped && node.websocket_server && node.websocket_server->any_subscriber (nano::websocket::topic::work))
|
||||
{
|
||||
nano::websocket::message_builder builder;
|
||||
if (completed)
|
||||
|
@ -348,11 +348,12 @@ void nano::distributed_work::handle_failure (bool const last_a)
|
|||
auto next_backoff (std::min (backoff * 2, (unsigned int)60 * 5));
|
||||
// clang-format off
|
||||
node.alarm.add (now + std::chrono::seconds (backoff), [ node_w, root_l = root, peers_l = peers, callback_l = callback, next_backoff, difficulty = difficulty, account_l = account ] {
|
||||
bool error_l {true};
|
||||
if (auto node_l = node_w.lock ())
|
||||
{
|
||||
node_l->distributed_work.make (next_backoff, root_l, peers_l, callback_l, difficulty, account_l);
|
||||
error_l = node_l->distributed_work.make (next_backoff, root_l, peers_l, callback_l, difficulty, account_l);
|
||||
}
|
||||
else if (callback_l)
|
||||
if (error_l && callback_l)
|
||||
{
|
||||
callback_l (boost::none);
|
||||
}
|
||||
|
@ -384,46 +385,51 @@ node (node_a)
|
|||
{
|
||||
}
|
||||
|
||||
void nano::distributed_work_factory::make (nano::root const & root_a, std::vector<std::pair<std::string, uint16_t>> const & peers_a, std::function<void(boost::optional<uint64_t>)> const & callback_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a)
|
||||
nano::distributed_work_factory::~distributed_work_factory ()
|
||||
{
|
||||
make (1, root_a, peers_a, callback_a, difficulty_a, account_a);
|
||||
stop ();
|
||||
}
|
||||
|
||||
void nano::distributed_work_factory::make (unsigned int backoff_a, nano::root const & root_a, std::vector<std::pair<std::string, uint16_t>> const & peers_a, std::function<void(boost::optional<uint64_t>)> const & callback_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a)
|
||||
bool nano::distributed_work_factory::make (nano::root const & root_a, std::vector<std::pair<std::string, uint16_t>> const & peers_a, std::function<void(boost::optional<uint64_t>)> const & callback_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a)
|
||||
{
|
||||
cleanup_finished ();
|
||||
if (node.work_generation_enabled ())
|
||||
return make (1, root_a, peers_a, callback_a, difficulty_a, account_a);
|
||||
}
|
||||
|
||||
bool nano::distributed_work_factory::make (unsigned int backoff_a, nano::root const & root_a, std::vector<std::pair<std::string, uint16_t>> const & peers_a, std::function<void(boost::optional<uint64_t>)> const & callback_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a)
|
||||
{
|
||||
bool error_l{ true };
|
||||
if (!stopped)
|
||||
{
|
||||
auto distributed (std::make_shared<nano::distributed_work> (node, root_a, peers_a, backoff_a, callback_a, difficulty_a, account_a));
|
||||
cleanup_finished ();
|
||||
if (node.work_generation_enabled ())
|
||||
{
|
||||
nano::lock_guard<std::mutex> guard (mutex);
|
||||
items[root_a].emplace_back (distributed);
|
||||
auto distributed (std::make_shared<nano::distributed_work> (node, root_a, peers_a, backoff_a, callback_a, difficulty_a, account_a));
|
||||
{
|
||||
nano::lock_guard<std::mutex> guard (mutex);
|
||||
items[root_a].emplace_back (distributed);
|
||||
}
|
||||
distributed->start ();
|
||||
error_l = false;
|
||||
}
|
||||
distributed->start ();
|
||||
}
|
||||
else if (callback_a)
|
||||
{
|
||||
callback_a (boost::none);
|
||||
}
|
||||
return error_l;
|
||||
}
|
||||
|
||||
void nano::distributed_work_factory::cancel (nano::root const & root_a, bool const local_stop)
|
||||
{
|
||||
nano::lock_guard<std::mutex> guard_l (mutex);
|
||||
auto existing_l (items.find (root_a));
|
||||
if (existing_l != items.end ())
|
||||
{
|
||||
nano::lock_guard<std::mutex> guard (mutex);
|
||||
auto existing_l (items.find (root_a));
|
||||
if (existing_l != items.end ())
|
||||
for (auto & distributed_w : existing_l->second)
|
||||
{
|
||||
for (auto & distributed_w : existing_l->second)
|
||||
if (auto distributed_l = distributed_w.lock ())
|
||||
{
|
||||
if (auto distributed_l = distributed_w.lock ())
|
||||
{
|
||||
// Send work_cancel to work peers and stop local work generation
|
||||
distributed_l->cancel_once ();
|
||||
}
|
||||
// Send work_cancel to work peers and stop local work generation
|
||||
distributed_l->cancel_once ();
|
||||
}
|
||||
items.erase (existing_l);
|
||||
}
|
||||
items.erase (existing_l);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -448,6 +454,27 @@ void nano::distributed_work_factory::cleanup_finished ()
|
|||
}
|
||||
}
|
||||
|
||||
// Stops the factory: marks it as stopped, cancels the work tracked for every
// root currently in `items`, and then empties `items`.
// Calling stop () more than once is harmless; only the first call does the work.
void nano::distributed_work_factory::stop ()
|
||||
{
|
||||
// exchange () returns the previous value, so the body runs only for the caller that flips the flag
if (!stopped.exchange (true))
|
||||
{
|
||||
// Cancel any ongoing work
|
||||
// Snapshot the roots first: cancel () erases from `items`, so it must not run while iterating the map
std::unordered_set<nano::root> roots_l;
|
||||
nano::unique_lock<std::mutex> lock_l (mutex);
|
||||
for (auto const & item_l : items)
|
||||
{
|
||||
roots_l.insert (item_l.first);
|
||||
}
|
||||
// Release the mutex before cancelling: cancel () locks the same non-recursive std::mutex itself
lock_l.unlock ();
|
||||
for (auto const & root_l : roots_l)
|
||||
{
|
||||
cancel (root_l, true);
|
||||
}
|
||||
// Re-acquire the mutex to drop whatever entries remain; lock_l releases it at scope exit
lock_l.lock ();
|
||||
items.clear ();
|
||||
}
|
||||
}
|
||||
|
||||
namespace nano
|
||||
{
|
||||
std::unique_ptr<seq_con_info_component> collect_seq_con_info (distributed_work_factory & distributed_work, const std::string & name)
|
||||
|
|
|
@ -77,14 +77,17 @@ class distributed_work_factory final
|
|||
{
|
||||
public:
|
||||
distributed_work_factory (nano::node &);
|
||||
void make (nano::root const &, std::vector<std::pair<std::string, uint16_t>> const &, std::function<void(boost::optional<uint64_t>)> const &, uint64_t, boost::optional<nano::account> const & = boost::none);
|
||||
void make (unsigned int, nano::root const &, std::vector<std::pair<std::string, uint16_t>> const &, std::function<void(boost::optional<uint64_t>)> const &, uint64_t, boost::optional<nano::account> const & = boost::none);
|
||||
~distributed_work_factory ();
|
||||
bool make (nano::root const &, std::vector<std::pair<std::string, uint16_t>> const &, std::function<void(boost::optional<uint64_t>)> const &, uint64_t, boost::optional<nano::account> const & = boost::none);
|
||||
bool make (unsigned int, nano::root const &, std::vector<std::pair<std::string, uint16_t>> const &, std::function<void(boost::optional<uint64_t>)> const &, uint64_t, boost::optional<nano::account> const & = boost::none);
|
||||
void cancel (nano::root const &, bool const local_stop = false);
|
||||
void cleanup_finished ();
|
||||
void stop ();
|
||||
|
||||
nano::node & node;
|
||||
std::unordered_map<nano::root, std::vector<std::weak_ptr<nano::distributed_work>>> items;
|
||||
std::mutex mutex;
|
||||
std::atomic<bool> stopped{ false };
|
||||
};
|
||||
|
||||
class seq_con_info_component;
|
||||
|
|
|
@ -712,6 +712,7 @@ void nano::node::stop ()
|
|||
wallets.stop ();
|
||||
stats.stop ();
|
||||
worker.stop ();
|
||||
distributed_work.stop ();
|
||||
// work pool is not stopped on purpose due to testing setup
|
||||
}
|
||||
}
|
||||
|
@ -999,7 +1000,11 @@ void nano::node::work_generate (nano::root const & root_a, std::function<void(bo
|
|||
void nano::node::work_generate (nano::root const & root_a, std::function<void(boost::optional<uint64_t>)> callback_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a, bool secondary_work_peers_a)
|
||||
{
|
||||
auto const & peers_l (secondary_work_peers_a ? config.secondary_work_peers : config.work_peers);
|
||||
distributed_work.make (root_a, peers_l, callback_a, difficulty_a, account_a);
|
||||
if (distributed_work.make (root_a, peers_l, callback_a, difficulty_a, account_a))
|
||||
{
|
||||
// Error in creating the job (either stopped or work generation is not possible)
|
||||
callback_a (boost::none);
|
||||
}
|
||||
}
|
||||
|
||||
boost::optional<uint64_t> nano::node::work_generate_blocking (nano::root const & root_a, boost::optional<nano::account> const & account_a)
|
||||
|
@ -1009,15 +1014,14 @@ boost::optional<uint64_t> nano::node::work_generate_blocking (nano::root const &
|
|||
|
||||
boost::optional<uint64_t> nano::node::work_generate_blocking (nano::root const & root_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a)
|
||||
{
|
||||
std::promise<uint64_t> promise;
|
||||
std::future<uint64_t> future = promise.get_future ();
|
||||
std::promise<boost::optional<uint64_t>> promise;
|
||||
// clang-format off
|
||||
work_generate (root_a, [&promise](boost::optional<uint64_t> work_a) {
|
||||
promise.set_value (work_a.value_or (0));
|
||||
work_generate (root_a, [&promise](boost::optional<uint64_t> opt_work_a) {
|
||||
promise.set_value (opt_work_a);
|
||||
},
|
||||
difficulty_a, account_a);
|
||||
// clang-format on
|
||||
return future.get ();
|
||||
return promise.get_future ().get ();
|
||||
}
|
||||
|
||||
void nano::node::add_initial_peers ()
|
||||
|
|
|
@ -1376,7 +1376,7 @@ void nano::wallet::work_cache_blocking (nano::account const & account_a, nano::r
|
|||
work_update (transaction_l, account_a, root_a, *opt_work_l);
|
||||
}
|
||||
}
|
||||
else
|
||||
else if (!wallets.node.stopped)
|
||||
{
|
||||
wallets.node.logger.try_log (boost::str (boost::format ("Could not precache work for root %1 due to work generation failure") % root_a.to_string ()));
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue