From b796ec601f74df380392ebba0be28e733c26586c Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Tue, 27 Aug 2019 11:41:50 +0100 Subject: [PATCH] Watch blocks separately in work watcher (#2228) * Block-wise wallet work watcher Uses node alarms to be more accurate in the time between block confirmation + re-working Time is now set after the previous work is complete Removes work_watcher thread * Test two blocks * Erasing confirmed * Prevent simultaneous opencl work generation * Update watched with new block, and simplify using blocking work generate * Handle confirmation with an observer, remove blocks from work generation Cancelling current and future blocks works consistently now because all work generation gets put into a queue * Unused mutex * Handle work cancelling by not updating the block * Add a test for the work cancelling case * Update wallet.cpp --- nano/core_test/wallet.cpp | 109 +++++++++++--- nano/node/active_transactions.cpp | 2 +- nano/node/blockprocessor.cpp | 4 +- nano/node/testing.cpp | 2 +- nano/node/wallet.cpp | 229 +++++++++++++----------------- nano/node/wallet.hpp | 11 +- nano/rpc_test/rpc.cpp | 1 + nano/slow_test/node.cpp | 2 +- 8 files changed, 200 insertions(+), 160 deletions(-) diff --git a/nano/core_test/wallet.cpp b/nano/core_test/wallet.cpp index 8d020ad3..9704c695 100644 --- a/nano/core_test/wallet.cpp +++ b/nano/core_test/wallet.cpp @@ -1108,38 +1108,115 @@ TEST (wallet, deterministic_restore) ASSERT_TRUE (wallet->exists (pub)); } -TEST (wallet, update_work_action) +TEST (wallet, work_watcher_update) { nano::system system; nano::node_config node_config (24000, system.logging); node_config.enable_voting = false; + node_config.work_watcher_period = 1s; auto & node = *system.add_node (node_config); auto & wallet (*system.wallet (0)); wallet.insert_adhoc (nano::test_genesis_key.prv); nano::keypair key; - auto const block (wallet.send_action (nano::test_genesis_key.pub, key.pub, nano::genesis_amount)); + auto const block1 (wallet.send_action (nano::test_genesis_key.pub, key.pub, 100)); uint64_t difficulty1 (0); - nano::work_validate (*block, &difficulty1); - auto multiplier1 = nano::difficulty::to_multiplier (difficulty1, node.network_params.network.publish_threshold); - system.deadline_set (10s); - auto updated (false); - uint64_t updated_difficulty; - while (!updated) + nano::work_validate (*block1, &difficulty1); + auto const block2 (wallet.send_action (nano::test_genesis_key.pub, key.pub, 200)); + uint64_t difficulty2 (0); + nano::work_validate (*block2, &difficulty2); + auto multiplier = nano::difficulty::to_multiplier (std::max (difficulty1, difficulty2), node.network_params.network.publish_threshold); + uint64_t updated_difficulty1{ difficulty1 }, updated_difficulty2{ difficulty2 }; { std::unique_lock lock (node.active.mutex); //fill multipliers_cb and update active difficulty; for (auto i (0); i < node.active.multipliers_cb.size (); i++) { - node.active.multipliers_cb.push_back (multiplier1 * (1 + i / 100.)); + node.active.multipliers_cb.push_back (multiplier * (1 + i / 100.)); } node.active.update_active_difficulty (lock); - auto const existing (node.active.roots.find (block->qualified_root ())); - //if existing is junk the block has been confirmed already - ASSERT_NE (existing, node.active.roots.end ()); - updated = existing->difficulty != difficulty1; - updated_difficulty = existing->difficulty; - lock.unlock (); + } + system.deadline_set (10s); + while (updated_difficulty1 == difficulty1 || updated_difficulty2 == difficulty2) + { + { + auto const existing (node.active.roots.find (block1->qualified_root ())); + //if existing is junk the block has been confirmed already + ASSERT_NE (existing, node.active.roots.end ()); + updated_difficulty1 = existing->difficulty; + } + { + auto const existing (node.active.roots.find (block2->qualified_root ())); + //if existing is junk the block has been confirmed already + ASSERT_NE (existing, node.active.roots.end ()); + updated_difficulty2 = existing->difficulty; + } ASSERT_NO_ERROR (system.poll ()); } - ASSERT_GT (updated_difficulty, difficulty1); + ASSERT_GT (updated_difficulty1, difficulty1); + ASSERT_GT (updated_difficulty2, difficulty2); +} + +TEST (wallet, work_watcher_removed) +{ + nano::system system; + nano::node_config node_config (24000, system.logging); + node_config.work_watcher_period = 1s; + auto & node = *system.add_node (node_config); + auto & wallet (*system.wallet (0)); + wallet.insert_adhoc (nano::test_genesis_key.prv); + nano::keypair key; + ASSERT_EQ (0, wallet.wallets.watcher->watched.size ()); + auto const block (wallet.send_action (nano::test_genesis_key.pub, key.pub, 100)); + ASSERT_EQ (1, wallet.wallets.watcher->watched.size ()); + auto transaction (wallet.wallets.tx_begin_write ()); + system.deadline_set (3s); + while (!wallet.wallets.watcher->watched.empty ()) + { + ASSERT_NO_ERROR (system.poll ()); + } +} + +TEST (wallet, work_watcher_cancel) +{ + nano::system system; + nano::node_config node_config (24000, system.logging); + node_config.work_watcher_period = 1s; + node_config.max_work_generate_multiplier = 1e6; + auto & node = *system.add_node (node_config); + auto & wallet (*system.wallet (0)); + wallet.insert_adhoc (nano::test_genesis_key.prv); + nano::keypair key; + auto const block1 (wallet.send_action (nano::test_genesis_key.pub, key.pub, 100)); + uint64_t difficulty1 (0); + nano::work_validate (*block1, &difficulty1); + { + std::unique_lock lock (node.active.mutex); + //fill multipliers_cb and update active difficulty; + for (auto i (0); i < node.active.multipliers_cb.size (); i++) + { + node.active.multipliers_cb.push_back (node_config.max_work_generate_multiplier); + } + node.active.update_active_difficulty (lock); + } + // Wait for work generation to start + system.deadline_set (3s); + while (node.work.pending.empty ()) + { + ASSERT_NO_ERROR (system.poll ()); + } + // Cancel the ongoing work + node.work.cancel (block1->root ()); + { + std::unique_lock lock (wallet.wallets.watcher->mutex); + auto existing (wallet.wallets.watcher->watched.find (block1->qualified_root ())); + ASSERT_NE (wallet.wallets.watcher->watched.end (), existing); + auto block2 (existing->second); + // Block must be the same + ASSERT_NE (nullptr, block1); + ASSERT_NE (nullptr, block2); + ASSERT_EQ (*block1, *block2); + // but should still be under watch + lock.unlock (); + ASSERT_TRUE (wallet.wallets.watcher->is_watched (block1->qualified_root ())); + } } diff --git a/nano/node/active_transactions.cpp b/nano/node/active_transactions.cpp index b25c58a0..b209b901 100644 --- a/nano/node/active_transactions.cpp +++ b/nano/node/active_transactions.cpp @@ -800,7 +800,7 @@ void nano::active_transactions::flush_lowest () if (count != 2) { auto election = it->election; - if (election->confirmation_request_count > high_confirmation_request_count && !election->confirmed && !election->stopped && !node.wallets.watcher.is_watched (it->root)) + if (election->confirmation_request_count > high_confirmation_request_count && !election->confirmed && !election->stopped && !node.wallets.watcher->is_watched (it->root)) { it = decltype (it){ sorted_roots.erase (std::next (it).base ()) }; election->stop (); diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index e109c0e8..a6389691 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -342,7 +342,7 @@ void nano::block_processor::process_batch (std::unique_lock & lock_a for (auto & i : rollback_list) { node.votes_cache.remove (i->hash ()); - node.wallets.watcher.remove (i); + node.wallets.watcher->remove (i); node.active.erase (*i); } } @@ -374,7 +374,7 @@ void nano::block_processor::process_live (nano::block_hash const & hash_a, std:: //add block to watcher if desired after block has been added to active if (watch_work_a) { - node.wallets.watcher.add (block_a); + node.wallets.watcher->add (block_a); } // Announce block contents to the network node.network.flood_block (block_a, false); diff --git a/nano/node/testing.cpp b/nano/node/testing.cpp index 37563aba..273a5814 100644 --- a/nano/node/testing.cpp +++ b/nano/node/testing.cpp @@ -246,7 +246,7 @@ void nano::system::generate_rollback (nano::node & node_a, std::vectorremove (i); node_a.active.erase (*i); } } diff --git a/nano/node/wallet.cpp b/nano/node/wallet.cpp index 231e052d..4a21bcb3 100644 --- a/nano/node/wallet.cpp +++ b/nano/node/wallet.cpp @@ -980,7 +980,7 @@ std::shared_ptr nano::wallet::receive_action (nano::block const & s wallets.node.logger.try_log (boost::str (boost::format ("Cached or provided work for block %1% account %2% is invalid, regenerating") % block->hash ().to_string () % account.to_account ())); wallets.node.work_generate_blocking (*block, wallets.node.active.limited_active_difficulty ()); } - wallets.watcher.add (block); + wallets.watcher->add (block); bool error (wallets.node.process_local (block).code != nano::process_result::progress); if (!error && generate_work_a) { @@ -1029,7 +1029,7 @@ std::shared_ptr nano::wallet::change_action (nano::account const & wallets.node.logger.try_log (boost::str (boost::format ("Cached or provided work for block %1% account %2% is invalid, regenerating") % block->hash ().to_string () % source_a.to_account ())); wallets.node.work_generate_blocking (*block, wallets.node.active.limited_active_difficulty ()); } - wallets.watcher.add (block); + wallets.watcher->add (block); bool error (wallets.node.process_local (block).code != nano::process_result::progress); if (!error && generate_work_a) { @@ -1143,7 +1143,7 @@ std::shared_ptr nano::wallet::send_action (nano::account const & so wallets.node.logger.try_log (boost::str (boost::format ("Cached or provided work for block %1% account %2% is invalid, regenerating") % block->hash ().to_string () % account_a.to_account ())); wallets.node.work_generate_blocking (*block, wallets.node.active.limited_active_difficulty ()); } - wallets.watcher.add (block); + wallets.watcher->add (block); error = (wallets.node.process_local (block).code != nano::process_result::progress); if (!error && generate_work_a) { @@ -1394,11 +1394,11 @@ void nano::wallet::work_cache_blocking (nano::account const & account_a, nano::b nano::work_watcher::work_watcher (nano::node & node_a) : node (node_a), -stopped (false), -thread ([this]() { - nano::thread_role::set (nano::thread_role::name::work_watcher); - run (); }) +stopped (false) { + node.observers.blocks.add ([this](nano::election_status const & status_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a) { + this->remove (status_a.winner); + }); } nano::work_watcher::~work_watcher () @@ -1409,123 +1409,8 @@ nano::work_watcher::~work_watcher () void nano::work_watcher::stop () { std::unique_lock lock (mutex); - blocks.clear (); + watched.clear (); stopped = true; - condition.notify_all (); - lock.unlock (); - if (thread.joinable ()) - { - thread.join (); - } -} -void nano::work_watcher::run () -{ - std::unique_lock lock (mutex); - std::chrono::steady_clock::time_point next_attempt; - while (!stopped) - { - next_attempt = std::chrono::steady_clock::now () + node.config.work_watcher_period; - for (auto i (blocks.begin ()), n (blocks.end ()); i != n;) - { - std::unique_lock active_lock (node.active.mutex); - auto confirmed (false); - auto existing (node.active.roots.find (i->first)); - if (node.active.roots.end () != existing) - { - //block may not be in existing yet - confirmed = existing->election->confirmed.load (); - } - else if (i->second == nullptr) - { - // removed - confirmed = true; - } - else - { - //and so we fall back to ledger confirmation - auto transaction (this->node.store.tx_begin_read ()); - auto block = this->node.store.block_get (transaction, (i->second)->hash ()); - if (block) - { - confirmed = this->node.block_confirmed_or_being_confirmed (transaction, (i->second)->hash ()); - } - } - active_lock.unlock (); - - if (confirmed) - { - i = blocks.erase (i); - } - else - { - ++i; - } - } - for (auto & i : blocks) - { - uint64_t difficulty (0); - nano::block_hash root (0); - if (i.second != nullptr) - { - root = i.second->root (); - nano::work_validate (root, i.second->block_work (), &difficulty); - } - auto active_difficulty (node.active.limited_active_difficulty ()); - if (active_difficulty > difficulty && i.second != nullptr) - { - auto qualified_root = i.second->qualified_root (); - auto hash = i.second->hash (); - nano::state_block_builder builder; - std::error_code ec; - builder.from (*i.second); - lock.unlock (); - builder.work (node.work_generate_blocking (root, active_difficulty)); - std::shared_ptr block (builder.build (ec)); - if (!ec) - { - { - std::lock_guard active_lock (node.active.mutex); - auto existing (node.active.roots.find (qualified_root)); - if (existing != node.active.roots.end ()) - { - auto election (existing->election); - if (election->status.winner->hash () == hash) - { - election->status.winner = block; - } - auto current (election->blocks.find (hash)); - assert (current != election->blocks.end ()); - current->second = block; - } - } - node.network.flood_block (block, false); - node.active.update_difficulty (*block.get ()); - lock.lock (); - if (stopped) - { - break; - } - if (i.second != nullptr) - { - i.second = block; - } - lock.unlock (); - } - lock.lock (); - if (stopped) - { - break; - } - } - } - - if (!stopped) - { - condition.wait_until (lock, next_attempt, [this, &next_attempt]() { - return stopped || next_attempt < std::chrono::steady_clock::now (); - }); - } - } // !stopped } void nano::work_watcher::add (std::shared_ptr block_a) @@ -1533,27 +1418,105 @@ void nano::work_watcher::add (std::shared_ptr block_a) auto block_l (std::dynamic_pointer_cast (block_a)); if (!stopped && block_l != nullptr) { - std::lock_guard lock (mutex); - blocks[block_l->qualified_root ()] = block_l; + auto root_l (block_l->qualified_root ()); + std::unique_lock lock (mutex); + watched[root_l] = block_l; + lock.unlock (); + watching (root_l, block_l); } } +void nano::work_watcher::update (nano::qualified_root const & root_a, std::shared_ptr block_a) +{ + std::lock_guard guard (mutex); + watched[root_a] = block_a; +} + +void nano::work_watcher::watching (nano::qualified_root const & root_a, std::shared_ptr block_a) +{ + std::weak_ptr watcher_w (shared_from_this ()); + node.alarm.add (std::chrono::steady_clock::now () + node.config.work_watcher_period, [block_a, root_a, watcher_w]() { + auto watcher_l = watcher_w.lock (); + if (watcher_l && !watcher_l->stopped && block_a != nullptr) + { + std::unique_lock lock (watcher_l->mutex); + if (watcher_l->watched.find (root_a) != watcher_l->watched.end ()) // not yet confirmed or cancelled + { + lock.unlock (); + bool updated_l{ false }; + uint64_t difficulty (0); + auto root_l (block_a->root ()); + nano::work_validate (root_l, block_a->block_work (), &difficulty); + auto active_difficulty (watcher_l->node.active.limited_active_difficulty ()); + if (active_difficulty > difficulty) + { + std::promise> promise; + std::future> future = promise.get_future (); + // clang-format off + watcher_l->node.work.generate (root_l, [&promise](boost::optional work_a) { + promise.set_value (work_a); + }, + active_difficulty); + // clang-format on + auto work_l = future.get (); + if (work_l.is_initialized () && !watcher_l->stopped) + { + nano::state_block_builder builder; + std::error_code ec; + std::shared_ptr block (builder.from (*block_a).work (*work_l).build (ec)); + + if (!ec) + { + { + auto hash (block_a->hash ()); + std::lock_guard active_guard (watcher_l->node.active.mutex); + auto existing (watcher_l->node.active.roots.find (root_a)); + if (existing != watcher_l->node.active.roots.end ()) + { + auto election (existing->election); + if (election->status.winner->hash () == hash) + { + election->status.winner = block; + } + auto current (election->blocks.find (hash)); + assert (current != election->blocks.end ()); + current->second = block; + } + } + watcher_l->node.network.flood_block (block, false); + watcher_l->node.active.update_difficulty (*block.get ()); + watcher_l->update (root_a, block); + updated_l = true; + watcher_l->watching (root_a, block); + } + } + } + if (!updated_l) + { + watcher_l->watching (root_a, block_a); + } + } + } + }); +} + void nano::work_watcher::remove (std::shared_ptr block_a) { - auto root (block_a->qualified_root ()); + auto root_l (block_a->qualified_root ()); std::lock_guard lock (mutex); - auto existing (blocks.find (root)); - if (existing != blocks.end () && existing->second->hash () == block_a->hash ()) + auto existing (watched.find (root_l)); + if (existing != watched.end () && existing->second->hash () == block_a->hash ()) { - existing->second = nullptr; + watched.erase (existing); + node.work.cancel (block_a->root ()); } } bool nano::work_watcher::is_watched (nano::qualified_root const & root_a) { std::unique_lock lock (mutex); - auto exists (blocks.find (root_a)); - return exists != blocks.end (); + auto exists (watched.find (root_a)); + return exists != watched.end (); } void nano::wallets::do_wallet_actions () @@ -1588,7 +1551,7 @@ observer ([](bool) {}), node (node_a), env (boost::polymorphic_downcast (node_a.wallets_store_impl.get ())->environment), stopped (false), -watcher (node_a), +watcher (std::make_shared (node_a)), thread ([this]() { nano::thread_role::set (nano::thread_role::name::wallet_actions); do_wallet_actions (); @@ -1844,7 +1807,7 @@ void nano::wallets::stop () { thread.join (); } - watcher.stop (); + watcher->stop (); } nano::write_transaction nano::wallets::tx_begin_write () diff --git a/nano/node/wallet.hpp b/nano/node/wallet.hpp index bda36377..ded64897 100644 --- a/nano/node/wallet.hpp +++ b/nano/node/wallet.hpp @@ -161,22 +161,21 @@ public: std::unordered_set representatives; }; -class work_watcher +class work_watcher final : public std::enable_shared_from_this { public: work_watcher (nano::node &); ~work_watcher (); void stop (); - void run (); void add (std::shared_ptr); + void update (nano::qualified_root const &, std::shared_ptr); + void watching (nano::qualified_root const &, std::shared_ptr); void remove (std::shared_ptr); bool is_watched (nano::qualified_root const &); std::mutex mutex; nano::node & node; - std::condition_variable condition; + std::unordered_map> watched; std::atomic stopped; - std::unordered_map> blocks; - std::thread thread; }; /** * The wallets set is all the wallets a node controls. @@ -217,7 +216,7 @@ public: nano::node & node; nano::mdb_env & env; std::atomic stopped; - nano::work_watcher watcher; + std::shared_ptr watcher; boost::thread thread; static nano::uint128_t const generate_priority; static nano::uint128_t const high_priority; diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 8090ae3c..0a1f0ccc 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -1675,6 +1675,7 @@ TEST (rpc, process_block_with_work_watcher) nano::system system; nano::node_config node_config (24000, system.logging); node_config.enable_voting = false; + node_config.work_watcher_period = 1s; auto & node1 = *system.add_node (node_config); nano::keypair key; auto latest (system.nodes[0]->latest (nano::test_genesis_key.pub)); diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index 424dbcf7..15032d01 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -30,7 +30,7 @@ TEST (system, generate_mass_activity_long) nano::node_config node_config (24000, system.logging); node_config.enable_voting = false; // Prevent blocks cementing auto node = system.add_node (node_config); - system.wallet (0)->wallets.watcher.stop (); // Stop work watcher + system.wallet (0)->wallets.watcher->stop (); // Stop work watcher nano::thread_runner runner (system.io_ctx, system.nodes[0]->config.io_threads); system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); uint32_t count (1000000000);