Distributed work refactor (#2255)

* Distributed work refactor

With a factory that keeps track of distributed work objects, and a work cancel observer, work cancellation can now be properly propagated to work peers

* Cleanup the map itself, not only the vectors; use public class members

* Ability to force stop local generation

* Adding distributed_work tests

* Test for multiple requests

* Change to cleanup after adding a new work

* Extract prepared request method

* Merge if

* Finish up test

* Don't retry if stopped (cancelled)

* (unrelated) increasing deadline in a test

* Review items

* Another deadline fix

* Work can complete before the test
This commit is contained in:
Guilherme Lawless 2019-08-29 18:39:33 +01:00 committed by GitHub
commit 060e3b2f45
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 605 additions and 359 deletions

View file

@ -6,6 +6,7 @@ add_executable (core_test
block_store.cpp
conflicts.cpp
difficulty.cpp
distributed_work.cpp
entry.cpp
gap_cache.cpp
ipc.cpp

View file

@ -0,0 +1,119 @@
#include <nano/core_test/testutil.hpp>
#include <nano/node/testing.hpp>
#include <gtest/gtest.h>
using namespace std::chrono_literals;
/**
 * A request made through the factory on a node without work peers must deliver valid work to the
 * callback, and the factory must keep its entry for the root until cleanup_finished () is called.
 */
TEST (distributed_work, no_peers)
{
	nano::system system (24000, 1);
	auto node (system.nodes[0]);
	nano::block_hash hash;
	boost::optional<uint64_t> work;
	// Completion is published through an atomic flag instead of polling the optional itself: the
	// callback presumably runs on a work generation thread (TODO confirm), and no_peers_multi already
	// uses an atomic for its counter. `work` is written before the flag is set and only read after the
	// flag has been observed.
	std::atomic<bool> done{ false };
	auto callback = [&work, &done](boost::optional<uint64_t> work_a) {
		ASSERT_TRUE (work_a.is_initialized ());
		work = work_a;
		done = true;
	};
	node->distributed_work.make (hash, callback, node->network_params.network.publish_threshold);
	system.deadline_set (5s);
	while (!done)
	{
		ASSERT_NO_ERROR (system.poll ());
	}
	ASSERT_TRUE (work.is_initialized ());
	ASSERT_FALSE (nano::work_validate (hash, *work));
	// should only be removed after cleanup
	ASSERT_EQ (1, node->distributed_work.work.size ());
	node->distributed_work.cleanup_finished ();
	ASSERT_EQ (0, node->distributed_work.work.size ());
}
/**
 * A request that is unlikely to finish in time (very high difficulty) must be cancellable, both
 * directly through the factory and through the work_cancel observer. In either case the callback is
 * expected to receive an empty optional and the factory entry for the root must be gone afterwards.
 */
TEST (distributed_work, no_peers_cancel)
{
	nano::system system (24000, 1);
	auto node (system.nodes[0]);
	nano::block_hash hash;
	// Atomic because the callback presumably runs off the test thread (TODO confirm); no_peers_multi
	// uses an atomic for the same kind of hand-off
	std::atomic<bool> done{ false };
	auto callback_to_cancel = [&done](boost::optional<uint64_t> work_a) {
		ASSERT_FALSE (work_a.is_initialized ());
		done = true;
	};
	node->distributed_work.make (hash, callback_to_cancel, nano::difficulty::from_multiplier (1000000, node->network_params.network.publish_threshold));
	ASSERT_EQ (1, node->distributed_work.work.size ());
	// cleanup should not cancel or remove an ongoing work
	node->distributed_work.cleanup_finished ();
	ASSERT_EQ (1, node->distributed_work.work.size ());
	// manually cancel
	node->distributed_work.cancel (hash, true); // forces local stop
	system.deadline_set (5s);
	while (!done)
	{
		ASSERT_NO_ERROR (system.poll ());
	}
	ASSERT_TRUE (node->distributed_work.work.empty ());
	// now using observer
	done = false;
	node->distributed_work.make (hash, callback_to_cancel, nano::difficulty::from_multiplier (1000000, node->network_params.network.publish_threshold));
	ASSERT_EQ (1, node->distributed_work.work.size ());
	node->observers.work_cancel.notify (hash);
	// Fresh deadline: without it this wait would run on whatever is left of the previous one
	system.deadline_set (5s);
	while (!done)
	{
		ASSERT_NO_ERROR (system.poll ());
	}
	ASSERT_TRUE (node->distributed_work.work.empty ());
}
/**
 * Several concurrent requests, first all for one root and then one per distinct root, must each
 * complete with valid work, and the factory must be empty again after cleanup_finished ().
 */
TEST (distributed_work, no_peers_multi)
{
	nano::system system (24000, 1);
	auto node (system.nodes[0]);
	auto & factory (node->distributed_work);
	nano::block_hash hash;
	unsigned total{ 10 };
	std::atomic<unsigned> count{ 0 };
	auto callback = [&count](boost::optional<uint64_t> work_a) {
		ASSERT_TRUE (work_a.is_initialized ());
		++count;
	};

	// Phase 1: many requests sharing a single root
	for (unsigned request{ 0 }; request < total; ++request)
	{
		factory.make (hash, callback, nano::difficulty::from_multiplier (10, node->network_params.network.publish_threshold));
	}
	// A single map entry is expected, holding _total_ requests
	ASSERT_EQ (1, factory.work.size ());
	{
		auto first_entry (factory.work.begin ());
		ASSERT_EQ (hash, first_entry->first);
		ASSERT_EQ (total, first_entry->second.size ());
	}
	system.deadline_set (5s);
	while (count < total)
	{
		ASSERT_NO_ERROR (system.poll ());
	}
	factory.cleanup_finished ();
	ASSERT_EQ (0, factory.work.size ());
	count = 0;

	// Phase 2: one request for each of _total_ distinct roots
	for (unsigned request{ 0 }; request < total; ++request)
	{
		nano::block_hash hash_i (request + 1);
		factory.make (hash_i, callback, node->network_params.network.publish_threshold);
	}
	// Up to 10 entries with one request each; some may already be finished and cleaned up, so only a
	// lower bound is checked
	ASSERT_GT (factory.work.size (), 5);
	for (auto & entry : factory.work)
	{
		ASSERT_EQ (1, entry.second.size ());
	}
	system.deadline_set (5s);
	while (count < total)
	{
		ASSERT_NO_ERROR (system.poll ());
	}
	factory.cleanup_finished ();
	ASSERT_EQ (0, factory.work.size ());
	count = 0;
}

View file

@ -1196,7 +1196,7 @@ TEST (wallet, work_watcher_cancel)
node.active.update_active_difficulty (lock);
}
// Wait for work generation to start
system.deadline_set (3s);
system.deadline_set (5s);
while (node.work.pending.empty ())
{
ASSERT_NO_ERROR (system.poll ());

View file

@ -37,6 +37,8 @@ add_library (node
confirmation_height_processor.cpp
daemonconfig.hpp
daemonconfig.cpp
distributed_work.hpp
distributed_work.cpp
election.hpp
election.cpp
gap_cache.hpp

View file

@ -0,0 +1,372 @@
#include <nano/node/distributed_work.hpp>
#include <nano/node/node.hpp>
std::shared_ptr<request_type> nano::work_peer_request::get_prepared_json_request (std::string const & request_string_a) const
{
auto request (std::make_shared<boost::beast::http::request<boost::beast::http::string_body>> ());
request->method (boost::beast::http::verb::post);
request->set (boost::beast::http::field::content_type, "application/json");
request->target ("/");
request->version (11);
request->body () = request_string_a;
request->prepare_payload ();
return request;
}
/**
 * Stores the parameters of a single work request. Nothing is started here; see start ().
 * @param backoff_a delay, in seconds, before a retry if all work peers fail
 * @param node_a node whose configuration, work pool, logger and io context are used
 * @param root_a root to generate work for
 * @param callback_a receives the generated work, or an empty optional
 * @param difficulty_a difficulty sent to work peers and used to validate their responses
 */
nano::distributed_work::distributed_work (unsigned int backoff_a, nano::node & node_a, nano::block_hash const & root_a, std::function<void(boost::optional<uint64_t>)> const & callback_a, uint64_t difficulty_a) :
callback (callback_a),
backoff (backoff_a),
node (node_a),
root (root_a),
// Copy of the configured work peers; start () consumes this list one entry at a time
need_resolve (node.config.work_peers),
difficulty (difficulty_a)
{
assert (!completed);
}
/** Stops the request, including local generation for the root where applicable, if stop () has not already run. */
nano::distributed_work::~distributed_work ()
{
stop (true);
}
void nano::distributed_work::start ()
{
if (need_resolve.empty ())
{
start_work ();
}
else
{
auto current (need_resolve.back ());
need_resolve.pop_back ();
auto this_l (shared_from_this ());
boost::system::error_code ec;
auto parsed_address (boost::asio::ip::address_v6::from_string (current.first, ec));
if (!ec)
{
outstanding[parsed_address] = current.second;
start ();
}
else
{
node.network.resolver.async_resolve (boost::asio::ip::udp::resolver::query (current.first, std::to_string (current.second)), [current, this_l](boost::system::error_code const & ec, boost::asio::ip::udp::resolver::iterator i_a) {
if (!ec)
{
for (auto i (i_a), n (boost::asio::ip::udp::resolver::iterator{}); i != n; ++i)
{
auto endpoint (i->endpoint ());
this_l->outstanding[endpoint.address ()] = endpoint.port ();
}
}
else
{
this_l->node.logger.try_log (boost::str (boost::format ("Error resolving work peer: %1%:%2%: %3%") % current.first % current.second % ec.message ()));
}
this_l->start ();
});
}
}
}
/**
 * Kicks off the actual work generation once all peers have been resolved.
 * Local generation is started when there are no peers or peers were flagged as unresponsive, provided
 * the node has work threads or OpenCL. Independently of that, a work_generate request is sent to
 * every peer in `outstanding`. Each peer goes through connect -> write -> read; any error along the
 * way is reported through failure (), a well-formed reply through success ().
 */
void nano::distributed_work::start_work ()
{
// Captured by every handler below, keeping this object alive while operations are pending
auto this_l (shared_from_this ());
// Start work generation if peers are not acting correctly, or if there are no peers configured
if ((outstanding.empty () || node.unresponsive_work_peers) && (node.config.work_threads != 0 || node.work.opencl))
{
local_generation_started = true;
node.work.generate (
root, [this_l](boost::optional<uint64_t> const & work_a) {
// The result is forwarded as-is, including an empty optional
this_l->set_once (work_a);
// false: local generation is not cancelled again from here
this_l->stop (false);
},
difficulty);
}
if (!outstanding.empty ())
{
std::lock_guard<std::mutex> guard (mutex);
for (auto const & i : outstanding)
{
auto host (i.first);
auto service (i.second);
auto connection (std::make_shared<nano::work_peer_request> (this_l->node.io_ctx, host, service));
// Only a weak reference is stored; the handlers own the connection
connections.emplace_back (connection);
connection->socket.async_connect (nano::tcp_endpoint (host, service), [this_l, connection](boost::system::error_code const & ec) {
if (!ec)
{
std::string request_string;
{
// JSON body of the work_generate request
boost::property_tree::ptree request;
request.put ("action", "work_generate");
request.put ("hash", this_l->root.to_string ());
request.put ("difficulty", nano::to_string_hex (this_l->difficulty));
std::stringstream ostream;
boost::property_tree::write_json (ostream, request);
request_string = ostream.str ();
}
auto request (connection->get_prepared_json_request (request_string));
// `request` is captured so that it outlives the asynchronous write
boost::beast::http::async_write (connection->socket, *request, [this_l, connection, request](boost::system::error_code const & ec, size_t bytes_transferred) {
if (!ec)
{
boost::beast::http::async_read (connection->socket, connection->buffer, connection->response, [this_l, connection](boost::system::error_code const & ec, size_t bytes_transferred) {
if (!ec)
{
if (connection->response.result () == boost::beast::http::status::ok)
{
this_l->success (connection->response.body (), connection->address);
}
else
{
this_l->node.logger.try_log (boost::str (boost::format ("Work peer responded with an error %1% %2%: %3%") % connection->address % connection->port % connection->response.result ()));
this_l->failure (connection->address);
}
}
else if (ec == boost::system::errc::operation_canceled)
{
// The read was cancelled while waiting for the response (stop () cancels the sockets), so ask the peer to stop working on this root as well
this_l->cancel (connection);
this_l->failure (connection->address);
}
else
{
this_l->node.logger.try_log (boost::str (boost::format ("Unable to read from work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
this_l->failure (connection->address);
}
});
}
else
{
this_l->node.logger.try_log (boost::str (boost::format ("Unable to write to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
this_l->failure (connection->address);
}
});
}
else
{
this_l->node.logger.try_log (boost::str (boost::format ("Unable to connect to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
this_l->failure (connection->address);
}
});
}
}
}
/**
 * Sends a work_cancel request for this root to the peer behind `connection`, over a fresh socket.
 * Best effort: a failed connect is ignored and a failed write is only logged.
 * @param connection the request whose peer address and port are contacted
 */
void nano::distributed_work::cancel (std::shared_ptr<nano::work_peer_request> connection)
{
	auto self (shared_from_this ());
	auto canceller (std::make_shared<nano::work_peer_request> (node.io_ctx, connection->address, connection->port));
	nano::tcp_endpoint const peer_endpoint (canceller->address, canceller->port);
	canceller->socket.async_connect (peer_endpoint, [self, canceller](boost::system::error_code const & connect_ec) {
		if (connect_ec)
		{
			return;
		}
		boost::property_tree::ptree body;
		body.put ("action", "work_cancel");
		body.put ("hash", self->root.to_string ());
		std::stringstream serialized;
		boost::property_tree::write_json (serialized, body);
		std::string const request_string (serialized.str ());
		auto request (canceller->get_prepared_json_request (request_string));
		// `request` and `canceller` are captured to keep them alive until the write completes
		boost::beast::http::async_write (canceller->socket, *request, [self, request, canceller](boost::system::error_code const & write_ec, size_t) {
			if (write_ec)
			{
				self->node.logger.try_log (boost::str (boost::format ("Unable to send work_cancel to work_peer %1% %2%: %3% (%4%)") % canceller->address % canceller->port % write_ec.message () % write_ec.value ()));
			}
		});
	});
}
void nano::distributed_work::stop (bool const local_stop_a)
{
if (!stopped.exchange (true))
{
std::lock_guard<std::mutex> lock (mutex);
if (local_stop_a && (node.config.work_threads != 0 || node.work.opencl))
{
node.work.cancel (root);
}
for (auto & connection_w : connections)
{
if (auto connection_l = connection_w.lock ())
{
boost::system::error_code ec;
connection_l->socket.cancel (ec);
if (ec)
{
node.logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection_l->address % connection_l->port % ec.message () % ec.value ()));
}
try
{
connection_l->socket.close ();
}
catch (const boost::system::system_error & ec)
{
node.logger.try_log (boost::str (boost::format ("Error closing socket with work_peer %1% %2%: %3%") % connection_l->address % connection_l->port % ec.what () % ec.code ()));
}
}
}
connections.clear ();
outstanding.clear ();
}
}
void nano::distributed_work::success (std::string const & body_a, boost::asio::ip::address const & address)
{
auto last (remove (address));
std::stringstream istream (body_a);
try
{
boost::property_tree::ptree result;
boost::property_tree::read_json (istream, result);
auto work_text (result.get<std::string> ("work"));
uint64_t work;
if (!nano::from_string_hex (work_text, work))
{
uint64_t result_difficulty (0);
if (!nano::work_validate (root, work, &result_difficulty) && result_difficulty >= difficulty)
{
node.unresponsive_work_peers = false;
set_once (work);
stop (true);
}
else
{
node.logger.try_log (boost::str (boost::format ("Incorrect work response from %1% for root %2% with diffuculty %3%: %4%") % address % root.to_string () % nano::to_string_hex (difficulty) % work_text));
handle_failure (last);
}
}
else
{
node.logger.try_log (boost::str (boost::format ("Work response from %1% wasn't a number: %2%") % address % work_text));
handle_failure (last);
}
}
catch (...)
{
node.logger.try_log (boost::str (boost::format ("Work response from %1% wasn't parsable: %2%") % address % body_a));
handle_failure (last);
}
}
/**
 * Invokes the callback with work_a, but only on the first call; later calls do nothing.
 * @param work_a result to deliver, possibly empty
 */
void nano::distributed_work::set_once (boost::optional<uint64_t> work_a)
{
	bool const already_completed (completed.exchange (true));
	if (already_completed)
	{
		return;
	}
	callback (work_a);
}
/**
 * Records that the peer at `address` failed: it is removed from `outstanding`, and handle_failure ()
 * is told whether no peers are left.
 */
void nano::distributed_work::failure (boost::asio::ip::address const & address)
{
	bool const none_left (remove (address));
	handle_failure (none_left);
}
void nano::distributed_work::handle_failure (bool const last)
{
if (last && !completed)
{
node.unresponsive_work_peers = true;
if (!local_generation_started)
{
if (stopped)
{
callback (boost::none);
}
else
{
if (backoff == 1 && node.config.logging.work_generation_time ())
{
node.logger.always_log ("Work peer(s) failed to generate work for root ", root.to_string (), ", retrying...");
}
auto now (std::chrono::steady_clock::now ());
auto root_l (root);
auto callback_l (callback);
std::weak_ptr<nano::node> node_w (node.shared ());
auto next_backoff (std::min (backoff * 2, (unsigned int)60 * 5));
// clang-format off
node.alarm.add (now + std::chrono::seconds (backoff), [ node_w, root_l, callback_l, next_backoff, difficulty = difficulty ] {
if (auto node_l = node_w.lock ())
{
node_l->distributed_work.make (next_backoff, root_l, callback_l, difficulty);
}
});
// clang-format on
}
}
}
}
/**
 * Removes `address` from the outstanding peers, under the mutex.
 * @return true if no outstanding peers remain afterwards
 */
bool nano::distributed_work::remove (boost::asio::ip::address const & address)
{
	std::lock_guard<std::mutex> lock (mutex);
	outstanding.erase (address);
	bool const none_left (outstanding.empty ());
	return none_left;
}
/** Creates an empty factory; node_a is kept by reference and handed to every request made later. */
nano::distributed_work_factory::distributed_work_factory (nano::node & node_a) :
node (node_a)
{
}
/** Starts a new work request for root_a with the initial backoff of one second. */
void nano::distributed_work_factory::make (nano::block_hash const & root_a, std::function<void(boost::optional<uint64_t>)> const & callback_a, uint64_t difficulty_a)
{
	unsigned int const initial_backoff (1);
	make (initial_backoff, root_a, callback_a, difficulty_a);
}
/**
 * Creates a work request, registers a weak reference to it under its root and starts it.
 * Expired entries are purged first.
 * @param backoff_a retry delay in seconds, passed on to the request
 * @param root_a root to generate work for
 * @param callback_a receives the work, or an empty optional
 * @param difficulty_a required difficulty
 */
void nano::distributed_work_factory::make (unsigned int backoff_a, nano::block_hash const & root_a, std::function<void(boost::optional<uint64_t>)> const & callback_a, uint64_t difficulty_a)
{
	cleanup_finished ();
	auto const request (std::make_shared<nano::distributed_work> (backoff_a, node, root_a, callback_a, difficulty_a));
	{
		// Held for the registration only, not while the request is being started
		std::lock_guard<std::mutex> guard (mutex);
		work[root_a].emplace_back (request);
	}
	request->start ();
}
/**
 * Stops every request for root_a that is still alive and drops the root's entry.
 * Does nothing if the root is not known.
 * @param root_a root whose requests are cancelled
 * @param local_stop forwarded to distributed_work::stop (); when true, local generation is cancelled
 * as well, which usually should be done by the work pool
 */
void nano::distributed_work_factory::cancel (nano::block_hash const & root_a, bool const local_stop)
{
	// NOTE(review): the requests are stopped while the factory mutex is held
	std::lock_guard<std::mutex> guard (mutex);
	auto const entry (work.find (root_a));
	if (entry == work.end ())
	{
		return;
	}
	for (auto & request_w : entry->second)
	{
		auto request_l (request_w.lock ());
		if (request_l)
		{
			// Stopping cancels the pending socket operations of the request
			request_l->stop (local_stop);
		}
	}
	work.erase (entry);
}
/**
 * Drops weak references whose request no longer exists, and removes roots left without any
 * request. Requests that are still alive are not touched.
 */
void nano::distributed_work_factory::cleanup_finished ()
{
	std::lock_guard<std::mutex> guard (mutex);
	auto entry (work.begin ());
	while (entry != work.end ())
	{
		auto & requests (entry->second);
		auto const first_expired (std::remove_if (requests.begin (), requests.end (), [](auto const & request_w) {
			return request_w.expired ();
		}));
		requests.erase (first_expired, requests.end ());
		if (requests.empty ())
		{
			// erase () yields the iterator following the removed entry
			entry = work.erase (entry);
		}
		else
		{
			++entry;
		}
	}
}

View file

@ -0,0 +1,79 @@
#pragma once
#include <nano/boost/asio.hpp>
#include <nano/boost/beast.hpp>
#include <nano/lib/numbers.hpp>
#include <boost/optional.hpp>
#include <atomic>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
using request_type = boost::beast::http::request<boost::beast::http::string_body>;
namespace nano
{
class node;
/**
 * State of a single HTTP exchange with one work peer: the peer's address and port, the socket used
 * to reach it, and the buffer and response object a reply is read into.
 */
class work_peer_request final
{
public:
// The socket is created on io_ctx_a; no connection is opened here
work_peer_request (boost::asio::io_context & io_ctx_a, boost::asio::ip::address address_a, uint16_t port_a) :
address (address_a),
port (port_a),
socket (io_ctx_a)
{
}
// Builds a JSON POST request with the given string as its body
std::shared_ptr<request_type> get_prepared_json_request (std::string const &) const;
boost::asio::ip::address address;
uint16_t port;
// Read buffer for the response
boost::beast::flat_buffer buffer;
boost::beast::http::response<boost::beast::http::string_body> response;
boost::asio::ip::tcp::socket socket;
};
/**
 * A single work request for one root, served by the configured work peers and/or local generation.
 * distributed_work cancels local and peer work requests when going out of scope.
 * Instances call shared_from_this (), so they have to be owned by a std::shared_ptr.
 */
class distributed_work final : public std::enable_shared_from_this<nano::distributed_work>
{
public:
// Arguments: backoff in seconds, node, root, result callback, difficulty
distributed_work (unsigned int, nano::node &, nano::block_hash const &, std::function<void(boost::optional<uint64_t>)> const &, uint64_t);
~distributed_work ();
// Resolves the configured work peers, then calls start_work ()
void start ();
// Starts local generation where applicable and sends work_generate to every outstanding peer
void start_work ();
// Sends work_cancel for this root to the peer of the given request
void cancel (std::shared_ptr<nano::work_peer_request>);
// Stops the request once; the argument selects whether local generation is cancelled too
void stop (bool const);
// Handles an OK response (body, peer address)
void success (std::string const &, boost::asio::ip::address const &);
// Invokes the callback on the first call only
void set_once (boost::optional<uint64_t>);
// Handles a failed peer
void failure (boost::asio::ip::address const &);
// Reports failure or schedules a retry; the argument tells whether no outstanding peers remain
void handle_failure (bool const);
// Removes a peer from `outstanding`; returns true if none remain
bool remove (boost::asio::ip::address const &);
std::function<void(boost::optional<uint64_t>)> callback;
unsigned int backoff; // in seconds
nano::node & node;
nano::block_hash root;
// Taken by start_work (), stop () and remove ()
std::mutex mutex;
// Peers (address -> port) that have not been removed by a success or failure yet
std::map<boost::asio::ip::address, uint16_t> outstanding;
// Requests created by start_work (); weak, since the pending handlers own them
std::vector<std::weak_ptr<nano::work_peer_request>> connections;
// Configured peers (host, port) that start () still has to process
std::vector<std::pair<std::string, uint16_t>> need_resolve;
uint64_t difficulty;
// Set by set_once ()
std::atomic<bool> completed{ false };
// Set by start_work () when local generation is requested
std::atomic<bool> local_generation_started{ false };
// Set by stop ()
std::atomic<bool> stopped{ false };
};
/**
 * Creates distributed_work requests and keeps weak references to them, grouped by root, so that
 * all requests for a root can be cancelled together.
 */
class distributed_work_factory final
{
public:
distributed_work_factory (nano::node &);
// Starts a request with a backoff of 1 (root, callback, difficulty)
void make (nano::block_hash const &, std::function<void(boost::optional<uint64_t>)> const &, uint64_t);
// Starts a request with an explicit backoff (backoff, root, callback, difficulty)
void make (unsigned int, nano::block_hash const &, std::function<void(boost::optional<uint64_t>)> const &, uint64_t);
// Stops all live requests for the root and forgets the root
void cancel (nano::block_hash const &, bool const local_stop = false);
// Drops expired references and roots without any request left
void cleanup_finished ();
// Requests by root; make (), cancel () and cleanup_finished () access it under `mutex`
std::unordered_map<nano::block_hash, std::vector<std::weak_ptr<nano::distributed_work>>> work;
std::mutex mutex;
nano::node & node;
};
}

View file

@ -4490,7 +4490,7 @@ void nano::json_handler::work_cancel ()
auto hash (hash_impl ());
if (!ec)
{
node.work.cancel (hash);
node.observers.work_cancel.notify (hash);
}
response_errors ();
}

View file

@ -121,6 +121,7 @@ stats (config.stat_config),
flags (flags_a),
alarm (alarm_a),
work (work_a),
distributed_work (*this),
logger (config_a.logging.min_time_between_log_output),
store_impl (nano::make_store (logger, application_path_a, flags.read_only, true, config_a.diagnostics_config.txn_tracking, config_a.block_processor_batch_max_time, config_a.lmdb_max_dbs, !flags.disable_unchecked_drop, flags.sideband_batch_size, config_a.backup_before_upgrade)),
store (*store_impl),
@ -346,7 +347,7 @@ startup_time (std::chrono::steady_clock::now ())
}
}
});
if (this->websocket_server)
if (websocket_server)
{
observers.vote.add ([this](nano::transaction const & transaction, std::shared_ptr<nano::vote> vote_a, std::shared_ptr<nano::transport::channel> channel_a) {
if (this->websocket_server->any_subscriber (nano::websocket::topic::vote))
@ -357,6 +358,11 @@ startup_time (std::chrono::steady_clock::now ())
}
});
}
// Cancelling local work generation
observers.work_cancel.add ([this](nano::block_hash const & root_a) {
this->work.cancel (root_a);
this->distributed_work.cancel (root_a);
});
logger.always_log ("Node starting, version: ", NANO_VERSION_STRING);
logger.always_log ("Build information: ", BUILD_INFO);
@ -943,345 +949,6 @@ int nano::node::price (nano::uint128_t const & balance_a, int amount_a)
return static_cast<int> (result * 100.0);
}
namespace
{
class work_request
{
public:
work_request (boost::asio::io_context & io_ctx_a, boost::asio::ip::address address_a, uint16_t port_a) :
address (address_a),
port (port_a),
socket (io_ctx_a)
{
}
boost::asio::ip::address address;
uint16_t port;
boost::beast::flat_buffer buffer;
boost::beast::http::response<boost::beast::http::string_body> response;
boost::asio::ip::tcp::socket socket;
};
class distributed_work : public std::enable_shared_from_this<distributed_work>
{
public:
distributed_work (std::shared_ptr<nano::node> const & node_a, nano::block_hash const & root_a, std::function<void(uint64_t)> const & callback_a, uint64_t difficulty_a) :
distributed_work (1, node_a, root_a, callback_a, difficulty_a)
{
assert (node_a != nullptr);
}
~distributed_work ()
{
stop (true);
}
distributed_work (unsigned int backoff_a, std::shared_ptr<nano::node> const & node_a, nano::block_hash const & root_a, std::function<void(uint64_t)> const & callback_a, uint64_t difficulty_a) :
callback (callback_a),
backoff (backoff_a),
node (node_a),
root (root_a),
need_resolve (node_a->config.work_peers),
difficulty (difficulty_a)
{
assert (node_a != nullptr);
assert (!completed);
}
void start ()
{
if (need_resolve.empty ())
{
start_work ();
}
else
{
auto current (need_resolve.back ());
need_resolve.pop_back ();
auto this_l (shared_from_this ());
boost::system::error_code ec;
auto parsed_address (boost::asio::ip::address_v6::from_string (current.first, ec));
if (!ec)
{
outstanding[parsed_address] = current.second;
start ();
}
else
{
node->network.resolver.async_resolve (boost::asio::ip::udp::resolver::query (current.first, std::to_string (current.second)), [current, this_l](boost::system::error_code const & ec, boost::asio::ip::udp::resolver::iterator i_a) {
if (!ec)
{
for (auto i (i_a), n (boost::asio::ip::udp::resolver::iterator{}); i != n; ++i)
{
auto endpoint (i->endpoint ());
this_l->outstanding[endpoint.address ()] = endpoint.port ();
}
}
else
{
this_l->node->logger.try_log (boost::str (boost::format ("Error resolving work peer: %1%:%2%: %3%") % current.first % current.second % ec.message ()));
}
this_l->start ();
});
}
}
}
void start_work ()
{
auto this_l (shared_from_this ());
// Start work generation if peers are not acting correctly, or if there are no peers configured
if ((outstanding.empty () || node->unresponsive_work_peers) && (node->config.work_threads != 0 || node->work.opencl))
{
local_generation_started = true;
node->work.generate (
this_l->root, [this_l](boost::optional<uint64_t> const & work_a) {
if (work_a)
{
this_l->set_once (work_a.value ());
this_l->stop (false);
}
},
difficulty);
}
if (!outstanding.empty ())
{
std::lock_guard<std::mutex> guard (mutex);
for (auto const & i : outstanding)
{
auto host (i.first);
auto service (i.second);
auto connection (std::make_shared<work_request> (this_l->node->io_ctx, host, service));
connections.push_back (connection);
connection->socket.async_connect (nano::tcp_endpoint (host, service), [this_l, connection](boost::system::error_code const & ec) {
if (!ec)
{
std::string request_string;
{
boost::property_tree::ptree request;
request.put ("action", "work_generate");
request.put ("hash", this_l->root.to_string ());
request.put ("difficulty", nano::to_string_hex (this_l->difficulty));
std::stringstream ostream;
boost::property_tree::write_json (ostream, request);
request_string = ostream.str ();
}
auto request (std::make_shared<boost::beast::http::request<boost::beast::http::string_body>> ());
request->method (boost::beast::http::verb::post);
request->set (boost::beast::http::field::content_type, "application/json");
request->target ("/");
request->version (11);
request->body () = request_string;
request->prepare_payload ();
boost::beast::http::async_write (connection->socket, *request, [this_l, connection, request](boost::system::error_code const & ec, size_t bytes_transferred) {
if (!ec)
{
boost::beast::http::async_read (connection->socket, connection->buffer, connection->response, [this_l, connection](boost::system::error_code const & ec, size_t bytes_transferred) {
if (!ec)
{
if (connection->response.result () == boost::beast::http::status::ok)
{
this_l->success (connection->response.body (), connection->address);
}
else
{
this_l->node->logger.try_log (boost::str (boost::format ("Work peer responded with an error %1% %2%: %3%") % connection->address % connection->port % connection->response.result ()));
this_l->failure (connection->address);
}
}
else if (ec == boost::system::errc::operation_canceled)
{
// The only case where we send a cancel is if we preempt stopped waiting for the response
this_l->cancel (connection);
this_l->failure (connection->address);
}
else
{
this_l->node->logger.try_log (boost::str (boost::format ("Unable to read from work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
this_l->failure (connection->address);
}
});
}
else
{
this_l->node->logger.try_log (boost::str (boost::format ("Unable to write to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
this_l->failure (connection->address);
}
});
}
else
{
this_l->node->logger.try_log (boost::str (boost::format ("Unable to connect to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ()));
this_l->failure (connection->address);
}
});
}
}
}
void cancel (std::shared_ptr<work_request> connection)
{
auto this_l (shared_from_this ());
auto cancelling (std::make_shared<work_request> (node->io_ctx, connection->address, connection->port));
cancelling->socket.async_connect (nano::tcp_endpoint (cancelling->address, cancelling->port), [this_l, cancelling](boost::system::error_code const & ec) {
if (!ec)
{
std::string request_string;
{
boost::property_tree::ptree request;
request.put ("action", "work_cancel");
request.put ("hash", this_l->root.to_string ());
std::stringstream ostream;
boost::property_tree::write_json (ostream, request);
request_string = ostream.str ();
}
auto request (std::make_shared<boost::beast::http::request<boost::beast::http::string_body>> ());
request->method (boost::beast::http::verb::post);
request->set (boost::beast::http::field::content_type, "application/json");
request->target ("/");
request->version (11);
request->body () = request_string;
request->prepare_payload ();
boost::beast::http::async_write (cancelling->socket, *request, [this_l, request, cancelling](boost::system::error_code const & ec, size_t bytes_transferred) {
if (ec)
{
this_l->node->logger.try_log (boost::str (boost::format ("Unable to send work_cancel to work_peer %1% %2%: %3% (%4%)") % cancelling->address % cancelling->port % ec.message () % ec.value ()));
}
});
}
});
}
void stop (bool const local_stop)
{
if (!stopped.exchange (true))
{
std::lock_guard<std::mutex> lock (mutex);
if (local_stop && (node->config.work_threads != 0 || node->work.opencl))
{
node->work.cancel (root);
}
for (auto & i : connections)
{
auto connection = i.lock ();
if (connection)
{
boost::system::error_code ec;
connection->socket.cancel (ec);
if (ec)
{
node->logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.message () % ec.value ()));
}
try
{
connection->socket.close ();
}
catch (const boost::system::system_error & ec)
{
node->logger.try_log (boost::str (boost::format ("Error closing socket with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.what () % ec.code ()));
}
}
}
connections.clear ();
outstanding.clear ();
}
}
void success (std::string const & body_a, boost::asio::ip::address const & address)
{
auto last (remove (address));
std::stringstream istream (body_a);
try
{
boost::property_tree::ptree result;
boost::property_tree::read_json (istream, result);
auto work_text (result.get<std::string> ("work"));
uint64_t work;
if (!nano::from_string_hex (work_text, work))
{
uint64_t result_difficulty (0);
if (!nano::work_validate (root, work, &result_difficulty) && result_difficulty >= difficulty)
{
node->unresponsive_work_peers = false;
set_once (work);
stop (true);
}
else
{
node->logger.try_log (boost::str (boost::format ("Incorrect work response from %1% for root %2% with diffuculty %3%: %4%") % address % root.to_string () % nano::to_string_hex (difficulty) % work_text));
handle_failure (last);
}
}
else
{
node->logger.try_log (boost::str (boost::format ("Work response from %1% wasn't a number: %2%") % address % work_text));
handle_failure (last);
}
}
catch (...)
{
node->logger.try_log (boost::str (boost::format ("Work response from %1% wasn't parsable: %2%") % address % body_a));
handle_failure (last);
}
}
void set_once (uint64_t work_a)
{
if (!completed.exchange (true))
{
callback (work_a);
}
}
void failure (boost::asio::ip::address const & address)
{
auto last (remove (address));
handle_failure (last);
}
void handle_failure (bool last)
{
if (last)
{
if (!completed)
{
node->unresponsive_work_peers = true;
if (!local_generation_started)
{
if (backoff == 1 && node->config.logging.work_generation_time ())
{
node->logger.always_log ("Work peer(s) failed to generate work for root ", root.to_string (), ", retrying...");
}
auto now (std::chrono::steady_clock::now ());
auto root_l (root);
auto callback_l (callback);
std::weak_ptr<nano::node> node_w (node);
auto next_backoff (std::min (backoff * 2, (unsigned int)60 * 5));
// clang-format off
node->alarm.add (now + std::chrono::seconds (backoff), [ node_w, root_l, callback_l, next_backoff, difficulty = difficulty ] {
if (auto node_l = node_w.lock ())
{
auto work_generation (std::make_shared<distributed_work> (next_backoff, node_l, root_l, callback_l, difficulty));
work_generation->start ();
}
});
// clang-format on
}
}
}
}
bool remove (boost::asio::ip::address const & address)
{
std::lock_guard<std::mutex> lock (mutex);
outstanding.erase (address);
return outstanding.empty ();
}
std::function<void(uint64_t)> callback;
unsigned int backoff; // in seconds
std::shared_ptr<nano::node> node;
nano::block_hash root;
std::mutex mutex;
std::map<boost::asio::ip::address, uint16_t> outstanding;
std::vector<std::weak_ptr<work_request>> connections;
std::vector<std::pair<std::string, uint16_t>> need_resolve;
uint64_t difficulty;
std::atomic<bool> completed{ false };
std::atomic<bool> local_generation_started{ false };
std::atomic<bool> stopped{ false };
};
}
void nano::node::work_generate_blocking (nano::block & block_a)
{
work_generate_blocking (block_a, network_params.network.publish_threshold);
@ -1292,15 +959,14 @@ void nano::node::work_generate_blocking (nano::block & block_a, uint64_t difficu
block_a.block_work_set (work_generate_blocking (block_a.root (), difficulty_a));
}
void nano::node::work_generate (nano::uint256_union const & hash_a, std::function<void(uint64_t)> callback_a)
void nano::node::work_generate (nano::uint256_union const & hash_a, std::function<void(boost::optional<uint64_t>)> callback_a)
{
work_generate (hash_a, callback_a, network_params.network.publish_threshold);
}
void nano::node::work_generate (nano::uint256_union const & hash_a, std::function<void(uint64_t)> callback_a, uint64_t difficulty_a)
void nano::node::work_generate (nano::uint256_union const & hash_a, std::function<void(boost::optional<uint64_t>)> callback_a, uint64_t difficulty_a)
{
auto work_generation (std::make_shared<distributed_work> (shared (), hash_a, callback_a, difficulty_a));
work_generation->start ();
distributed_work.make (hash_a, callback_a, difficulty_a);
}
uint64_t nano::node::work_generate_blocking (nano::uint256_union const & block_a)
@ -1313,8 +979,8 @@ uint64_t nano::node::work_generate_blocking (nano::uint256_union const & hash_a,
std::promise<uint64_t> promise;
std::future<uint64_t> future = promise.get_future ();
// clang-format off
work_generate (hash_a, [&promise](uint64_t work_a) {
promise.set_value (work_a);
work_generate (hash_a, [&promise](boost::optional<uint64_t> work_a) {
promise.set_value (work_a.value_or (0));
},
difficulty_a);
// clang-format on

View file

@ -9,6 +9,7 @@
#include <nano/node/blockprocessor.hpp>
#include <nano/node/bootstrap.hpp>
#include <nano/node/confirmation_height_processor.hpp>
#include <nano/node/distributed_work.hpp>
#include <nano/node/election.hpp>
#include <nano/node/gap_cache.hpp>
#include <nano/node/logging.hpp>
@ -122,8 +123,8 @@ public:
void work_generate_blocking (nano::block &);
uint64_t work_generate_blocking (nano::uint256_union const &, uint64_t);
uint64_t work_generate_blocking (nano::uint256_union const &);
void work_generate (nano::uint256_union const &, std::function<void(uint64_t)>, uint64_t);
void work_generate (nano::uint256_union const &, std::function<void(uint64_t)>);
void work_generate (nano::uint256_union const &, std::function<void(boost::optional<uint64_t>)>, uint64_t);
void work_generate (nano::uint256_union const &, std::function<void(boost::optional<uint64_t>)>);
void add_initial_peers ();
void block_confirm (std::shared_ptr<nano::block>);
bool block_confirmed_or_being_confirmed (nano::transaction const &, nano::block_hash const &);
@ -146,6 +147,7 @@ public:
nano::node_flags flags;
nano::alarm & alarm;
nano::work_pool & work;
nano::distributed_work_factory distributed_work;
nano::logger_mt logger;
std::unique_ptr<nano::block_store> store_impl;
nano::block_store & store;

View file

@ -10,5 +10,6 @@ std::unique_ptr<nano::seq_con_info_component> nano::collect_seq_con_info (nano::
composite->add_component (collect_seq_con_info (node_observers.account_balance, "account_balance"));
composite->add_component (collect_seq_con_info (node_observers.endpoint, "endpoint"));
composite->add_component (collect_seq_con_info (node_observers.disconnect, "disconnect"));
composite->add_component (collect_seq_con_info (node_observers.work_cancel, "work_cancel"));
return composite;
}

View file

@ -21,6 +21,7 @@ public:
nano::observer_set<std::shared_ptr<nano::transport::channel>> endpoint;
nano::observer_set<> disconnect;
nano::observer_set<uint64_t> difficulty;
nano::observer_set<nano::block_hash const &> work_cancel;
};
std::unique_ptr<seq_con_info_component> collect_seq_con_info (node_observers & node_observers, const std::string & name);

View file

@ -1,5 +1,7 @@
#pragma once
#include <nano/secure/blockstore.hpp>
#include <rocksdb/db.h>
#include <rocksdb/filter_policy.h>
#include <rocksdb/options.h>
@ -7,8 +9,6 @@
#include <rocksdb/utilities/optimistic_transaction_db.h>
#include <rocksdb/utilities/transaction.h>
#include <nano/secure/blockstore.hpp>
namespace nano
{
class read_rocksdb_txn final : public read_transaction_impl

View file

@ -1508,7 +1508,7 @@ void nano::work_watcher::remove (std::shared_ptr<nano::block> block_a)
if (existing != watched.end () && existing->second->hash () == block_a->hash ())
{
watched.erase (existing);
node.work.cancel (block_a->root ());
node.observers.work_cancel.notify (block_a->root ());
}
}

View file

@ -2842,8 +2842,9 @@ TEST (rpc, work_peer_bad)
node2.config.work_peers.push_back (std::make_pair (boost::asio::ip::address_v6::any ().to_string (), 0));
nano::block_hash hash1 (1);
std::atomic<uint64_t> work (0);
node2.work_generate (hash1, [&work](uint64_t work_a) {
work = work_a;
node2.work_generate (hash1, [&work](boost::optional<uint64_t> work_a) {
ASSERT_TRUE (work_a.is_initialized ());
work = *work_a;
});
system.deadline_set (5s);
while (nano::work_validate (hash1, work))
@ -2871,8 +2872,9 @@ TEST (rpc, work_peer_one)
node2.config.work_peers.push_back (std::make_pair (node1.network.endpoint ().address ().to_string (), rpc.config.port));
nano::keypair key1;
uint64_t work (0);
node2.work_generate (key1.pub, [&work](uint64_t work_a) {
work = work_a;
node2.work_generate (key1.pub, [&work](boost::optional<uint64_t> work_a) {
ASSERT_TRUE (work_a.is_initialized ());
work = *work_a;
});
system.deadline_set (5s);
while (nano::work_validate (key1.pub, work))
@ -2923,8 +2925,9 @@ TEST (rpc, work_peer_many)
{
nano::keypair key1;
uint64_t work (0);
node1.work_generate (key1.pub, [&work](uint64_t work_a) {
work = work_a;
node1.work_generate (key1.pub, [&work](boost::optional<uint64_t> work_a) {
ASSERT_TRUE (work_a.is_initialized ());
work = *work_a;
});
while (nano::work_validate (key1.pub, work))
{