Doing wallet actions in a single dedicated thread instead of queueing in io_threads.

Wallet actions are bound by PoW generation so there really isn't a point to doing them in parallel.
Too many changed wallets was causing all io_threads to be consumed and deadlock.
This also removes waiting from io_threads which shouldn't block.
This commit is contained in:
clemahieu 2018-01-06 12:11:33 -06:00
commit 54d84d0bbc
6 changed files with 67 additions and 70 deletions

View file

@ -1146,6 +1146,10 @@ TEST (node, broadcast_elected)
//std::cerr << "fork0: " << fork_hash.to_string () << std::endl;
//std::cerr << "fork1: " << fork1.hash ().to_string () << std::endl;
auto iterations (0);
while (!node0->ledger.block_exists (fork0->hash ()) || !node1->ledger.block_exists (fork0->hash ()))
{
system.poll ();
}
while (!node2->ledger.block_exists (fork0->hash ()))
{
system.poll ();

View file

@ -1343,8 +1343,8 @@ warmed_up (0),
block_processor (*this),
block_processor_thread ([this]() { this->block_processor.process_blocks (); })
{
wallets.observer = [this](rai::account const & account_a, bool active) {
observers.wallet (account_a, active);
wallets.observer = [this] (bool active) {
observers.wallet (active);
};
peers.peer_observer = [this](rai::endpoint const & endpoint_a) {
observers.endpoint (endpoint_a);
@ -1796,6 +1796,7 @@ void rai::node::stop ()
bootstrap_initiator.stop ();
bootstrap.stop ();
port_mapping.stop ();
wallets.stop ();
if (block_processor_thread.joinable ())
{
block_processor_thread.join ();

View file

@ -411,7 +411,7 @@ class node_observers
{
public:
rai::observer_set<std::shared_ptr<rai::block>, rai::account const &, rai::amount const &> blocks;
rai::observer_set<rai::account const &, bool> wallet;
rai::observer_set<bool> wallet;
rai::observer_set<std::shared_ptr<rai::vote>, rai::endpoint const &> vote;
rai::observer_set<rai::endpoint const &> endpoint;
rai::observer_set<> disconnect;
@ -515,13 +515,13 @@ public:
rai::gap_cache gap_cache;
rai::ledger ledger;
rai::active_transactions active;
rai::wallets wallets;
rai::network network;
rai::bootstrap_initiator bootstrap_initiator;
rai::bootstrap_listener bootstrap;
rai::peer_container peers;
boost::filesystem::path application_path;
rai::node_observers observers;
rai::wallets wallets;
rai::port_mapping port_mapping;
rai::vote_processor vote_processor;
rai::rep_crawler rep_crawler;

View file

@ -809,15 +809,6 @@ void rai::wallet_store::destroy (MDB_txn * transaction_a)
assert (status == 0);
}
namespace
{
bool check_ownership (rai::wallets & wallets_a, rai::account const & account_a)
{
std::lock_guard<std::mutex> lock (wallets_a.action_mutex);
return wallets_a.current_actions.find (account_a) == wallets_a.current_actions.end ();
}
}
std::shared_ptr<rai::block> rai::wallet::receive_action (rai::send_block const & send_a, rai::account const & representative_a, rai::uint128_union const & amount_a, bool generate_work_a)
{
auto hash (send_a.hash ());
@ -867,7 +858,7 @@ std::shared_ptr<rai::block> rai::wallet::receive_action (rai::send_block const &
auto hash (block->hash ());
auto this_l (shared_from_this ());
auto source (send_a.hashables.destination);
node.wallets.queue_wallet_action (source, rai::wallets::generate_priority, [this_l, source, hash] {
node.wallets.queue_wallet_action (rai::wallets::generate_priority, [this_l, source, hash] {
this_l->work_generate (source, hash);
});
}
@ -907,7 +898,7 @@ std::shared_ptr<rai::block> rai::wallet::change_action (rai::account const & sou
{
auto hash (block->hash ());
auto this_l (shared_from_this ());
node.wallets.queue_wallet_action (source_a, rai::wallets::generate_priority, [this_l, source_a, hash] {
node.wallets.queue_wallet_action (rai::wallets::generate_priority, [this_l, source_a, hash] {
this_l->work_generate (source_a, hash);
});
}
@ -951,7 +942,7 @@ std::shared_ptr<rai::block> rai::wallet::send_action (rai::account const & sourc
{
auto hash (block->hash ());
auto this_l (shared_from_this ());
node.wallets.queue_wallet_action (source_a, rai::wallets::generate_priority, [this_l, source_a, hash] {
node.wallets.queue_wallet_action (rai::wallets::generate_priority, [this_l, source_a, hash] {
this_l->work_generate (source_a, hash);
});
}
@ -971,8 +962,7 @@ bool rai::wallet::change_sync (rai::account const & source_a, rai::account const
void rai::wallet::change_async (rai::account const & source_a, rai::account const & representative_a, std::function<void(std::shared_ptr<rai::block>)> const & action_a, bool generate_work_a)
{
node.wallets.queue_wallet_action (source_a, rai::wallets::high_priority, [this, source_a, representative_a, action_a, generate_work_a]() {
assert (!check_ownership (node.wallets, source_a));
node.wallets.queue_wallet_action (rai::wallets::high_priority, [this, source_a, representative_a, action_a, generate_work_a]() {
auto block (change_action (source_a, representative_a, generate_work_a));
action_a (block);
});
@ -991,8 +981,7 @@ bool rai::wallet::receive_sync (std::shared_ptr<rai::block> block_a, rai::accoun
void rai::wallet::receive_async (std::shared_ptr<rai::block> block_a, rai::account const & representative_a, rai::uint128_t const & amount_a, std::function<void(std::shared_ptr<rai::block>)> const & action_a, bool generate_work_a)
{
assert (dynamic_cast<rai::send_block *> (block_a.get ()) != nullptr);
node.wallets.queue_wallet_action (static_cast<rai::send_block *> (block_a.get ())->hashables.destination, amount_a, [this, block_a, representative_a, amount_a, action_a, generate_work_a]() {
assert (!check_ownership (node.wallets, static_cast<rai::send_block *> (block_a.get ())->hashables.destination));
node.wallets.queue_wallet_action (amount_a, [this, block_a, representative_a, amount_a, action_a, generate_work_a]() {
auto block (receive_action (*static_cast<rai::send_block *> (block_a.get ()), representative_a, amount_a, generate_work_a));
action_a (block);
});
@ -1011,8 +1000,7 @@ rai::block_hash rai::wallet::send_sync (rai::account const & source_a, rai::acco
void rai::wallet::send_async (rai::account const & source_a, rai::account const & account_a, rai::uint128_t const & amount_a, std::function<void(std::shared_ptr<rai::block>)> const & action_a, bool generate_work_a)
{
node.background ([this, source_a, account_a, amount_a, action_a, generate_work_a]() {
this->node.wallets.queue_wallet_action (source_a, rai::wallets::high_priority, [this, source_a, account_a, amount_a, action_a, generate_work_a]() {
assert (!check_ownership (node.wallets, source_a));
this->node.wallets.queue_wallet_action (rai::wallets::high_priority, [this, source_a, account_a, amount_a, action_a, generate_work_a]() {
auto block (send_action (source_a, account_a, amount_a, generate_work_a));
action_a (block);
});
@ -1211,8 +1199,10 @@ void rai::wallet::work_generate (rai::account const & account_a, rai::block_hash
}
rai::wallets::wallets (bool & error_a, rai::node & node_a) :
observer ([](rai::account const &, bool) {}),
node (node_a)
observer ([](bool) {}),
node (node_a),
thread ([this] () { do_wallet_actions (); }),
stopped (false)
{
if (!error_a)
{
@ -1244,6 +1234,12 @@ node (node_a)
}
}
rai::wallets::~wallets ()
{
stop ();
thread.join ();
}
std::shared_ptr<rai::wallet> rai::wallets::open (rai::uint256_union const & id_a)
{
std::shared_ptr<rai::wallet> result;
@ -1306,46 +1302,34 @@ void rai::wallets::destroy (rai::uint256_union const & id_a)
wallet->store.destroy (transaction);
}
void rai::wallets::do_wallet_actions (rai::account const & account_a)
void rai::wallets::do_wallet_actions ()
{
observer (account_a, true);
std::unique_lock<std::mutex> lock (node.wallets.action_mutex);
auto existing (node.wallets.pending_actions.find (account_a));
while (existing != node.wallets.pending_actions.end ())
std::unique_lock <std::mutex> lock (mutex);
while (!stopped)
{
auto & entries (existing->second);
if (entries.empty ())
if (!actions.empty ())
{
node.wallets.pending_actions.erase (existing);
auto erased (node.wallets.current_actions.erase (account_a));
assert (erased == 1);
(void)erased;
auto first (actions.begin ());
auto current (std::move (first->second));
actions.erase (first);
lock.unlock ();
observer (true);
current ();
observer (false);
lock.lock ();
}
else
{
auto first (entries.begin ());
auto current (std::move (first->second));
entries.erase (first);
lock.unlock ();
current ();
lock.lock ();
condition.wait (lock);
}
existing = node.wallets.pending_actions.find (account_a);
}
observer (account_a, false);
}
void rai::wallets::queue_wallet_action (rai::account const & account_a, rai::uint128_t const & amount_a, std::function<void()> const & action_a)
void rai::wallets::queue_wallet_action (rai::uint128_t const & amount_a, std::function<void()> const & action_a)
{
std::lock_guard<std::mutex> lock (action_mutex);
pending_actions[account_a].insert (decltype (pending_actions)::mapped_type::value_type (amount_a, std::move (action_a)));
if (current_actions.insert (account_a).second)
{
auto node_l (node.shared ());
node.background ([node_l, account_a]() {
node_l->wallets.do_wallet_actions (account_a);
});
}
std::lock_guard <std::mutex> lock (mutex);
actions.insert (std::make_pair (amount_a, std::move (action_a)));
condition.notify_all ();
}
void rai::wallets::foreach_representative (MDB_txn * transaction_a, std::function<void(rai::public_key const & pub_a, rai::raw_key const & prv_a)> const & action_a)
@ -1389,6 +1373,13 @@ bool rai::wallets::exists (MDB_txn * transaction_a, rai::public_key const & acco
return result;
}
void rai::wallets::stop ()
{
std::lock_guard <std::mutex> lock (mutex);
stopped = true;
condition.notify_all ();
}
rai::uint128_t const rai::wallets::generate_priority = std::numeric_limits<rai::uint128_t>::max ();
rai::uint128_t const rai::wallets::high_priority = std::numeric_limits<rai::uint128_t>::max () - 1;

View file

@ -156,23 +156,27 @@ class wallets
{
public:
wallets (bool &, rai::node &);
~wallets ();
std::shared_ptr<rai::wallet> open (rai::uint256_union const &);
std::shared_ptr<rai::wallet> create (rai::uint256_union const &);
bool search_pending (rai::uint256_union const &);
void search_pending_all ();
void destroy (rai::uint256_union const &);
void do_wallet_actions (rai::account const &);
void queue_wallet_action (rai::account const &, rai::uint128_t const &, std::function<void()> const &);
void do_wallet_actions ();
void queue_wallet_action (rai::uint128_t const &, std::function<void()> const &);
void foreach_representative (MDB_txn *, std::function<void(rai::public_key const &, rai::raw_key const &)> const &);
bool exists (MDB_txn *, rai::public_key const &);
std::function<void(rai::account const &, bool)> observer;
void stop ();
std::function<void(bool)> observer;
std::unordered_map<rai::uint256_union, std::shared_ptr<rai::wallet>> items;
std::unordered_map<rai::account, std::multimap<rai::uint128_t, std::function<void()>, std::greater<rai::uint128_t>>> pending_actions;
std::unordered_set<rai::account> current_actions;
std::mutex action_mutex;
std::multimap<rai::uint128_t, std::function<void()>, std::greater<rai::uint128_t>> actions;
std::mutex mutex;
std::condition_variable condition;
rai::kdf kdf;
MDB_dbi handle;
rai::node & node;
std::thread thread;
bool stopped;
static rai::uint128_t const generate_priority;
static rai::uint128_t const high_priority;
};

View file

@ -1067,22 +1067,19 @@ void rai_qt::wallet::start ()
}));
}
});
node.observers.wallet.add ([this_w](rai::account const & account_a, bool active_a) {
node.observers.wallet.add ([this_w](bool active_a) {
if (auto this_l = this_w.lock ())
{
this_l->application.postEvent (&this_l->processor, new eventloop_event ([this_w, account_a, active_a]() {
this_l->application.postEvent (&this_l->processor, new eventloop_event ([this_w, active_a]() {
if (auto this_l = this_w.lock ())
{
if (this_l->account == account_a)
if (active_a)
{
if (active_a)
{
this_l->active_status.insert (rai_qt::status_types::active);
}
else
{
this_l->active_status.erase (rai_qt::status_types::active);
}
this_l->active_status.insert (rai_qt::status_types::active);
}
else
{
this_l->active_status.erase (rai_qt::status_types::active);
}
}
}));