Stop work generation before stopping threads waiting for work (#2375)

* Stop distributed_work as soon as possible, and always use the callback version of work_generate in RPCs

* Unecessary if

* Non-functional declaration shift
This commit is contained in:
Guilherme Lawless 2019-10-31 18:17:45 +00:00 committed by GitHub
commit 417fff9f86
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 309 additions and 324 deletions

View file

@ -1231,350 +1231,333 @@ void nano::json_handler::block_count_type ()
void nano::json_handler::block_create ()
{
std::string type (request.get<std::string> ("type"));
nano::wallet_id wallet (0);
boost::optional<std::string> wallet_text (request.get_optional<std::string> ("wallet"));
if (wallet_text.is_initialized ())
{
if (wallet.decode_hex (wallet_text.get ()))
{
ec = nano::error_common::bad_wallet_number;
}
}
nano::account account (0);
boost::optional<std::string> account_text (request.get_optional<std::string> ("account"));
if (!ec && account_text.is_initialized ())
{
account = account_impl (account_text.get ());
}
nano::account representative (0);
boost::optional<std::string> representative_text (request.get_optional<std::string> ("representative"));
if (!ec && representative_text.is_initialized ())
{
representative = account_impl (representative_text.get (), nano::error_rpc::bad_representative_number);
}
nano::account destination (0);
boost::optional<std::string> destination_text (request.get_optional<std::string> ("destination"));
if (!ec && destination_text.is_initialized ())
{
destination = account_impl (destination_text.get (), nano::error_rpc::bad_destination);
}
nano::block_hash source (0);
boost::optional<std::string> source_text (request.get_optional<std::string> ("source"));
if (!ec && source_text.is_initialized ())
{
if (source.decode_hex (source_text.get ()))
{
ec = nano::error_rpc::bad_source;
}
}
nano::amount amount (0);
boost::optional<std::string> amount_text (request.get_optional<std::string> ("amount"));
if (!ec && amount_text.is_initialized ())
{
if (amount.decode_dec (amount_text.get ()))
{
ec = nano::error_common::invalid_amount;
}
}
auto work (work_optional_impl ());
nano::raw_key prv;
prv.data.clear ();
nano::block_hash previous (0);
nano::amount balance (0);
if (work == 0 && !node.work_generation_enabled ())
{
ec = nano::error_common::disabled_work_generation;
}
if (!ec && wallet != 0 && account != 0)
{
auto existing (node.wallets.items.find (wallet));
if (existing != node.wallets.items.end ())
{
auto transaction (node.wallets.tx_begin_read ());
auto block_transaction (node.store.tx_begin_read ());
wallet_locked_impl (transaction, existing->second);
wallet_account_impl (transaction, existing->second, account);
if (!ec)
{
existing->second->store.fetch (transaction, account, prv);
previous = node.ledger.latest (block_transaction, account);
balance = node.ledger.account_balance (block_transaction, account);
}
}
else
{
ec = nano::error_common::wallet_not_found;
}
}
boost::optional<std::string> key_text (request.get_optional<std::string> ("key"));
if (!ec && key_text.is_initialized ())
{
if (prv.data.decode_hex (key_text.get ()))
{
ec = nano::error_common::bad_private_key;
}
}
boost::optional<std::string> previous_text (request.get_optional<std::string> ("previous"));
if (!ec && previous_text.is_initialized ())
{
if (previous.decode_hex (previous_text.get ()))
{
ec = nano::error_rpc::bad_previous;
}
}
boost::optional<std::string> balance_text (request.get_optional<std::string> ("balance"));
if (!ec && balance_text.is_initialized ())
{
if (balance.decode_dec (balance_text.get ()))
{
ec = nano::error_rpc::invalid_balance;
}
}
nano::link link (0);
boost::optional<std::string> link_text (request.get_optional<std::string> ("link"));
if (!ec && link_text.is_initialized ())
{
if (link.decode_account (link_text.get ()))
{
if (link.decode_hex (link_text.get ()))
{
ec = nano::error_rpc::bad_link;
}
}
}
else
{
// Retrieve link from source or destination
if (source.is_zero ())
{
link = destination;
}
else
{
link = source;
}
}
if (!ec)
{
std::string type (request.get<std::string> ("type"));
nano::wallet_id wallet (0);
boost::optional<std::string> wallet_text (request.get_optional<std::string> ("wallet"));
if (wallet_text.is_initialized ())
{
if (wallet.decode_hex (wallet_text.get ()))
auto rpc_l (shared_from_this ());
// Serializes the block contents to the RPC response
auto block_response_put_l = [rpc_l, this](nano::block const & block_a) {
boost::property_tree::ptree response_l;
response_l.put ("hash", block_a.hash ().to_string ());
bool json_block_l = request.get<bool> ("json_block", false);
if (json_block_l)
{
ec = nano::error_common::bad_wallet_number;
boost::property_tree::ptree block_node_l;
block_a.serialize_json (block_node_l);
response_l.add_child ("block", block_node_l);
}
}
nano::account account (0);
boost::optional<std::string> account_text (request.get_optional<std::string> ("account"));
if (!ec && account_text.is_initialized ())
{
account = account_impl (account_text.get ());
}
nano::account representative (0);
boost::optional<std::string> representative_text (request.get_optional<std::string> ("representative"));
if (!ec && representative_text.is_initialized ())
{
representative = account_impl (representative_text.get (), nano::error_rpc::bad_representative_number);
}
nano::account destination (0);
boost::optional<std::string> destination_text (request.get_optional<std::string> ("destination"));
if (!ec && destination_text.is_initialized ())
{
destination = account_impl (destination_text.get (), nano::error_rpc::bad_destination);
}
nano::block_hash source (0);
boost::optional<std::string> source_text (request.get_optional<std::string> ("source"));
if (!ec && source_text.is_initialized ())
{
if (source.decode_hex (source_text.get ()))
else
{
ec = nano::error_rpc::bad_source;
std::string contents;
block_a.serialize_json (contents);
response_l.put ("block", contents);
}
}
nano::amount amount (0);
boost::optional<std::string> amount_text (request.get_optional<std::string> ("amount"));
if (!ec && amount_text.is_initialized ())
{
if (amount.decode_dec (amount_text.get ()))
{
ec = nano::error_common::invalid_amount;
}
}
auto work (work_optional_impl ());
nano::raw_key prv;
prv.data.clear ();
nano::block_hash previous (0);
nano::amount balance (0);
if (work == 0 && !node.work_generation_enabled ())
{
ec = nano::error_common::disabled_work_generation;
}
if (!ec && wallet != 0 && account != 0)
{
auto existing (node.wallets.items.find (wallet));
if (existing != node.wallets.items.end ())
{
auto transaction (node.wallets.tx_begin_read ());
auto block_transaction (node.store.tx_begin_read ());
wallet_locked_impl (transaction, existing->second);
wallet_account_impl (transaction, existing->second, account);
if (!ec)
std::stringstream ostream;
boost::property_tree::write_json (ostream, response_l);
rpc_l->response (ostream.str ());
};
// Wrapper from argument to lambda capture, to extend the block's scope
auto get_callback_l = [rpc_l, this, block_response_put_l](std::shared_ptr<nano::block> block_a) {
// Callback upon work generation success or failure
return [block_a, rpc_l, this, block_response_put_l](boost::optional<uint64_t> const & work_a) {
if (block_a != nullptr)
{
existing->second->store.fetch (transaction, account, prv);
previous = node.ledger.latest (block_transaction, account);
balance = node.ledger.account_balance (block_transaction, account);
if (work_a.is_initialized ())
{
block_a->block_work_set (*work_a);
block_response_put_l (*block_a);
}
else
{
rpc_l->ec = nano::error_common::failure_work_generation;
}
}
else
{
rpc_l->ec = nano::error_common::generic;
}
rpc_l->response_errors ();
};
};
if (prv.data != 0)
{
nano::account pub (nano::pub_key (prv.as_private_key ()));
// Fetching account balance & previous for send blocks (if aren't given directly)
if (!previous_text.is_initialized () && !balance_text.is_initialized ())
{
auto transaction (node.store.tx_begin_read ());
previous = node.ledger.latest (transaction, pub);
balance = node.ledger.account_balance (transaction, pub);
}
// Double check current balance if previous block is specified
else if (previous_text.is_initialized () && balance_text.is_initialized () && type == "send")
{
auto transaction (node.store.tx_begin_read ());
if (node.store.block_exists (transaction, previous) && node.store.block_balance (transaction, previous) != balance.number ())
{
ec = nano::error_rpc::block_create_balance_mismatch;
}
}
// Check for incorrect account key
if (!ec && account_text.is_initialized ())
{
if (account != pub)
{
ec = nano::error_rpc::block_create_public_key_mismatch;
}
}
nano::block_builder builder_l;
std::shared_ptr<nano::block> block_l{ nullptr };
nano::root root_l;
if (type == "state")
{
if (previous_text.is_initialized () && !representative.is_zero () && (!link.is_zero () || link_text.is_initialized ()))
{
block_l = builder_l.state ()
.account (pub)
.previous (previous)
.representative (representative)
.balance (balance)
.link (link)
.sign (prv, pub)
.build ();
if (previous.is_zero ())
{
root_l = pub;
}
else
{
root_l = previous;
}
}
else
{
ec = nano::error_rpc::block_create_requirements_state;
}
}
else if (type == "open")
{
if (representative != 0 && source != 0)
{
block_l = builder_l.open ()
.account (pub)
.source (source)
.representative (representative)
.sign (prv, pub)
.build ();
root_l = pub;
}
else
{
ec = nano::error_rpc::block_create_requirements_open;
}
}
else if (type == "receive")
{
if (source != 0 && previous != 0)
{
block_l = builder_l.receive ()
.previous (previous)
.source (source)
.sign (prv, pub)
.build ();
root_l = previous;
}
else
{
ec = nano::error_rpc::block_create_requirements_receive;
}
}
else if (type == "change")
{
if (representative != 0 && previous != 0)
{
block_l = builder_l.change ()
.previous (previous)
.representative (representative)
.sign (prv, pub)
.build ();
root_l = previous;
}
else
{
ec = nano::error_rpc::block_create_requirements_change;
}
}
else if (type == "send")
{
if (destination != 0 && previous != 0 && balance != 0 && amount != 0)
{
if (balance.number () >= amount.number ())
{
block_l = builder_l.send ()
.previous (previous)
.destination (destination)
.balance (balance.number () - amount.number ())
.sign (prv, pub)
.build ();
root_l = previous;
}
else
{
ec = nano::error_common::insufficient_balance;
}
}
else
{
ec = nano::error_rpc::block_create_requirements_send;
}
}
else
{
ec = nano::error_common::wallet_not_found;
ec = nano::error_blocks::invalid_type;
}
}
boost::optional<std::string> key_text (request.get_optional<std::string> ("key"));
if (!ec && key_text.is_initialized ())
{
if (prv.data.decode_hex (key_text.get ()))
if (!ec)
{
ec = nano::error_common::bad_private_key;
}
}
boost::optional<std::string> previous_text (request.get_optional<std::string> ("previous"));
if (!ec && previous_text.is_initialized ())
{
if (previous.decode_hex (previous_text.get ()))
{
ec = nano::error_rpc::bad_previous;
}
}
boost::optional<std::string> balance_text (request.get_optional<std::string> ("balance"));
if (!ec && balance_text.is_initialized ())
{
if (balance.decode_dec (balance_text.get ()))
{
ec = nano::error_rpc::invalid_balance;
}
}
nano::link link (0);
boost::optional<std::string> link_text (request.get_optional<std::string> ("link"));
if (!ec && link_text.is_initialized ())
{
if (link.decode_account (link_text.get ()))
{
if (link.decode_hex (link_text.get ()))
if (work == 0)
{
ec = nano::error_rpc::bad_link;
node.work_generate (root_l, get_callback_l (block_l), nano::account (pub));
}
else
{
block_l->block_work_set (work);
block_response_put_l (*block_l);
}
}
}
else
{
// Retrieve link from source or destination
if (source.is_zero ())
{
link = destination;
}
else
{
link = source;
}
}
if (!ec)
{
if (prv.data != 0)
{
nano::account pub (nano::pub_key (prv.as_private_key ()));
// Fetching account balance & previous for send blocks (if aren't given directly)
if (!previous_text.is_initialized () && !balance_text.is_initialized ())
{
auto transaction (node.store.tx_begin_read ());
previous = node.ledger.latest (transaction, pub);
balance = node.ledger.account_balance (transaction, pub);
}
// Double check current balance if previous block is specified
else if (previous_text.is_initialized () && balance_text.is_initialized () && type == "send")
{
auto transaction (node.store.tx_begin_read ());
if (node.store.block_exists (transaction, previous) && node.store.block_balance (transaction, previous) != balance.number ())
{
ec = nano::error_rpc::block_create_balance_mismatch;
}
}
// Check for incorrect account key
if (!ec && account_text.is_initialized ())
{
if (account != pub)
{
ec = nano::error_rpc::block_create_public_key_mismatch;
}
}
if (type == "state")
{
if (previous_text.is_initialized () && !representative.is_zero () && (!link.is_zero () || link_text.is_initialized ()))
{
if (work == 0)
{
nano::root root;
if (previous.is_zero ())
{
root = pub;
}
else
{
root = previous;
}
auto opt_work_l (node.work_generate_blocking (root, nano::account (pub)));
if (opt_work_l.is_initialized ())
{
work = *opt_work_l;
}
else
{
ec = nano::error_common::failure_work_generation;
}
}
if (!ec)
{
nano::state_block state (pub, previous, representative, balance, link, prv, pub, work);
response_l.put ("hash", state.hash ().to_string ());
bool json_block_l = request.get<bool> ("json_block", false);
if (json_block_l)
{
boost::property_tree::ptree block_node_l;
state.serialize_json (block_node_l);
response_l.add_child ("block", block_node_l);
}
else
{
std::string contents;
state.serialize_json (contents);
response_l.put ("block", contents);
}
}
}
else
{
ec = nano::error_rpc::block_create_requirements_state;
}
}
else if (type == "open")
{
if (representative != 0 && source != 0)
{
if (work == 0)
{
auto opt_work_l (node.work_generate_blocking (pub, nano::account (pub)));
if (opt_work_l.is_initialized ())
{
work = *opt_work_l;
}
else
{
ec = nano::error_common::failure_work_generation;
}
}
if (!ec)
{
nano::open_block open (source, representative, pub, prv, pub, work);
response_l.put ("hash", open.hash ().to_string ());
std::string contents;
open.serialize_json (contents);
response_l.put ("block", contents);
}
}
else
{
ec = nano::error_rpc::block_create_requirements_open;
}
}
else if (type == "receive")
{
if (source != 0 && previous != 0)
{
if (work == 0)
{
auto opt_work_l (node.work_generate_blocking (previous, nano::account (pub)));
if (opt_work_l.is_initialized ())
{
work = *opt_work_l;
}
else
{
ec = nano::error_common::failure_work_generation;
}
}
if (!ec)
{
nano::receive_block receive (previous, source, prv, pub, work);
response_l.put ("hash", receive.hash ().to_string ());
std::string contents;
receive.serialize_json (contents);
response_l.put ("block", contents);
}
}
else
{
ec = nano::error_rpc::block_create_requirements_receive;
}
}
else if (type == "change")
{
if (representative != 0 && previous != 0)
{
if (work == 0)
{
auto opt_work_l (node.work_generate_blocking (previous, nano::account (pub)));
if (opt_work_l.is_initialized ())
{
work = *opt_work_l;
}
else
{
ec = nano::error_common::failure_work_generation;
}
}
if (!ec)
{
nano::change_block change (previous, representative, prv, pub, work);
response_l.put ("hash", change.hash ().to_string ());
std::string contents;
change.serialize_json (contents);
response_l.put ("block", contents);
}
}
else
{
ec = nano::error_rpc::block_create_requirements_change;
}
}
else if (type == "send")
{
if (destination != 0 && previous != 0 && balance != 0 && amount != 0)
{
if (balance.number () >= amount.number ())
{
if (work == 0)
{
auto opt_work_l (node.work_generate_blocking (previous, nano::account (pub)));
if (opt_work_l.is_initialized ())
{
work = *opt_work_l;
}
else
{
ec = nano::error_common::failure_work_generation;
}
}
if (!ec)
{
nano::send_block send (previous, destination, balance.number () - amount.number (), prv, pub, work);
response_l.put ("hash", send.hash ().to_string ());
std::string contents;
send.serialize_json (contents);
response_l.put ("block", contents);
}
}
else
{
ec = nano::error_common::insufficient_balance;
}
}
else
{
ec = nano::error_rpc::block_create_requirements_send;
}
}
else
{
ec = nano::error_blocks::invalid_type;
}
}
else
{
ec = nano::error_rpc::block_create_key_required;
}
ec = nano::error_rpc::block_create_key_required;
}
}
response_errors ();
// Because of callback
if (ec)
{
response_errors ();
}
}
void nano::json_handler::block_hash ()

View file

@ -692,6 +692,9 @@ void nano::node::stop ()
{
logger.always_log ("Node stopping");
write_database_queue.stop ();
// Cancels ongoing work generation tasks, which may be blocking other threads
// No tasks may wait for work generation in I/O threads, or termination signal capturing will be unable to call node::stop()
distributed_work.stop ();
block_processor.stop ();
if (block_processor_thread.joinable ())
{
@ -712,7 +715,6 @@ void nano::node::stop ()
wallets.stop ();
stats.stop ();
worker.stop ();
distributed_work.stop ();
// work pool is not stopped on purpose due to testing setup
}
}