From 54d84d0bbc0685a04c459a3640ef2cf341ae77ae Mon Sep 17 00:00:00 2001 From: clemahieu Date: Sat, 6 Jan 2018 12:11:33 -0600 Subject: [PATCH] Doing wallet actions in a single dedicated thread instead of queueing in io_threads. Wallet actions are bound by PoW generation so there really isn't a point to doing them in parallel. Too many changed wallets was causing all io_threads to be consumed and deadlock. This also removes waiting from io_threads which shouldn't block. --- rai/core_test/node.cpp | 4 ++ rai/node/node.cpp | 5 ++- rai/node/node.hpp | 4 +- rai/node/wallet.cpp | 89 +++++++++++++++++++----------------------- rai/node/wallet.hpp | 16 +++++--- rai/qt/qt.cpp | 19 ++++----- 6 files changed, 67 insertions(+), 70 deletions(-) diff --git a/rai/core_test/node.cpp b/rai/core_test/node.cpp index 55c6f91d..c16eeb71 100644 --- a/rai/core_test/node.cpp +++ b/rai/core_test/node.cpp @@ -1146,6 +1146,10 @@ TEST (node, broadcast_elected) //std::cerr << "fork0: " << fork_hash.to_string () << std::endl; //std::cerr << "fork1: " << fork1.hash ().to_string () << std::endl; auto iterations (0); + while (!node0->ledger.block_exists (fork0->hash ()) || !node1->ledger.block_exists (fork0->hash ())) + { + system.poll (); + } while (!node2->ledger.block_exists (fork0->hash ())) { system.poll (); diff --git a/rai/node/node.cpp b/rai/node/node.cpp index 57601e10..a6a320c2 100644 --- a/rai/node/node.cpp +++ b/rai/node/node.cpp @@ -1343,8 +1343,8 @@ warmed_up (0), block_processor (*this), block_processor_thread ([this]() { this->block_processor.process_blocks (); }) { - wallets.observer = [this](rai::account const & account_a, bool active) { - observers.wallet (account_a, active); + wallets.observer = [this] (bool active) { + observers.wallet (active); }; peers.peer_observer = [this](rai::endpoint const & endpoint_a) { observers.endpoint (endpoint_a); @@ -1796,6 +1796,7 @@ void rai::node::stop () bootstrap_initiator.stop (); bootstrap.stop (); port_mapping.stop (); + wallets.stop (); if (block_processor_thread.joinable ()) { block_processor_thread.join (); diff --git a/rai/node/node.hpp b/rai/node/node.hpp index 32fecb03..c66ccc60 100644 --- a/rai/node/node.hpp +++ b/rai/node/node.hpp @@ -411,7 +411,7 @@ class node_observers { public: rai::observer_set, rai::account const &, rai::amount const &> blocks; - rai::observer_set wallet; + rai::observer_set wallet; rai::observer_set, rai::endpoint const &> vote; rai::observer_set endpoint; rai::observer_set<> disconnect; @@ -515,13 +515,13 @@ public: rai::gap_cache gap_cache; rai::ledger ledger; rai::active_transactions active; - rai::wallets wallets; rai::network network; rai::bootstrap_initiator bootstrap_initiator; rai::bootstrap_listener bootstrap; rai::peer_container peers; boost::filesystem::path application_path; rai::node_observers observers; + rai::wallets wallets; rai::port_mapping port_mapping; rai::vote_processor vote_processor; rai::rep_crawler rep_crawler; diff --git a/rai/node/wallet.cpp b/rai/node/wallet.cpp index f6cf0c67..e7c15992 100644 --- a/rai/node/wallet.cpp +++ b/rai/node/wallet.cpp @@ -809,15 +809,6 @@ void rai::wallet_store::destroy (MDB_txn * transaction_a) assert (status == 0); } -namespace -{ -bool check_ownership (rai::wallets & wallets_a, rai::account const & account_a) -{ - std::lock_guard lock (wallets_a.action_mutex); - return wallets_a.current_actions.find (account_a) == wallets_a.current_actions.end (); -} -} - std::shared_ptr rai::wallet::receive_action (rai::send_block const & send_a, rai::account const & representative_a, rai::uint128_union const & amount_a, bool generate_work_a) { auto hash (send_a.hash ()); @@ -867,7 +858,7 @@ std::shared_ptr rai::wallet::receive_action (rai::send_block const & auto hash (block->hash ()); auto this_l (shared_from_this ()); auto source (send_a.hashables.destination); - node.wallets.queue_wallet_action (source, rai::wallets::generate_priority, [this_l, source, hash] { + node.wallets.queue_wallet_action (rai::wallets::generate_priority, [this_l, source, hash] { this_l->work_generate (source, hash); }); } @@ -907,7 +898,7 @@ std::shared_ptr rai::wallet::change_action (rai::account const & sou { auto hash (block->hash ()); auto this_l (shared_from_this ()); - node.wallets.queue_wallet_action (source_a, rai::wallets::generate_priority, [this_l, source_a, hash] { + node.wallets.queue_wallet_action (rai::wallets::generate_priority, [this_l, source_a, hash] { this_l->work_generate (source_a, hash); }); } @@ -951,7 +942,7 @@ std::shared_ptr rai::wallet::send_action (rai::account const & sourc { auto hash (block->hash ()); auto this_l (shared_from_this ()); - node.wallets.queue_wallet_action (source_a, rai::wallets::generate_priority, [this_l, source_a, hash] { + node.wallets.queue_wallet_action (rai::wallets::generate_priority, [this_l, source_a, hash] { this_l->work_generate (source_a, hash); }); } @@ -971,8 +962,7 @@ bool rai::wallet::change_sync (rai::account const & source_a, rai::account const void rai::wallet::change_async (rai::account const & source_a, rai::account const & representative_a, std::function)> const & action_a, bool generate_work_a) { - node.wallets.queue_wallet_action (source_a, rai::wallets::high_priority, [this, source_a, representative_a, action_a, generate_work_a]() { - assert (!check_ownership (node.wallets, source_a)); + node.wallets.queue_wallet_action (rai::wallets::high_priority, [this, source_a, representative_a, action_a, generate_work_a]() { auto block (change_action (source_a, representative_a, generate_work_a)); action_a (block); }); @@ -991,8 +981,7 @@ bool rai::wallet::receive_sync (std::shared_ptr block_a, rai::accoun void rai::wallet::receive_async (std::shared_ptr block_a, rai::account const & representative_a, rai::uint128_t const & amount_a, std::function)> const & action_a, bool generate_work_a) { assert (dynamic_cast (block_a.get ()) != nullptr); - node.wallets.queue_wallet_action (static_cast (block_a.get ())->hashables.destination, amount_a, [this, block_a, representative_a, amount_a, action_a, generate_work_a]() { - assert (!check_ownership (node.wallets, static_cast (block_a.get ())->hashables.destination)); + node.wallets.queue_wallet_action (amount_a, [this, block_a, representative_a, amount_a, action_a, generate_work_a]() { auto block (receive_action (*static_cast (block_a.get ()), representative_a, amount_a, generate_work_a)); action_a (block); }); @@ -1011,8 +1000,7 @@ rai::block_hash rai::wallet::send_sync (rai::account const & source_a, rai::acco void rai::wallet::send_async (rai::account const & source_a, rai::account const & account_a, rai::uint128_t const & amount_a, std::function)> const & action_a, bool generate_work_a) { node.background ([this, source_a, account_a, amount_a, action_a, generate_work_a]() { - this->node.wallets.queue_wallet_action (source_a, rai::wallets::high_priority, [this, source_a, account_a, amount_a, action_a, generate_work_a]() { - assert (!check_ownership (node.wallets, source_a)); + this->node.wallets.queue_wallet_action (rai::wallets::high_priority, [this, source_a, account_a, amount_a, action_a, generate_work_a]() { auto block (send_action (source_a, account_a, amount_a, generate_work_a)); action_a (block); }); @@ -1211,8 +1199,10 @@ void rai::wallet::work_generate (rai::account const & account_a, rai::block_hash } rai::wallets::wallets (bool & error_a, rai::node & node_a) : -observer ([](rai::account const &, bool) {}), -node (node_a) +observer ([](bool) {}), +node (node_a), +thread ([this] () { do_wallet_actions (); }), +stopped (false) { if (!error_a) { @@ -1244,6 +1234,12 @@ node (node_a) } } +rai::wallets::~wallets () +{ + stop (); + thread.join (); +} + std::shared_ptr rai::wallets::open (rai::uint256_union const & id_a) { std::shared_ptr result; @@ -1306,46 +1302,34 @@ void rai::wallets::destroy (rai::uint256_union const & id_a) wallet->store.destroy (transaction); } -void rai::wallets::do_wallet_actions (rai::account const & account_a) +void rai::wallets::do_wallet_actions () { - observer (account_a, true); - std::unique_lock lock (node.wallets.action_mutex); - auto existing (node.wallets.pending_actions.find (account_a)); - while (existing != node.wallets.pending_actions.end ()) + std::unique_lock lock (mutex); + while (!stopped) { - auto & entries (existing->second); - if (entries.empty ()) + if (!actions.empty ()) { - node.wallets.pending_actions.erase (existing); - auto erased (node.wallets.current_actions.erase (account_a)); - assert (erased == 1); - (void)erased; + auto first (actions.begin ()); + auto current (std::move (first->second)); + actions.erase (first); + lock.unlock (); + observer (true); + current (); + observer (false); + lock.lock (); } else { - auto first (entries.begin ()); - auto current (std::move (first->second)); - entries.erase (first); - lock.unlock (); - current (); - lock.lock (); + condition.wait (lock); } - existing = node.wallets.pending_actions.find (account_a); } - observer (account_a, false); } -void rai::wallets::queue_wallet_action (rai::account const & account_a, rai::uint128_t const & amount_a, std::function const & action_a) +void rai::wallets::queue_wallet_action (rai::uint128_t const & amount_a, std::function const & action_a) { - std::lock_guard lock (action_mutex); - pending_actions[account_a].insert (decltype (pending_actions)::mapped_type::value_type (amount_a, std::move (action_a))); - if (current_actions.insert (account_a).second) - { - auto node_l (node.shared ()); - node.background ([node_l, account_a]() { - node_l->wallets.do_wallet_actions (account_a); - }); - } + std::lock_guard lock (mutex); + actions.insert (std::make_pair (amount_a, std::move (action_a))); + condition.notify_all (); } void rai::wallets::foreach_representative (MDB_txn * transaction_a, std::function const & action_a) @@ -1389,6 +1373,13 @@ bool rai::wallets::exists (MDB_txn * transaction_a, rai::public_key const & acco return result; } +void rai::wallets::stop () +{ + std::lock_guard lock (mutex); + stopped = true; + condition.notify_all (); +} + rai::uint128_t const rai::wallets::generate_priority = std::numeric_limits::max (); rai::uint128_t const rai::wallets::high_priority = std::numeric_limits::max () - 1; diff --git a/rai/node/wallet.hpp b/rai/node/wallet.hpp index 02820780..40ec70ba 100644 --- a/rai/node/wallet.hpp +++ b/rai/node/wallet.hpp @@ -156,23 +156,27 @@ class wallets { public: wallets (bool &, rai::node &); + ~wallets (); std::shared_ptr open (rai::uint256_union const &); std::shared_ptr create (rai::uint256_union const &); bool search_pending (rai::uint256_union const &); void search_pending_all (); void destroy (rai::uint256_union const &); - void do_wallet_actions (rai::account const &); - void queue_wallet_action (rai::account const &, rai::uint128_t const &, std::function const &); + void do_wallet_actions (); + void queue_wallet_action (rai::uint128_t const &, std::function const &); void foreach_representative (MDB_txn *, std::function const &); bool exists (MDB_txn *, rai::public_key const &); - std::function observer; + void stop (); + std::function observer; std::unordered_map> items; - std::unordered_map, std::greater>> pending_actions; - std::unordered_set current_actions; - std::mutex action_mutex; + std::multimap, std::greater> actions; + std::mutex mutex; + std::condition_variable condition; rai::kdf kdf; MDB_dbi handle; rai::node & node; + std::thread thread; + bool stopped; static rai::uint128_t const generate_priority; static rai::uint128_t const high_priority; }; diff --git a/rai/qt/qt.cpp b/rai/qt/qt.cpp index 708a5920..ed51a92d 100644 --- a/rai/qt/qt.cpp +++ b/rai/qt/qt.cpp @@ -1067,22 +1067,19 @@ void rai_qt::wallet::start () })); } }); - node.observers.wallet.add ([this_w](rai::account const & account_a, bool active_a) { + node.observers.wallet.add ([this_w](bool active_a) { if (auto this_l = this_w.lock ()) { - this_l->application.postEvent (&this_l->processor, new eventloop_event ([this_w, account_a, active_a]() { + this_l->application.postEvent (&this_l->processor, new eventloop_event ([this_w, active_a]() { if (auto this_l = this_w.lock ()) { - if (this_l->account == account_a) + if (active_a) { - if (active_a) - { - this_l->active_status.insert (rai_qt::status_types::active); - } - else - { - this_l->active_status.erase (rai_qt::status_types::active); - } + this_l->active_status.insert (rai_qt::status_types::active); + } + else + { + this_l->active_status.erase (rai_qt::status_types::active); } } }));