Watch blocks separately in work watcher (#2228)

* Block-wise wallet work watcher

Uses node alarms to be more accurate in the time between block confirmation + re-working
Time is now set after the previous work is complete
Removes work_watcher thread

* Test two blocks

* Erasing confirmed

* Prevent simultaneous opencl work generation

* Update watched with new block, and simplify using blocking work generate

* Handle confirmation with an observer, remove blocks from work generation

Cancelling current and future blocks works consistently now because all work generation gets put into a queue

* Unused mutex

* Handle work cancelling by not updating the block

* Add a test for the work cancelling case

* Update wallet.cpp
This commit is contained in:
Guilherme Lawless 2019-08-27 11:41:50 +01:00 committed by GitHub
commit b796ec601f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 200 additions and 160 deletions

View file

@ -1108,38 +1108,115 @@ TEST (wallet, deterministic_restore)
ASSERT_TRUE (wallet->exists (pub));
}
TEST (wallet, update_work_action)
TEST (wallet, work_watcher_update)
{
nano::system system;
nano::node_config node_config (24000, system.logging);
node_config.enable_voting = false;
node_config.work_watcher_period = 1s;
auto & node = *system.add_node (node_config);
auto & wallet (*system.wallet (0));
wallet.insert_adhoc (nano::test_genesis_key.prv);
nano::keypair key;
auto const block (wallet.send_action (nano::test_genesis_key.pub, key.pub, nano::genesis_amount));
auto const block1 (wallet.send_action (nano::test_genesis_key.pub, key.pub, 100));
uint64_t difficulty1 (0);
nano::work_validate (*block, &difficulty1);
auto multiplier1 = nano::difficulty::to_multiplier (difficulty1, node.network_params.network.publish_threshold);
system.deadline_set (10s);
auto updated (false);
uint64_t updated_difficulty;
while (!updated)
nano::work_validate (*block1, &difficulty1);
auto const block2 (wallet.send_action (nano::test_genesis_key.pub, key.pub, 200));
uint64_t difficulty2 (0);
nano::work_validate (*block2, &difficulty2);
auto multiplier = nano::difficulty::to_multiplier (std::max (difficulty1, difficulty2), node.network_params.network.publish_threshold);
uint64_t updated_difficulty1{ difficulty1 }, updated_difficulty2{ difficulty2 };
{
std::unique_lock<std::mutex> lock (node.active.mutex);
//fill multipliers_cb and update active difficulty;
for (auto i (0); i < node.active.multipliers_cb.size (); i++)
{
node.active.multipliers_cb.push_back (multiplier1 * (1 + i / 100.));
node.active.multipliers_cb.push_back (multiplier * (1 + i / 100.));
}
node.active.update_active_difficulty (lock);
auto const existing (node.active.roots.find (block->qualified_root ()));
//if existing is junk the block has been confirmed already
ASSERT_NE (existing, node.active.roots.end ());
updated = existing->difficulty != difficulty1;
updated_difficulty = existing->difficulty;
lock.unlock ();
}
system.deadline_set (10s);
while (updated_difficulty1 == difficulty1 || updated_difficulty2 == difficulty2)
{
{
auto const existing (node.active.roots.find (block1->qualified_root ()));
//if existing is junk the block has been confirmed already
ASSERT_NE (existing, node.active.roots.end ());
updated_difficulty1 = existing->difficulty;
}
{
auto const existing (node.active.roots.find (block2->qualified_root ()));
//if existing is junk the block has been confirmed already
ASSERT_NE (existing, node.active.roots.end ());
updated_difficulty2 = existing->difficulty;
}
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_GT (updated_difficulty, difficulty1);
ASSERT_GT (updated_difficulty1, difficulty1);
ASSERT_GT (updated_difficulty2, difficulty2);
}
TEST (wallet, work_watcher_removed)
{
nano::system system;
nano::node_config node_config (24000, system.logging);
node_config.work_watcher_period = 1s;
auto & node = *system.add_node (node_config);
auto & wallet (*system.wallet (0));
wallet.insert_adhoc (nano::test_genesis_key.prv);
nano::keypair key;
ASSERT_EQ (0, wallet.wallets.watcher->watched.size ());
auto const block (wallet.send_action (nano::test_genesis_key.pub, key.pub, 100));
ASSERT_EQ (1, wallet.wallets.watcher->watched.size ());
auto transaction (wallet.wallets.tx_begin_write ());
system.deadline_set (3s);
while (!wallet.wallets.watcher->watched.empty ())
{
ASSERT_NO_ERROR (system.poll ());
}
}
TEST (wallet, work_watcher_cancel)
{
nano::system system;
nano::node_config node_config (24000, system.logging);
node_config.work_watcher_period = 1s;
node_config.max_work_generate_multiplier = 1e6;
auto & node = *system.add_node (node_config);
auto & wallet (*system.wallet (0));
wallet.insert_adhoc (nano::test_genesis_key.prv);
nano::keypair key;
auto const block1 (wallet.send_action (nano::test_genesis_key.pub, key.pub, 100));
uint64_t difficulty1 (0);
nano::work_validate (*block1, &difficulty1);
{
std::unique_lock<std::mutex> lock (node.active.mutex);
//fill multipliers_cb and update active difficulty;
for (auto i (0); i < node.active.multipliers_cb.size (); i++)
{
node.active.multipliers_cb.push_back (node_config.max_work_generate_multiplier);
}
node.active.update_active_difficulty (lock);
}
// Wait for work generation to start
system.deadline_set (3s);
while (node.work.pending.empty ())
{
ASSERT_NO_ERROR (system.poll ());
}
// Cancel the ongoing work
node.work.cancel (block1->root ());
{
std::unique_lock<std::mutex> lock (wallet.wallets.watcher->mutex);
auto existing (wallet.wallets.watcher->watched.find (block1->qualified_root ()));
ASSERT_NE (wallet.wallets.watcher->watched.end (), existing);
auto block2 (existing->second);
// Block must be the same
ASSERT_NE (nullptr, block1);
ASSERT_NE (nullptr, block2);
ASSERT_EQ (*block1, *block2);
// but should still be under watch
lock.unlock ();
ASSERT_TRUE (wallet.wallets.watcher->is_watched (block1->qualified_root ()));
}
}

View file

@ -800,7 +800,7 @@ void nano::active_transactions::flush_lowest ()
if (count != 2)
{
auto election = it->election;
if (election->confirmation_request_count > high_confirmation_request_count && !election->confirmed && !election->stopped && !node.wallets.watcher.is_watched (it->root))
if (election->confirmation_request_count > high_confirmation_request_count && !election->confirmed && !election->stopped && !node.wallets.watcher->is_watched (it->root))
{
it = decltype (it){ sorted_roots.erase (std::next (it).base ()) };
election->stop ();

View file

@ -342,7 +342,7 @@ void nano::block_processor::process_batch (std::unique_lock<std::mutex> & lock_a
for (auto & i : rollback_list)
{
node.votes_cache.remove (i->hash ());
node.wallets.watcher.remove (i);
node.wallets.watcher->remove (i);
node.active.erase (*i);
}
}
@ -374,7 +374,7 @@ void nano::block_processor::process_live (nano::block_hash const & hash_a, std::
//add block to watcher if desired after block has been added to active
if (watch_work_a)
{
node.wallets.watcher.add (block_a);
node.wallets.watcher->add (block_a);
}
// Announce block contents to the network
node.network.flood_block (block_a, false);

View file

@ -246,7 +246,7 @@ void nano::system::generate_rollback (nano::node & node_a, std::vector<nano::acc
assert (!error);
for (auto & i : rollback_list)
{
node_a.wallets.watcher.remove (i);
node_a.wallets.watcher->remove (i);
node_a.active.erase (*i);
}
}

View file

@ -980,7 +980,7 @@ std::shared_ptr<nano::block> nano::wallet::receive_action (nano::block const & s
wallets.node.logger.try_log (boost::str (boost::format ("Cached or provided work for block %1% account %2% is invalid, regenerating") % block->hash ().to_string () % account.to_account ()));
wallets.node.work_generate_blocking (*block, wallets.node.active.limited_active_difficulty ());
}
wallets.watcher.add (block);
wallets.watcher->add (block);
bool error (wallets.node.process_local (block).code != nano::process_result::progress);
if (!error && generate_work_a)
{
@ -1029,7 +1029,7 @@ std::shared_ptr<nano::block> nano::wallet::change_action (nano::account const &
wallets.node.logger.try_log (boost::str (boost::format ("Cached or provided work for block %1% account %2% is invalid, regenerating") % block->hash ().to_string () % source_a.to_account ()));
wallets.node.work_generate_blocking (*block, wallets.node.active.limited_active_difficulty ());
}
wallets.watcher.add (block);
wallets.watcher->add (block);
bool error (wallets.node.process_local (block).code != nano::process_result::progress);
if (!error && generate_work_a)
{
@ -1143,7 +1143,7 @@ std::shared_ptr<nano::block> nano::wallet::send_action (nano::account const & so
wallets.node.logger.try_log (boost::str (boost::format ("Cached or provided work for block %1% account %2% is invalid, regenerating") % block->hash ().to_string () % account_a.to_account ()));
wallets.node.work_generate_blocking (*block, wallets.node.active.limited_active_difficulty ());
}
wallets.watcher.add (block);
wallets.watcher->add (block);
error = (wallets.node.process_local (block).code != nano::process_result::progress);
if (!error && generate_work_a)
{
@ -1394,11 +1394,11 @@ void nano::wallet::work_cache_blocking (nano::account const & account_a, nano::b
nano::work_watcher::work_watcher (nano::node & node_a) :
node (node_a),
stopped (false),
thread ([this]() {
nano::thread_role::set (nano::thread_role::name::work_watcher);
run (); })
stopped (false)
{
node.observers.blocks.add ([this](nano::election_status const & status_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a) {
this->remove (status_a.winner);
});
}
nano::work_watcher::~work_watcher ()
@ -1409,123 +1409,8 @@ nano::work_watcher::~work_watcher ()
void nano::work_watcher::stop ()
{
std::unique_lock<std::mutex> lock (mutex);
blocks.clear ();
watched.clear ();
stopped = true;
condition.notify_all ();
lock.unlock ();
if (thread.joinable ())
{
thread.join ();
}
}
void nano::work_watcher::run ()
{
std::unique_lock<std::mutex> lock (mutex);
std::chrono::steady_clock::time_point next_attempt;
while (!stopped)
{
next_attempt = std::chrono::steady_clock::now () + node.config.work_watcher_period;
for (auto i (blocks.begin ()), n (blocks.end ()); i != n;)
{
std::unique_lock<std::mutex> active_lock (node.active.mutex);
auto confirmed (false);
auto existing (node.active.roots.find (i->first));
if (node.active.roots.end () != existing)
{
//block may not be in existing yet
confirmed = existing->election->confirmed.load ();
}
else if (i->second == nullptr)
{
// removed
confirmed = true;
}
else
{
//and so we fall back to ledger confirmation
auto transaction (this->node.store.tx_begin_read ());
auto block = this->node.store.block_get (transaction, (i->second)->hash ());
if (block)
{
confirmed = this->node.block_confirmed_or_being_confirmed (transaction, (i->second)->hash ());
}
}
active_lock.unlock ();
if (confirmed)
{
i = blocks.erase (i);
}
else
{
++i;
}
}
for (auto & i : blocks)
{
uint64_t difficulty (0);
nano::block_hash root (0);
if (i.second != nullptr)
{
root = i.second->root ();
nano::work_validate (root, i.second->block_work (), &difficulty);
}
auto active_difficulty (node.active.limited_active_difficulty ());
if (active_difficulty > difficulty && i.second != nullptr)
{
auto qualified_root = i.second->qualified_root ();
auto hash = i.second->hash ();
nano::state_block_builder builder;
std::error_code ec;
builder.from (*i.second);
lock.unlock ();
builder.work (node.work_generate_blocking (root, active_difficulty));
std::shared_ptr<state_block> block (builder.build (ec));
if (!ec)
{
{
std::lock_guard<std::mutex> active_lock (node.active.mutex);
auto existing (node.active.roots.find (qualified_root));
if (existing != node.active.roots.end ())
{
auto election (existing->election);
if (election->status.winner->hash () == hash)
{
election->status.winner = block;
}
auto current (election->blocks.find (hash));
assert (current != election->blocks.end ());
current->second = block;
}
}
node.network.flood_block (block, false);
node.active.update_difficulty (*block.get ());
lock.lock ();
if (stopped)
{
break;
}
if (i.second != nullptr)
{
i.second = block;
}
lock.unlock ();
}
lock.lock ();
if (stopped)
{
break;
}
}
}
if (!stopped)
{
condition.wait_until (lock, next_attempt, [this, &next_attempt]() {
return stopped || next_attempt < std::chrono::steady_clock::now ();
});
}
} // !stopped
}
void nano::work_watcher::add (std::shared_ptr<nano::block> block_a)
@ -1533,27 +1418,105 @@ void nano::work_watcher::add (std::shared_ptr<nano::block> block_a)
auto block_l (std::dynamic_pointer_cast<nano::state_block> (block_a));
if (!stopped && block_l != nullptr)
{
std::lock_guard<std::mutex> lock (mutex);
blocks[block_l->qualified_root ()] = block_l;
auto root_l (block_l->qualified_root ());
std::unique_lock<std::mutex> lock (mutex);
watched[root_l] = block_l;
lock.unlock ();
watching (root_l, block_l);
}
}
void nano::work_watcher::update (nano::qualified_root const & root_a, std::shared_ptr<nano::state_block> block_a)
{
std::lock_guard<std::mutex> guard (mutex);
watched[root_a] = block_a;
}
void nano::work_watcher::watching (nano::qualified_root const & root_a, std::shared_ptr<nano::state_block> block_a)
{
std::weak_ptr<nano::work_watcher> watcher_w (shared_from_this ());
node.alarm.add (std::chrono::steady_clock::now () + node.config.work_watcher_period, [block_a, root_a, watcher_w]() {
auto watcher_l = watcher_w.lock ();
if (watcher_l && !watcher_l->stopped && block_a != nullptr)
{
std::unique_lock<std::mutex> lock (watcher_l->mutex);
if (watcher_l->watched.find (root_a) != watcher_l->watched.end ()) // not yet confirmed or cancelled
{
lock.unlock ();
bool updated_l{ false };
uint64_t difficulty (0);
auto root_l (block_a->root ());
nano::work_validate (root_l, block_a->block_work (), &difficulty);
auto active_difficulty (watcher_l->node.active.limited_active_difficulty ());
if (active_difficulty > difficulty)
{
std::promise<boost::optional<uint64_t>> promise;
std::future<boost::optional<uint64_t>> future = promise.get_future ();
// clang-format off
watcher_l->node.work.generate (root_l, [&promise](boost::optional<uint64_t> work_a) {
promise.set_value (work_a);
},
active_difficulty);
// clang-format on
auto work_l = future.get ();
if (work_l.is_initialized () && !watcher_l->stopped)
{
nano::state_block_builder builder;
std::error_code ec;
std::shared_ptr<nano::state_block> block (builder.from (*block_a).work (*work_l).build (ec));
if (!ec)
{
{
auto hash (block_a->hash ());
std::lock_guard<std::mutex> active_guard (watcher_l->node.active.mutex);
auto existing (watcher_l->node.active.roots.find (root_a));
if (existing != watcher_l->node.active.roots.end ())
{
auto election (existing->election);
if (election->status.winner->hash () == hash)
{
election->status.winner = block;
}
auto current (election->blocks.find (hash));
assert (current != election->blocks.end ());
current->second = block;
}
}
watcher_l->node.network.flood_block (block, false);
watcher_l->node.active.update_difficulty (*block.get ());
watcher_l->update (root_a, block);
updated_l = true;
watcher_l->watching (root_a, block);
}
}
}
if (!updated_l)
{
watcher_l->watching (root_a, block_a);
}
}
}
});
}
void nano::work_watcher::remove (std::shared_ptr<nano::block> block_a)
{
auto root (block_a->qualified_root ());
auto root_l (block_a->qualified_root ());
std::lock_guard<std::mutex> lock (mutex);
auto existing (blocks.find (root));
if (existing != blocks.end () && existing->second->hash () == block_a->hash ())
auto existing (watched.find (root_l));
if (existing != watched.end () && existing->second->hash () == block_a->hash ())
{
existing->second = nullptr;
watched.erase (existing);
node.work.cancel (block_a->root ());
}
}
bool nano::work_watcher::is_watched (nano::qualified_root const & root_a)
{
std::unique_lock<std::mutex> lock (mutex);
auto exists (blocks.find (root_a));
return exists != blocks.end ();
auto exists (watched.find (root_a));
return exists != watched.end ();
}
void nano::wallets::do_wallet_actions ()
@ -1588,7 +1551,7 @@ observer ([](bool) {}),
node (node_a),
env (boost::polymorphic_downcast<nano::mdb_wallets_store *> (node_a.wallets_store_impl.get ())->environment),
stopped (false),
watcher (node_a),
watcher (std::make_shared<nano::work_watcher> (node_a)),
thread ([this]() {
nano::thread_role::set (nano::thread_role::name::wallet_actions);
do_wallet_actions ();
@ -1844,7 +1807,7 @@ void nano::wallets::stop ()
{
thread.join ();
}
watcher.stop ();
watcher->stop ();
}
nano::write_transaction nano::wallets::tx_begin_write ()

View file

@ -161,22 +161,21 @@ public:
std::unordered_set<nano::account> representatives;
};
class work_watcher
class work_watcher final : public std::enable_shared_from_this<nano::work_watcher>
{
public:
work_watcher (nano::node &);
~work_watcher ();
void stop ();
void run ();
void add (std::shared_ptr<nano::block>);
void update (nano::qualified_root const &, std::shared_ptr<nano::state_block>);
void watching (nano::qualified_root const &, std::shared_ptr<nano::state_block>);
void remove (std::shared_ptr<nano::block>);
bool is_watched (nano::qualified_root const &);
std::mutex mutex;
nano::node & node;
std::condition_variable condition;
std::unordered_map<nano::qualified_root, std::shared_ptr<nano::state_block>> watched;
std::atomic<bool> stopped;
std::unordered_map<nano::qualified_root, std::shared_ptr<nano::state_block>> blocks;
std::thread thread;
};
/**
* The wallets set is all the wallets a node controls.
@ -217,7 +216,7 @@ public:
nano::node & node;
nano::mdb_env & env;
std::atomic<bool> stopped;
nano::work_watcher watcher;
std::shared_ptr<nano::work_watcher> watcher;
boost::thread thread;
static nano::uint128_t const generate_priority;
static nano::uint128_t const high_priority;

View file

@ -1675,6 +1675,7 @@ TEST (rpc, process_block_with_work_watcher)
nano::system system;
nano::node_config node_config (24000, system.logging);
node_config.enable_voting = false;
node_config.work_watcher_period = 1s;
auto & node1 = *system.add_node (node_config);
nano::keypair key;
auto latest (system.nodes[0]->latest (nano::test_genesis_key.pub));

View file

@ -30,7 +30,7 @@ TEST (system, generate_mass_activity_long)
nano::node_config node_config (24000, system.logging);
node_config.enable_voting = false; // Prevent blocks cementing
auto node = system.add_node (node_config);
system.wallet (0)->wallets.watcher.stop (); // Stop work watcher
system.wallet (0)->wallets.watcher->stop (); // Stop work watcher
nano::thread_runner runner (system.io_ctx, system.nodes[0]->config.io_threads);
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
uint32_t count (1000000000);