Non-blocking work watcher loop (#2264)

* Use gtest death style threadsafe

* Use an atomic bool to ensure thread safety

* Encapsulating work watcher size

* Work pool size() and checking before calling callback

* Use a non-blocking work generate to not block IO threads in work watcher

* Update wallet.work_watcher tests
This commit is contained in:
Guilherme Lawless 2019-08-30 14:31:34 +01:00 committed by GitHub
commit 95e1fc39e1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 92 additions and 56 deletions

View file

@ -5,6 +5,9 @@
TEST (difficulty, multipliers)
{
// For ASSERT_DEATH_IF_SUPPORTED
testing::FLAGS_gtest_death_test_style = "threadsafe";
{
uint64_t base = 0xff00000000000000;
uint64_t difficulty = 0xfff27e7a57c285cd;

View file

@ -11,13 +11,15 @@ TEST (distributed_work, no_peers)
auto node (system.nodes[0]);
nano::block_hash hash;
boost::optional<uint64_t> work;
auto callback = [&work](boost::optional<uint64_t> work_a) {
std::atomic<bool> done{ false };
auto callback = [&work, &done](boost::optional<uint64_t> work_a) {
ASSERT_TRUE (work_a.is_initialized ());
work = work_a;
done = true;
};
node->distributed_work.make (hash, callback, node->network_params.network.publish_threshold);
system.deadline_set (5s);
while (!work.is_initialized ())
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}

View file

@ -1125,10 +1125,12 @@ TEST (wallet, work_watcher_update)
uint64_t updated_difficulty1{ difficulty1 }, updated_difficulty2{ difficulty2 };
{
std::unique_lock<std::mutex> lock (node.active.mutex);
// Prevent active difficulty repopulating multipliers
node.network_params.network.request_interval_ms = 10000;
//fill multipliers_cb and update active difficulty;
for (auto i (0); i < node.active.multipliers_cb.size (); i++)
{
node.active.multipliers_cb.push_back (multiplier * (1 + i / 100.));
node.active.multipliers_cb.push_back (multiplier * (2 + i / 100.));
}
node.active.update_active_difficulty (lock);
}
@ -1136,16 +1138,19 @@ TEST (wallet, work_watcher_update)
while (updated_difficulty1 == difficulty1 || updated_difficulty2 == difficulty2)
{
{
auto const existing (node.active.roots.find (block1->qualified_root ()));
//if existing is junk the block has been confirmed already
ASSERT_NE (existing, node.active.roots.end ());
updated_difficulty1 = existing->difficulty;
}
{
auto const existing (node.active.roots.find (block2->qualified_root ()));
//if existing is junk the block has been confirmed already
ASSERT_NE (existing, node.active.roots.end ());
updated_difficulty2 = existing->difficulty;
std::lock_guard<std::mutex> guard (node.active.mutex);
{
auto const existing (node.active.roots.find (block1->qualified_root ()));
//if existing is junk the block has been confirmed already
ASSERT_NE (existing, node.active.roots.end ());
updated_difficulty1 = existing->difficulty;
}
{
auto const existing (node.active.roots.find (block2->qualified_root ()));
//if existing is junk the block has been confirmed already
ASSERT_NE (existing, node.active.roots.end ());
updated_difficulty2 = existing->difficulty;
}
}
ASSERT_NO_ERROR (system.poll ());
}
@ -1162,12 +1167,12 @@ TEST (wallet, work_watcher_removed)
auto & wallet (*system.wallet (0));
wallet.insert_adhoc (nano::test_genesis_key.prv);
nano::keypair key;
ASSERT_EQ (0, wallet.wallets.watcher->watched.size ());
ASSERT_EQ (0, wallet.wallets.watcher->size ());
auto const block (wallet.send_action (nano::test_genesis_key.pub, key.pub, 100));
ASSERT_EQ (1, wallet.wallets.watcher->watched.size ());
ASSERT_EQ (1, wallet.wallets.watcher->size ());
auto transaction (wallet.wallets.tx_begin_write ());
system.deadline_set (3s);
while (!wallet.wallets.watcher->watched.empty ())
while (0 == wallet.wallets.watcher->size ())
{
ASSERT_NO_ERROR (system.poll ());
}
@ -1179,30 +1184,37 @@ TEST (wallet, work_watcher_cancel)
nano::node_config node_config (24000, system.logging);
node_config.work_watcher_period = 1s;
node_config.max_work_generate_multiplier = 1e6;
node_config.max_work_generate_difficulty = nano::difficulty::from_multiplier (node_config.max_work_generate_multiplier, nano::network_constants::publish_test_threshold);
node_config.enable_voting = false;
auto & node = *system.add_node (node_config);
auto & wallet (*system.wallet (0));
wallet.insert_adhoc (nano::test_genesis_key.prv);
wallet.insert_adhoc (nano::test_genesis_key.prv, false);
nano::keypair key;
auto const block1 (wallet.send_action (nano::test_genesis_key.pub, key.pub, 100));
auto work1 (node.work_generate_blocking (nano::test_genesis_key.pub));
auto const block1 (wallet.send_action (nano::test_genesis_key.pub, key.pub, 100, work1, false));
uint64_t difficulty1 (0);
nano::work_validate (*block1, &difficulty1);
{
std::unique_lock<std::mutex> lock (node.active.mutex);
//fill multipliers_cb and update active difficulty;
// Prevent active difficulty repopulating multipliers
node.network_params.network.request_interval_ms = 10000;
// Fill multipliers_cb and update active difficulty;
for (auto i (0); i < node.active.multipliers_cb.size (); i++)
{
node.active.multipliers_cb.push_back (node_config.max_work_generate_multiplier);
node.active.multipliers_cb.push_back (node.config.max_work_generate_multiplier);
}
node.active.update_active_difficulty (lock);
}
// Wait for work generation to start
system.deadline_set (5s);
while (node.work.pending.empty ())
while (0 == node.work.size ())
{
ASSERT_NO_ERROR (system.poll ());
}
// Cancel the ongoing work
ASSERT_EQ (1, node.work.size ());
node.work.cancel (block1->root ());
ASSERT_EQ (0, node.work.size ());
{
std::unique_lock<std::mutex> lock (wallet.wallets.watcher->mutex);
auto existing (wallet.wallets.watcher->watched.find (block1->qualified_root ()));

View file

@ -170,7 +170,10 @@ void nano::work_pool::cancel (nano::uint256_union const & root_a)
bool result;
if (item_a.item == root_a)
{
item_a.callback (boost::none);
if (item_a.callback)
{
item_a.callback (boost::none);
}
result = true;
}
else
@ -227,6 +230,12 @@ uint64_t nano::work_pool::generate (nano::uint256_union const & hash_a, uint64_t
return result.value ();
}
size_t nano::work_pool::size ()
{
std::lock_guard<std::mutex> lock (mutex);
return pending.size ();
}
namespace nano
{
std::unique_ptr<seq_con_info_component> collect_seq_con_info (work_pool & work_pool, const std::string & name)

View file

@ -38,6 +38,7 @@ public:
void generate (nano::uint256_union const &, std::function<void(boost::optional<uint64_t> const &)>, uint64_t);
uint64_t generate (nano::uint256_union const &);
uint64_t generate (nano::uint256_union const &, uint64_t);
size_t size ();
nano::network_constants network_constants;
std::atomic<int> ticket;
bool done;

View file

@ -1450,48 +1450,50 @@ void nano::work_watcher::watching (nano::qualified_root const & root_a, std::sha
auto active_difficulty (watcher_l->node.active.limited_active_difficulty ());
if (active_difficulty > difficulty)
{
std::promise<boost::optional<uint64_t>> promise;
std::future<boost::optional<uint64_t>> future = promise.get_future ();
// clang-format off
watcher_l->node.work.generate (root_l, [&promise](boost::optional<uint64_t> work_a) {
promise.set_value (work_a);
},
active_difficulty);
// clang-format on
auto work_l = future.get ();
if (work_l.is_initialized () && !watcher_l->stopped)
{
nano::state_block_builder builder;
std::error_code ec;
std::shared_ptr<nano::state_block> block (builder.from (*block_a).work (*work_l).build (ec));
if (!ec)
watcher_l->node.work.generate (root_l, [watcher_l, block_a, root_a](boost::optional<uint64_t> work_a) {
if (block_a != nullptr && watcher_l != nullptr && !watcher_l->stopped)
{
bool updated_l{ false };
if (work_a.is_initialized ())
{
auto hash (block_a->hash ());
std::lock_guard<std::mutex> active_guard (watcher_l->node.active.mutex);
auto existing (watcher_l->node.active.roots.find (root_a));
if (existing != watcher_l->node.active.roots.end ())
nano::state_block_builder builder;
std::error_code ec;
std::shared_ptr<nano::state_block> block (builder.from (*block_a).work (*work_a).build (ec));
if (!ec)
{
auto election (existing->election);
if (election->status.winner->hash () == hash)
{
election->status.winner = block;
auto hash (block_a->hash ());
std::lock_guard<std::mutex> active_guard (watcher_l->node.active.mutex);
auto existing (watcher_l->node.active.roots.find (root_a));
if (existing != watcher_l->node.active.roots.end ())
{
auto election (existing->election);
if (election->status.winner->hash () == hash)
{
election->status.winner = block;
}
auto current (election->blocks.find (hash));
assert (current != election->blocks.end ());
current->second = block;
}
}
auto current (election->blocks.find (hash));
assert (current != election->blocks.end ());
current->second = block;
watcher_l->node.network.flood_block (block, false);
watcher_l->node.active.update_difficulty (*block.get ());
watcher_l->update (root_a, block);
updated_l = true;
watcher_l->watching (root_a, block);
}
}
watcher_l->node.network.flood_block (block, false);
watcher_l->node.active.update_difficulty (*block.get ());
watcher_l->update (root_a, block);
updated_l = true;
watcher_l->watching (root_a, block);
if (!updated_l)
{
watcher_l->watching (root_a, block_a);
}
}
}
},
active_difficulty);
}
if (!updated_l)
else
{
watcher_l->watching (root_a, block_a);
}
@ -1514,11 +1516,17 @@ void nano::work_watcher::remove (std::shared_ptr<nano::block> block_a)
bool nano::work_watcher::is_watched (nano::qualified_root const & root_a)
{
std::unique_lock<std::mutex> lock (mutex);
std::lock_guard<std::mutex> guard (mutex);
auto exists (watched.find (root_a));
return exists != watched.end ();
}
size_t nano::work_watcher::size ()
{
std::lock_guard<std::mutex> guard (mutex);
return watched.size ();
}
void nano::wallets::do_wallet_actions ()
{
std::unique_lock<std::mutex> action_lock (action_mutex);

View file

@ -172,6 +172,7 @@ public:
void watching (nano::qualified_root const &, std::shared_ptr<nano::state_block>);
void remove (std::shared_ptr<nano::block>);
bool is_watched (nano::qualified_root const &);
size_t size ();
std::mutex mutex;
nano::node & node;
std::unordered_map<nano::qualified_root, std::shared_ptr<nano::state_block>> watched;