Use a strand in distributed_work (#2474)

* Use a strand in distributed_work

Wrap all distributed_work socket handling in a strand. Could use one strand for each peer [I think], but with the very low amount of messages (2 sent and 1 received at max) this seems unnecessary

Other fixes and enhancements:

- Fix a rare issue where generation would finish before `start_work()` is done (by moving the local generation code to the end of the function)
- TSAN race fix in rpc.work_peer_many
- Wrap request parameters in a work_request class
- No logging when ongoing operations are canceled
- Make `backoff` a chrono object
- Move the work_peer_request class into distributed_work and rename it

* Document distributed_work and simplify one cancel_connection() signature

* Wrap socket.cancel() in the strand and fix some logging

* Even socket.is_open()
This commit is contained in:
Guilherme Lawless 2020-01-12 10:40:32 +00:00 committed by GitHub
commit 418b7b0628
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 172 additions and 142 deletions

View file

@ -1,12 +1,14 @@
#include <nano/boost/asio/bind_executor.hpp>
#include <nano/boost/asio/post.hpp>
#include <nano/node/distributed_work.hpp>
#include <nano/node/node.hpp>
#include <nano/node/websocket.hpp>
#include <boost/algorithm/string/erase.hpp>
std::shared_ptr<request_type> nano::work_peer_request::get_prepared_json_request (std::string const & request_string_a) const
std::shared_ptr<request_type> nano::distributed_work::peer_request::get_prepared_json_request (std::string const & request_string_a) const
{
auto request (std::make_shared<boost::beast::http::request<boost::beast::http::string_body>> ());
auto request (std::make_shared<request_type> ());
request->method (boost::beast::http::verb::post);
request->set (boost::beast::http::field::content_type, "application/json");
auto address_string = boost::algorithm::erase_first_copy (address.to_string (), "::ffff:");
@ -18,15 +20,12 @@ std::shared_ptr<request_type> nano::work_peer_request::get_prepared_json_request
return request;
}
nano::distributed_work::distributed_work (nano::node & node_a, nano::root const & root_a, std::vector<std::pair<std::string, uint16_t>> const & peers_a, unsigned int backoff_a, std::function<void(boost::optional<uint64_t>)> const & callback_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a) :
callback (callback_a),
backoff (backoff_a),
nano::distributed_work::distributed_work (nano::node & node_a, nano::work_request const & request_a, std::chrono::seconds const & backoff_a) :
node (node_a),
root (root_a),
account (account_a),
peers (peers_a),
need_resolve (peers_a),
difficulty (difficulty_a),
request (request_a),
backoff (backoff_a),
strand (node_a.io_ctx.get_executor ()),
need_resolve (request_a.peers),
elapsed (nano::timer_state::started, "distributed work generation timer")
{
assert (!finished);
@ -41,15 +40,15 @@ nano::distributed_work::~distributed_work ()
nano::websocket::message_builder builder;
if (status == work_generation_status::success)
{
node.websocket_server->broadcast (builder.work_generation (root, work_result, difficulty, node.network_params.network.publish_threshold, elapsed.value (), winner, bad_peers));
node.websocket_server->broadcast (builder.work_generation (request.root, work_result, request.difficulty, node.network_params.network.publish_threshold, elapsed.value (), winner, bad_peers));
}
else if (status == work_generation_status::cancelled)
{
node.websocket_server->broadcast (builder.work_cancelled (root, difficulty, node.network_params.network.publish_threshold, elapsed.value (), bad_peers));
node.websocket_server->broadcast (builder.work_cancelled (request.root, request.difficulty, node.network_params.network.publish_threshold, elapsed.value (), bad_peers));
}
else if (status == work_generation_status::failure_local || status == work_generation_status::failure_peers)
{
node.websocket_server->broadcast (builder.work_failed (root, difficulty, node.network_params.network.publish_threshold, elapsed.value (), bad_peers));
node.websocket_server->broadcast (builder.work_failed (request.root, request.difficulty, node.network_params.network.publish_threshold, elapsed.value (), bad_peers));
}
}
stop_once (true);
@ -98,29 +97,6 @@ void nano::distributed_work::start_work ()
{
auto this_l (shared_from_this ());
// Start work generation if peers are not acting correctly, or if there are no peers configured
if ((outstanding.empty () || node.unresponsive_work_peers) && node.local_work_generation_enabled ())
{
local_generation_started = true;
node.work.generate (
root, [this_l](boost::optional<uint64_t> const & work_a) {
if (work_a.is_initialized ())
{
this_l->set_once (*work_a);
}
else if (!this_l->finished.exchange (true))
{
this_l->status = work_generation_status::failure_local;
if (this_l->callback)
{
this_l->callback (boost::none);
}
}
this_l->stop_once (false);
},
difficulty);
}
if (!outstanding.empty ())
{
nano::lock_guard<std::mutex> guard (mutex);
@ -128,37 +104,42 @@ void nano::distributed_work::start_work ()
{
auto host (i.first);
auto service (i.second);
auto connection (std::make_shared<nano::work_peer_request> (this_l->node.io_ctx, host, service));
auto connection (std::make_shared<peer_request> (this_l->node.io_ctx, host, service));
connections.emplace_back (connection);
connection->socket.async_connect (nano::tcp_endpoint (host, service), [this_l, connection](boost::system::error_code const & ec) {
if (!ec)
connection->socket.async_connect (nano::tcp_endpoint (host, service),
boost::asio::bind_executor (strand,
[this_l, connection](boost::system::error_code const & ec) {
if (!ec && !this_l->stopped)
{
std::string request_string;
{
boost::property_tree::ptree request;
request.put ("action", "work_generate");
request.put ("hash", this_l->root.to_string ());
request.put ("difficulty", nano::to_string_hex (this_l->difficulty));
if (this_l->account.is_initialized ())
request.put ("hash", this_l->request.root.to_string ());
request.put ("difficulty", nano::to_string_hex (this_l->request.difficulty));
if (this_l->request.account.is_initialized ())
{
request.put ("account", this_l->account.get ().to_account ());
request.put ("account", this_l->request.account.get ().to_account ());
}
std::stringstream ostream;
boost::property_tree::write_json (ostream, request);
request_string = ostream.str ();
}
auto request (connection->get_prepared_json_request (request_string));
boost::beast::http::async_write (connection->socket, *request, [this_l, connection, request](boost::system::error_code const & ec, size_t bytes_transferred) {
if (!ec)
auto peer_request (connection->get_prepared_json_request (request_string));
boost::beast::http::async_write (connection->socket, *peer_request,
boost::asio::bind_executor (this_l->strand,
[this_l, connection, peer_request](boost::system::error_code const & ec, size_t size_a) {
if (!ec && !this_l->stopped)
{
boost::beast::http::async_read (connection->socket, connection->buffer, connection->response, [this_l, connection](boost::system::error_code const & ec, size_t bytes_transferred) {
if (!ec)
boost::beast::http::async_read (connection->socket, connection->buffer, connection->response,
boost::asio::bind_executor (this_l->strand, [this_l, connection](boost::system::error_code const & ec, size_t size_a) {
if (!ec && !this_l->stopped)
{
if (connection->response.result () == boost::beast::http::status::ok)
{
this_l->success (connection->response.body (), connection->address, connection->port);
}
else
else if (ec)
{
this_l->node.logger.try_log (boost::str (boost::format ("Work peer responded with an error %1% %2%: %3%") % connection->address % connection->port % connection->response.result ()));
this_l->add_bad_peer (connection->address, connection->port);
@ -168,66 +149,92 @@ void nano::distributed_work::start_work ()
else if (ec == boost::system::errc::operation_canceled)
{
// The only case where we send a cancel is if we preempt stopped waiting for the response
this_l->cancel_connection (connection);
this_l->cancel (*connection);
this_l->failure (connection->address);
}
else
else if (ec)
{
this_l->node.logger.try_log (boost::str (boost::format ("Unable to read from work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
this_l->add_bad_peer (connection->address, connection->port);
this_l->failure (connection->address);
}
});
}));
}
else
else if (ec && ec != boost::system::errc::operation_canceled)
{
this_l->node.logger.try_log (boost::str (boost::format ("Unable to write to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
this_l->add_bad_peer (connection->address, connection->port);
this_l->failure (connection->address);
}
});
}));
}
else
else if (ec && ec != boost::system::errc::operation_canceled)
{
this_l->node.logger.try_log (boost::str (boost::format ("Unable to connect to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
this_l->add_bad_peer (connection->address, connection->port);
this_l->failure (connection->address);
}
});
}));
}
}
if (!local_generation_started && outstanding.empty () && callback)
// Start work generation if peers are not acting correctly, or if there are no peers configured
if ((outstanding.empty () || node.unresponsive_work_peers) && node.local_work_generation_enabled ())
{
callback (boost::none);
local_generation_started = true;
node.work.generate (
request.root, [this_l](boost::optional<uint64_t> const & work_a) {
if (work_a.is_initialized ())
{
this_l->set_once (*work_a);
}
else if (!this_l->finished.exchange (true))
{
this_l->status = work_generation_status::failure_local;
if (this_l->request.callback)
{
this_l->request.callback (boost::none);
}
}
this_l->stop_once (false);
},
request.difficulty);
}
else if (outstanding.empty () && request.callback)
{
request.callback (boost::none);
}
}
void nano::distributed_work::cancel_connection (std::shared_ptr<nano::work_peer_request> connection_a)
void nano::distributed_work::cancel (peer_request const & connection_a)
{
auto this_l (shared_from_this ());
auto cancelling_l (std::make_shared<nano::work_peer_request> (node.io_ctx, connection_a->address, connection_a->port));
cancelling_l->socket.async_connect (nano::tcp_endpoint (cancelling_l->address, cancelling_l->port), [this_l, cancelling_l](boost::system::error_code const & ec) {
auto cancelling_l (std::make_shared<peer_request> (node.io_ctx, connection_a.address, connection_a.port));
cancelling_l->socket.async_connect (nano::tcp_endpoint (cancelling_l->address, cancelling_l->port),
boost::asio::bind_executor (strand,
[this_l, cancelling_l](boost::system::error_code const & ec) {
if (!ec)
{
std::string request_string;
{
boost::property_tree::ptree request;
request.put ("action", "work_cancel");
request.put ("hash", this_l->root.to_string ());
request.put ("hash", this_l->request.root.to_string ());
std::stringstream ostream;
boost::property_tree::write_json (ostream, request);
request_string = ostream.str ();
}
auto request (cancelling_l->get_prepared_json_request (request_string));
boost::beast::http::async_write (cancelling_l->socket, *request, [this_l, request, cancelling_l](boost::system::error_code const & ec, size_t bytes_transferred) {
if (ec)
auto peer_cancel (cancelling_l->get_prepared_json_request (request_string));
boost::beast::http::async_write (cancelling_l->socket, *peer_cancel,
boost::asio::bind_executor (this_l->strand,
[this_l, peer_cancel, cancelling_l](boost::system::error_code const & ec, size_t bytes_transferred) {
if (ec && ec != boost::system::errc::operation_canceled)
{
this_l->node.logger.try_log (boost::str (boost::format ("Unable to send work_cancel to work_peer %1% %2%: %3% (%4%)") % cancelling_l->address % cancelling_l->port % ec.message () % ec.value ()));
}
});
}));
}
});
}));
}
void nano::distributed_work::success (std::string const & body_a, boost::asio::ip::address const & address_a, uint16_t port_a)
@ -243,7 +250,7 @@ void nano::distributed_work::success (std::string const & body_a, boost::asio::i
if (!nano::from_string_hex (work_text, work))
{
uint64_t result_difficulty (0);
if (!nano::work_validate (root, work, &result_difficulty) && result_difficulty >= difficulty)
if (!nano::work_validate (request.root, work, &result_difficulty) && result_difficulty >= request.difficulty)
{
node.unresponsive_work_peers = false;
set_once (work, boost::str (boost::format ("%1%:%2%") % address_a % port_a));
@ -251,7 +258,7 @@ void nano::distributed_work::success (std::string const & body_a, boost::asio::i
}
else
{
node.logger.try_log (boost::str (boost::format ("Incorrect work response from %1%:%2% for root %3% with diffuculty %4%: %5%") % address_a % port_a % root.to_string () % nano::to_string_hex (difficulty) % work_text));
node.logger.try_log (boost::str (boost::format ("Incorrect work response from %1%:%2% for root %3% with diffuculty %4%: %5%") % address_a % port_a % request.root.to_string () % nano::to_string_hex (request.difficulty) % work_text));
add_bad_peer (address_a, port_a);
handle_failure (last);
}
@ -278,68 +285,74 @@ void nano::distributed_work::stop_once (bool const local_stop_a)
nano::lock_guard<std::mutex> guard (mutex);
if (local_stop_a && node.local_work_generation_enabled ())
{
node.work.cancel (root);
node.work.cancel (request.root);
}
for (auto & connection_w : connections)
{
if (auto connection_l = connection_w.lock ())
{
boost::system::error_code ec;
connection_l->socket.cancel (ec);
if (ec)
{
node.logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection_l->address % connection_l->port % ec.message () % ec.value ()));
}
try
{
connection_l->socket.close ();
}
catch (const boost::system::system_error & ec)
{
node.logger.try_log (boost::str (boost::format ("Error closing socket with work_peer %1% %2%: %3%") % connection_l->address % connection_l->port % ec.what () % ec.code ()));
}
auto this_l (shared_from_this ());
boost::asio::post (strand, boost::asio::bind_executor (strand, [this_l, connection_l] {
boost::system::error_code ec;
if (connection_l->socket.is_open ())
{
connection_l->socket.cancel (ec);
if (!ec)
{
connection_l->socket.close (ec);
if (ec)
{
this_l->node.logger.try_log (boost::str (boost::format ("Error closing socket with work_peer %1% %2%: %3%") % connection_l->address % connection_l->port % ec.message () % ec.value ()));
}
}
else
{
this_l->node.logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection_l->address % connection_l->port % ec.message () % ec.value ()));
}
}
}));
}
}
connections.clear ();
outstanding.clear ();
}
connections.clear ();
outstanding.clear ();
}
void nano::distributed_work::set_once (uint64_t work_a, std::string const & source_a)
void nano::distributed_work::set_once (uint64_t const work_a, std::string const & source_a)
{
if (!finished.exchange (true))
{
elapsed.stop ();
status = work_generation_status::success;
if (callback)
if (request.callback)
{
callback (work_a);
request.callback (work_a);
}
winner = source_a;
work_result = work_a;
if (node.config.logging.work_generation_time ())
{
boost::format unformatted_l ("Work generation for %1%, with a threshold difficulty of %2% (multiplier %3%x) complete: %4% ms");
auto multiplier_text_l (nano::to_string (nano::difficulty::to_multiplier (difficulty, node.network_params.network.publish_threshold), 2));
node.logger.try_log (boost::str (unformatted_l % root.to_string () % nano::to_string_hex (difficulty) % multiplier_text_l % elapsed.value ().count ()));
auto multiplier_text_l (nano::to_string (nano::difficulty::to_multiplier (request.difficulty, node.network_params.network.publish_threshold), 2));
node.logger.try_log (boost::str (unformatted_l % request.root.to_string () % nano::to_string_hex (request.difficulty) % multiplier_text_l % elapsed.value ().count ()));
}
}
}
void nano::distributed_work::cancel_once ()
void nano::distributed_work::cancel ()
{
if (!finished.exchange (true))
{
elapsed.stop ();
status = work_generation_status::cancelled;
if (callback)
if (request.callback)
{
callback (boost::none);
request.callback (boost::none);
}
stop_once (true);
if (node.config.logging.work_generation_time ())
{
node.logger.try_log (boost::str (boost::format ("Work generation for %1% was cancelled after %2% ms") % root.to_string () % elapsed.value ().count ()));
node.logger.try_log (boost::str (boost::format ("Work generation for %1% was cancelled after %2% ms") % request.root.to_string () % elapsed.value ().count ()));
}
}
}
@ -358,23 +371,23 @@ void nano::distributed_work::handle_failure (bool const last_a)
if (!local_generation_started && !finished.exchange (true))
{
status = work_generation_status::failure_peers;
if (backoff == 1 && node.config.logging.work_generation_time ())
if (backoff == std::chrono::seconds (1) && node.config.logging.work_generation_time ())
{
node.logger.always_log ("Work peer(s) failed to generate work for root ", root.to_string (), ", retrying...");
node.logger.always_log ("Work peer(s) failed to generate work for root ", request.root.to_string (), ", retrying...");
}
auto now (std::chrono::steady_clock::now ());
std::weak_ptr<nano::node> node_w (node.shared ());
auto next_backoff (std::min (backoff * 2, (unsigned int)60 * 5));
auto next_backoff (std::min (backoff * 2, std::chrono::seconds (5 * 60)));
// clang-format off
node.alarm.add (now + std::chrono::seconds (backoff), [ node_w, root_l = root, peers_l = peers, callback_l = callback, next_backoff, difficulty = difficulty, account_l = account ] {
node.alarm.add (now + std::chrono::seconds (backoff), [ node_w, request_l = request, next_backoff] {
bool error_l {true};
if (auto node_l = node_w.lock ())
{
error_l = node_l->distributed_work.make (next_backoff, root_l, peers_l, callback_l, difficulty, account_l);
error_l = node_l->distributed_work.make (next_backoff, request_l);
}
if (error_l && callback_l)
if (error_l && request_l.callback)
{
callback_l (boost::none);
request_l.callback (boost::none);
}
});
// clang-format on

View file

@ -1,6 +1,7 @@
#pragma once
#include <nano/boost/asio/ip/tcp.hpp>
#include <nano/boost/asio/strand.hpp>
#include <nano/boost/beast/core/flat_buffer.hpp>
#include <nano/boost/beast/http/string_body.hpp>
#include <nano/lib/numbers.hpp>
@ -8,8 +9,8 @@
#include <boost/optional.hpp>
#include <map>
#include <mutex>
#include <unordered_map>
using request_type = boost::beast::http::request<boost::beast::http::string_body>;
@ -25,21 +26,13 @@ namespace nano
{
class node;
class work_peer_request final
struct work_request final
{
public:
work_peer_request (boost::asio::io_context & io_ctx_a, boost::asio::ip::address address_a, uint16_t port_a) :
address (address_a),
port (port_a),
socket (io_ctx_a)
{
}
std::shared_ptr<request_type> get_prepared_json_request (std::string const &) const;
boost::asio::ip::address address;
uint16_t port;
boost::beast::flat_buffer buffer;
boost::beast::http::response<boost::beast::http::string_body> response;
boost::asio::ip::tcp::socket socket;
nano::root root;
uint64_t difficulty;
boost::optional<nano::account> const account;
std::function<void(boost::optional<uint64_t>)> callback;
std::vector<std::pair<std::string, uint16_t>> const peers;
};
/**
@ -56,39 +49,62 @@ class distributed_work final : public std::enable_shared_from_this<nano::distrib
failure_peers
};
class peer_request final
{
public:
peer_request (boost::asio::io_context & io_ctx_a, boost::asio::ip::address address_a, uint16_t port_a) :
address (address_a),
port (port_a),
socket (io_ctx_a)
{
}
std::shared_ptr<request_type> get_prepared_json_request (std::string const &) const;
boost::asio::ip::address address;
uint16_t port;
boost::beast::flat_buffer buffer;
boost::beast::http::response<boost::beast::http::string_body> response;
boost::asio::ip::tcp::socket socket;
};
public:
distributed_work (nano::node &, nano::root const &, std::vector<std::pair<std::string, uint16_t>> const & peers_a, unsigned int, std::function<void(boost::optional<uint64_t>)> const &, uint64_t, boost::optional<nano::account> const & = boost::none);
distributed_work (nano::node &, nano::work_request const &, std::chrono::seconds const &);
~distributed_work ();
void start ();
void cancel ();
private:
void start_work ();
void cancel_connection (std::shared_ptr<nano::work_peer_request>);
/** Cancellation is done with an entirely new connection, \p request_a is only used to copy its address and port */
void cancel (peer_request const & request_a);
/** Called on a successful peer response, validates the reply */
void success (std::string const &, boost::asio::ip::address const &, uint16_t const);
/** Send a work_cancel message to all remaining connections */
void stop_once (bool const);
void set_once (uint64_t, std::string const & source_a = "local");
void cancel_once ();
void set_once (uint64_t const, std::string const & source_a = "local");
void failure (boost::asio::ip::address const &);
void handle_failure (bool const);
bool remove (boost::asio::ip::address const &);
void add_bad_peer (boost::asio::ip::address const &, uint16_t const);
std::function<void(boost::optional<uint64_t>)> callback;
unsigned int backoff; // in seconds
nano::node & node;
nano::root root;
boost::optional<nano::account> const account;
std::mutex mutex;
std::map<boost::asio::ip::address, uint16_t> outstanding;
std::vector<std::weak_ptr<nano::work_peer_request>> connections;
std::vector<std::pair<std::string, uint16_t>> const peers;
nano::work_request request;
std::chrono::seconds backoff;
boost::asio::strand<boost::asio::io_context::executor_type> strand;
std::vector<std::pair<std::string, uint16_t>> need_resolve;
uint64_t difficulty;
uint64_t work_result{ 0 };
std::atomic<bool> finished{ false };
std::atomic<bool> stopped{ false };
std::atomic<bool> local_generation_started{ false };
std::map<boost::asio::ip::address, uint16_t> outstanding;
std::vector<std::weak_ptr<peer_request>> connections;
work_generation_status status{ work_generation_status::ongoing };
uint64_t work_result{ 0 };
nano::timer<std::chrono::milliseconds> elapsed; // logging only
std::vector<std::string> bad_peers; // websocket
std::string winner; // websocket
std::mutex mutex;
std::atomic<bool> finished{ false };
std::atomic<bool> stopped{ false };
std::atomic<bool> local_generation_started{ false };
};
}

View file

@ -1,4 +1,3 @@
#include <nano/node/distributed_work.hpp>
#include <nano/node/distributed_work_factory.hpp>
#include <nano/node/node.hpp>
@ -14,10 +13,10 @@ nano::distributed_work_factory::~distributed_work_factory ()
bool nano::distributed_work_factory::make (nano::root const & root_a, std::vector<std::pair<std::string, uint16_t>> const & peers_a, std::function<void(boost::optional<uint64_t>)> const & callback_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a)
{
return make (1, root_a, peers_a, callback_a, difficulty_a, account_a);
return make (std::chrono::seconds (1), nano::work_request{ root_a, difficulty_a, account_a, callback_a, peers_a });
}
bool nano::distributed_work_factory::make (unsigned int backoff_a, nano::root const & root_a, std::vector<std::pair<std::string, uint16_t>> const & peers_a, std::function<void(boost::optional<uint64_t>)> const & callback_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a)
bool nano::distributed_work_factory::make (std::chrono::seconds const & backoff_a, nano::work_request const & request_a)
{
bool error_l{ true };
if (!stopped)
@ -25,10 +24,10 @@ bool nano::distributed_work_factory::make (unsigned int backoff_a, nano::root co
cleanup_finished ();
if (node.work_generation_enabled ())
{
auto distributed (std::make_shared<nano::distributed_work> (node, root_a, peers_a, backoff_a, callback_a, difficulty_a, account_a));
auto distributed (std::make_shared<nano::distributed_work> (node, request_a, backoff_a));
{
nano::lock_guard<std::mutex> guard (mutex);
items[root_a].emplace_back (distributed);
items[request_a.root].emplace_back (distributed);
}
distributed->start ();
error_l = false;
@ -48,7 +47,7 @@ void nano::distributed_work_factory::cancel (nano::root const & root_a, bool con
if (auto distributed_l = distributed_w.lock ())
{
// Send work_cancel to work peers and stop local work generation
distributed_l->cancel_once ();
distributed_l->cancel ();
}
}
items.erase (existing_l);

View file

@ -1,6 +1,7 @@
#pragma once
#include <nano/lib/numbers.hpp>
#include <nano/node/distributed_work.hpp>
#include <boost/optional/optional.hpp>
@ -22,7 +23,7 @@ public:
distributed_work_factory (nano::node &);
~distributed_work_factory ();
bool make (nano::root const &, std::vector<std::pair<std::string, uint16_t>> const &, std::function<void(boost::optional<uint64_t>)> const &, uint64_t, boost::optional<nano::account> const & = boost::none);
bool make (unsigned int, nano::root const &, std::vector<std::pair<std::string, uint16_t>> const &, std::function<void(boost::optional<uint64_t>)> const &, uint64_t, boost::optional<nano::account> const & = boost::none);
bool make (std::chrono::seconds const &, nano::work_request const &);
void cancel (nano::root const &, bool const local_stop = false);
void cleanup_finished ();
void stop ();

View file

@ -3038,7 +3038,7 @@ TEST (rpc, work_peer_many)
for (auto i (0); i < 10; ++i)
{
nano::keypair key1;
uint64_t work (0);
std::atomic<uint64_t> work (0);
node1.work_generate (key1.pub, [&work](boost::optional<uint64_t> work_a) {
ASSERT_TRUE (work_a.is_initialized ());
work = *work_a;
@ -3051,6 +3051,7 @@ TEST (rpc, work_peer_many)
system4.poll ();
}
}
node1.stop ();
}
TEST (rpc, block_count)