diff --git a/nano/node/distributed_work.cpp b/nano/node/distributed_work.cpp index 8e1fe2d5..0eb55d6f 100644 --- a/nano/node/distributed_work.cpp +++ b/nano/node/distributed_work.cpp @@ -1,12 +1,14 @@ +#include +#include #include #include #include #include -std::shared_ptr nano::work_peer_request::get_prepared_json_request (std::string const & request_string_a) const +std::shared_ptr nano::distributed_work::peer_request::get_prepared_json_request (std::string const & request_string_a) const { - auto request (std::make_shared> ()); + auto request (std::make_shared ()); request->method (boost::beast::http::verb::post); request->set (boost::beast::http::field::content_type, "application/json"); auto address_string = boost::algorithm::erase_first_copy (address.to_string (), "::ffff:"); @@ -18,15 +20,12 @@ std::shared_ptr nano::work_peer_request::get_prepared_json_request return request; } -nano::distributed_work::distributed_work (nano::node & node_a, nano::root const & root_a, std::vector> const & peers_a, unsigned int backoff_a, std::function)> const & callback_a, uint64_t difficulty_a, boost::optional const & account_a) : -callback (callback_a), -backoff (backoff_a), +nano::distributed_work::distributed_work (nano::node & node_a, nano::work_request const & request_a, std::chrono::seconds const & backoff_a) : node (node_a), -root (root_a), -account (account_a), -peers (peers_a), -need_resolve (peers_a), -difficulty (difficulty_a), +request (request_a), +backoff (backoff_a), +strand (node_a.io_ctx.get_executor ()), +need_resolve (request_a.peers), elapsed (nano::timer_state::started, "distributed work generation timer") { assert (!finished); @@ -41,15 +40,15 @@ nano::distributed_work::~distributed_work () nano::websocket::message_builder builder; if (status == work_generation_status::success) { - node.websocket_server->broadcast (builder.work_generation (root, work_result, difficulty, node.network_params.network.publish_threshold, elapsed.value (), winner, bad_peers)); + node.websocket_server->broadcast (builder.work_generation (request.root, work_result, request.difficulty, node.network_params.network.publish_threshold, elapsed.value (), winner, bad_peers)); } else if (status == work_generation_status::cancelled) { - node.websocket_server->broadcast (builder.work_cancelled (root, difficulty, node.network_params.network.publish_threshold, elapsed.value (), bad_peers)); + node.websocket_server->broadcast (builder.work_cancelled (request.root, request.difficulty, node.network_params.network.publish_threshold, elapsed.value (), bad_peers)); } else if (status == work_generation_status::failure_local || status == work_generation_status::failure_peers) { - node.websocket_server->broadcast (builder.work_failed (root, difficulty, node.network_params.network.publish_threshold, elapsed.value (), bad_peers)); + node.websocket_server->broadcast (builder.work_failed (request.root, request.difficulty, node.network_params.network.publish_threshold, elapsed.value (), bad_peers)); } } stop_once (true); @@ -98,29 +97,6 @@ void nano::distributed_work::start_work () { auto this_l (shared_from_this ()); - // Start work generation if peers are not acting correctly, or if there are no peers configured - if ((outstanding.empty () || node.unresponsive_work_peers) && node.local_work_generation_enabled ()) - { - local_generation_started = true; - node.work.generate ( - root, [this_l](boost::optional const & work_a) { - if (work_a.is_initialized ()) - { - this_l->set_once (*work_a); - } - else if (!this_l->finished.exchange (true)) - { - this_l->status = work_generation_status::failure_local; - if (this_l->callback) - { - this_l->callback (boost::none); - } - } - this_l->stop_once (false); - }, - difficulty); - } - if (!outstanding.empty ()) { nano::lock_guard guard (mutex); @@ -128,37 +104,42 @@ void nano::distributed_work::start_work () { auto host (i.first); auto service (i.second); - auto connection (std::make_shared (this_l->node.io_ctx, host, service)); + auto connection (std::make_shared (this_l->node.io_ctx, host, service)); connections.emplace_back (connection); - connection->socket.async_connect (nano::tcp_endpoint (host, service), [this_l, connection](boost::system::error_code const & ec) { - if (!ec) + connection->socket.async_connect (nano::tcp_endpoint (host, service), + boost::asio::bind_executor (strand, + [this_l, connection](boost::system::error_code const & ec) { + if (!ec && !this_l->stopped) { std::string request_string; { boost::property_tree::ptree request; request.put ("action", "work_generate"); - request.put ("hash", this_l->root.to_string ()); - request.put ("difficulty", nano::to_string_hex (this_l->difficulty)); - if (this_l->account.is_initialized ()) + request.put ("hash", this_l->request.root.to_string ()); + request.put ("difficulty", nano::to_string_hex (this_l->request.difficulty)); + if (this_l->request.account.is_initialized ()) { - request.put ("account", this_l->account.get ().to_account ()); + request.put ("account", this_l->request.account.get ().to_account ()); } std::stringstream ostream; boost::property_tree::write_json (ostream, request); request_string = ostream.str (); } - auto request (connection->get_prepared_json_request (request_string)); - boost::beast::http::async_write (connection->socket, *request, [this_l, connection, request](boost::system::error_code const & ec, size_t bytes_transferred) { - if (!ec) + auto peer_request (connection->get_prepared_json_request (request_string)); + boost::beast::http::async_write (connection->socket, *peer_request, + boost::asio::bind_executor (this_l->strand, + [this_l, connection, peer_request](boost::system::error_code const & ec, size_t size_a) { + if (!ec && !this_l->stopped) { - boost::beast::http::async_read (connection->socket, connection->buffer, connection->response, [this_l, connection](boost::system::error_code const & ec, size_t bytes_transferred) { - if (!ec) + boost::beast::http::async_read (connection->socket, connection->buffer, connection->response, + boost::asio::bind_executor (this_l->strand, [this_l, connection](boost::system::error_code const & ec, size_t size_a) { + if (!ec && !this_l->stopped) { if (connection->response.result () == boost::beast::http::status::ok) { this_l->success (connection->response.body (), connection->address, connection->port); } - else + else if (ec) { this_l->node.logger.try_log (boost::str (boost::format ("Work peer responded with an error %1% %2%: %3%") % connection->address % connection->port % connection->response.result ())); this_l->add_bad_peer (connection->address, connection->port); @@ -168,66 +149,92 @@ void nano::distributed_work::start_work () else if (ec == boost::system::errc::operation_canceled) { // The only case where we send a cancel is if we preempt stopped waiting for the response - this_l->cancel_connection (connection); + this_l->cancel (*connection); this_l->failure (connection->address); } - else + else if (ec) { this_l->node.logger.try_log (boost::str (boost::format ("Unable to read from work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ())); this_l->add_bad_peer (connection->address, connection->port); this_l->failure (connection->address); } - }); + })); } - else + else if (ec && ec != boost::system::errc::operation_canceled) { this_l->node.logger.try_log (boost::str (boost::format ("Unable to write to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ())); this_l->add_bad_peer (connection->address, connection->port); this_l->failure (connection->address); } - }); + })); } - else + else if (ec && ec != boost::system::errc::operation_canceled) { this_l->node.logger.try_log (boost::str (boost::format ("Unable to connect to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ())); this_l->add_bad_peer (connection->address, connection->port); this_l->failure (connection->address); } - }); + })); } } - if (!local_generation_started && outstanding.empty () && callback) + // Start work generation if peers are not acting correctly, or if there are no peers configured + if ((outstanding.empty () || node.unresponsive_work_peers) && node.local_work_generation_enabled ()) { - callback (boost::none); + local_generation_started = true; + node.work.generate ( + request.root, [this_l](boost::optional const & work_a) { + if (work_a.is_initialized ()) + { + this_l->set_once (*work_a); + } + else if (!this_l->finished.exchange (true)) + { + this_l->status = work_generation_status::failure_local; + if (this_l->request.callback) + { + this_l->request.callback (boost::none); + } + } + this_l->stop_once (false); + }, + request.difficulty); + } + else if (outstanding.empty () && request.callback) + { + request.callback (boost::none); } } -void nano::distributed_work::cancel_connection (std::shared_ptr connection_a) +void nano::distributed_work::cancel (peer_request const & connection_a) { auto this_l (shared_from_this ()); - auto cancelling_l (std::make_shared (node.io_ctx, connection_a->address, connection_a->port)); - cancelling_l->socket.async_connect (nano::tcp_endpoint (cancelling_l->address, cancelling_l->port), [this_l, cancelling_l](boost::system::error_code const & ec) { + auto cancelling_l (std::make_shared (node.io_ctx, connection_a.address, connection_a.port)); + cancelling_l->socket.async_connect (nano::tcp_endpoint (cancelling_l->address, cancelling_l->port), + boost::asio::bind_executor (strand, + [this_l, cancelling_l](boost::system::error_code const & ec) { if (!ec) { std::string request_string; { boost::property_tree::ptree request; request.put ("action", "work_cancel"); - request.put ("hash", this_l->root.to_string ()); + request.put ("hash", this_l->request.root.to_string ()); std::stringstream ostream; boost::property_tree::write_json (ostream, request); request_string = ostream.str (); } - auto request (cancelling_l->get_prepared_json_request (request_string)); - boost::beast::http::async_write (cancelling_l->socket, *request, [this_l, request, cancelling_l](boost::system::error_code const & ec, size_t bytes_transferred) { - if (ec) + auto peer_cancel (cancelling_l->get_prepared_json_request (request_string)); + boost::beast::http::async_write (cancelling_l->socket, *peer_cancel, + boost::asio::bind_executor (this_l->strand, + [this_l, peer_cancel, cancelling_l](boost::system::error_code const & ec, size_t bytes_transferred) { + if (ec && ec != boost::system::errc::operation_canceled) { this_l->node.logger.try_log (boost::str (boost::format ("Unable to send work_cancel to work_peer %1% %2%: %3% (%4%)") % cancelling_l->address % cancelling_l->port % ec.message () % ec.value ())); } - }); + })); } - }); + })); } void nano::distributed_work::success (std::string const & body_a, boost::asio::ip::address const & address_a, uint16_t port_a) @@ -243,7 +250,7 @@ void nano::distributed_work::success (std::string const & body_a, boost::asio::i if (!nano::from_string_hex (work_text, work)) { uint64_t result_difficulty (0); - if (!nano::work_validate (root, work, &result_difficulty) && result_difficulty >= difficulty) + if (!nano::work_validate (request.root, work, &result_difficulty) && result_difficulty >= request.difficulty) { node.unresponsive_work_peers = false; set_once (work, boost::str (boost::format ("%1%:%2%") % address_a % port_a)); @@ -251,7 +258,7 @@ void nano::distributed_work::success (std::string const & body_a, boost::asio::i } else { - node.logger.try_log (boost::str (boost::format ("Incorrect work response from %1%:%2% for root %3% with diffuculty %4%: %5%") % address_a % port_a % root.to_string () % nano::to_string_hex (difficulty) % work_text)); + node.logger.try_log (boost::str (boost::format ("Incorrect work response from %1%:%2% for root %3% with diffuculty %4%: %5%") % address_a % port_a % request.root.to_string () % nano::to_string_hex (request.difficulty) % work_text)); add_bad_peer (address_a, port_a); handle_failure (last); } @@ -278,68 +285,74 @@ void nano::distributed_work::stop_once (bool const local_stop_a) nano::lock_guard guard (mutex); if (local_stop_a && node.local_work_generation_enabled ()) { - node.work.cancel (root); + node.work.cancel (request.root); } for (auto & connection_w : connections) { if (auto connection_l = connection_w.lock ()) { - boost::system::error_code ec; - connection_l->socket.cancel (ec); - if (ec) - { - node.logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection_l->address % connection_l->port % ec.message () % ec.value ())); - } - try - { - connection_l->socket.close (); - } - catch (const boost::system::system_error & ec) - { - node.logger.try_log (boost::str (boost::format ("Error closing socket with work_peer %1% %2%: %3%") % connection_l->address % connection_l->port % ec.what () % ec.code ())); - } + auto this_l (shared_from_this ()); + boost::asio::post (strand, boost::asio::bind_executor (strand, [this_l, connection_l] { + boost::system::error_code ec; + if (connection_l->socket.is_open ()) + { + connection_l->socket.cancel (ec); + if (!ec) + { + connection_l->socket.close (ec); + if (ec) + { + this_l->node.logger.try_log (boost::str (boost::format ("Error closing socket with work_peer %1% %2%: %3%") % connection_l->address % connection_l->port % ec.message () % ec.value ())); + } + } + else + { + this_l->node.logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection_l->address % connection_l->port % ec.message () % ec.value ())); + } + } + })); } } - connections.clear (); - outstanding.clear (); } + connections.clear (); + outstanding.clear (); } -void nano::distributed_work::set_once (uint64_t work_a, std::string const & source_a) +void nano::distributed_work::set_once (uint64_t const work_a, std::string const & source_a) { if (!finished.exchange (true)) { elapsed.stop (); status = work_generation_status::success; - if (callback) + if (request.callback) { - callback (work_a); + request.callback (work_a); } winner = source_a; work_result = work_a; if (node.config.logging.work_generation_time ()) { boost::format unformatted_l ("Work generation for %1%, with a threshold difficulty of %2% (multiplier %3%x) complete: %4% ms"); - auto multiplier_text_l (nano::to_string (nano::difficulty::to_multiplier (difficulty, node.network_params.network.publish_threshold), 2)); - node.logger.try_log (boost::str (unformatted_l % root.to_string () % nano::to_string_hex (difficulty) % multiplier_text_l % elapsed.value ().count ())); + auto multiplier_text_l (nano::to_string (nano::difficulty::to_multiplier (request.difficulty, node.network_params.network.publish_threshold), 2)); + node.logger.try_log (boost::str (unformatted_l % request.root.to_string () % nano::to_string_hex (request.difficulty) % multiplier_text_l % elapsed.value ().count ())); } } } -void nano::distributed_work::cancel_once () +void nano::distributed_work::cancel () { if (!finished.exchange (true)) { elapsed.stop (); status = work_generation_status::cancelled; - if (callback) + if (request.callback) { - callback (boost::none); + request.callback (boost::none); } stop_once (true); if (node.config.logging.work_generation_time ()) { - node.logger.try_log (boost::str (boost::format ("Work generation for %1% was cancelled after %2% ms") % root.to_string () % elapsed.value ().count ())); + node.logger.try_log (boost::str (boost::format ("Work generation for %1% was cancelled after %2% ms") % request.root.to_string () % elapsed.value ().count ())); } } } @@ -358,23 +371,23 @@ void nano::distributed_work::handle_failure (bool const last_a) if (!local_generation_started && !finished.exchange (true)) { status = work_generation_status::failure_peers; - if (backoff == 1 && node.config.logging.work_generation_time ()) + if (backoff == std::chrono::seconds (1) && node.config.logging.work_generation_time ()) { - node.logger.always_log ("Work peer(s) failed to generate work for root ", root.to_string (), ", retrying..."); + node.logger.always_log ("Work peer(s) failed to generate work for root ", request.root.to_string (), ", retrying..."); } auto now (std::chrono::steady_clock::now ()); std::weak_ptr node_w (node.shared ()); - auto next_backoff (std::min (backoff * 2, (unsigned int)60 * 5)); + auto next_backoff (std::min (backoff * 2, std::chrono::seconds (5 * 60))); // clang-format off - node.alarm.add (now + std::chrono::seconds (backoff), [ node_w, root_l = root, peers_l = peers, callback_l = callback, next_backoff, difficulty = difficulty, account_l = account ] { + node.alarm.add (now + std::chrono::seconds (backoff), [ node_w, request_l = request, next_backoff] { bool error_l {true}; if (auto node_l = node_w.lock ()) { - error_l = node_l->distributed_work.make (next_backoff, root_l, peers_l, callback_l, difficulty, account_l); + error_l = node_l->distributed_work.make (next_backoff, request_l); } - if (error_l && callback_l) + if (error_l && request_l.callback) { - callback_l (boost::none); + request_l.callback (boost::none); } }); // clang-format on diff --git a/nano/node/distributed_work.hpp b/nano/node/distributed_work.hpp index 623bcb5b..a0151740 100644 --- a/nano/node/distributed_work.hpp +++ b/nano/node/distributed_work.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -8,8 +9,8 @@ #include +#include #include -#include using request_type = boost::beast::http::request; @@ -25,21 +26,13 @@ namespace nano { class node; -class work_peer_request final +struct work_request final { -public: - work_peer_request (boost::asio::io_context & io_ctx_a, boost::asio::ip::address address_a, uint16_t port_a) : - address (address_a), - port (port_a), - socket (io_ctx_a) - { - } - std::shared_ptr get_prepared_json_request (std::string const &) const; - boost::asio::ip::address address; - uint16_t port; - boost::beast::flat_buffer buffer; - boost::beast::http::response response; - boost::asio::ip::tcp::socket socket; + nano::root root; + uint64_t difficulty; + boost::optional const account; + std::function)> callback; + std::vector> const peers; }; /** @@ -56,39 +49,62 @@ class distributed_work final : public std::enable_shared_from_this get_prepared_json_request (std::string const &) const; + boost::asio::ip::address address; + uint16_t port; + boost::beast::flat_buffer buffer; + boost::beast::http::response response; + boost::asio::ip::tcp::socket socket; + }; + public: - distributed_work (nano::node &, nano::root const &, std::vector> const & peers_a, unsigned int, std::function)> const &, uint64_t, boost::optional const & = boost::none); + distributed_work (nano::node &, nano::work_request const &, std::chrono::seconds const &); ~distributed_work (); void start (); + void cancel (); + +private: void start_work (); - void cancel_connection (std::shared_ptr); + /** Cancellation is done with an entirely new connection, \p request_a is only used to copy its address and port */ + void cancel (peer_request const & request_a); + /** Called on a successful peer response, validates the reply */ void success (std::string const &, boost::asio::ip::address const &, uint16_t const); + /** Send a work_cancel message to all remaining connections */ void stop_once (bool const); - void set_once (uint64_t, std::string const & source_a = "local"); - void cancel_once (); + void set_once (uint64_t const, std::string const & source_a = "local"); void failure (boost::asio::ip::address const &); void handle_failure (bool const); bool remove (boost::asio::ip::address const &); void add_bad_peer (boost::asio::ip::address const &, uint16_t const); - std::function)> callback; - unsigned int backoff; // in seconds nano::node & node; - nano::root root; - boost::optional const account; - std::mutex mutex; - std::map outstanding; - std::vector> connections; - std::vector> const peers; + nano::work_request request; + + std::chrono::seconds backoff; + boost::asio::strand strand; std::vector> need_resolve; - uint64_t difficulty; - uint64_t work_result{ 0 }; - std::atomic finished{ false }; - std::atomic stopped{ false }; - std::atomic local_generation_started{ false }; + std::map outstanding; + std::vector> connections; + work_generation_status status{ work_generation_status::ongoing }; + uint64_t work_result{ 0 }; + nano::timer elapsed; // logging only std::vector bad_peers; // websocket std::string winner; // websocket + + std::mutex mutex; + std::atomic finished{ false }; + std::atomic stopped{ false }; + std::atomic local_generation_started{ false }; }; } \ No newline at end of file diff --git a/nano/node/distributed_work_factory.cpp b/nano/node/distributed_work_factory.cpp index 806f1937..4dd5d43f 100644 --- a/nano/node/distributed_work_factory.cpp +++ b/nano/node/distributed_work_factory.cpp @@ -1,4 +1,3 @@ -#include #include #include @@ -14,10 +13,10 @@ nano::distributed_work_factory::~distributed_work_factory () bool nano::distributed_work_factory::make (nano::root const & root_a, std::vector> const & peers_a, std::function)> const & callback_a, uint64_t difficulty_a, boost::optional const & account_a) { - return make (1, root_a, peers_a, callback_a, difficulty_a, account_a); + return make (std::chrono::seconds (1), nano::work_request{ root_a, difficulty_a, account_a, callback_a, peers_a }); } -bool nano::distributed_work_factory::make (unsigned int backoff_a, nano::root const & root_a, std::vector> const & peers_a, std::function)> const & callback_a, uint64_t difficulty_a, boost::optional const & account_a) +bool nano::distributed_work_factory::make (std::chrono::seconds const & backoff_a, nano::work_request const & request_a) { bool error_l{ true }; if (!stopped) @@ -25,10 +24,10 @@ bool nano::distributed_work_factory::make (unsigned int backoff_a, nano::root co cleanup_finished (); if (node.work_generation_enabled ()) { - auto distributed (std::make_shared (node, root_a, peers_a, backoff_a, callback_a, difficulty_a, account_a)); + auto distributed (std::make_shared (node, request_a, backoff_a)); { nano::lock_guard guard (mutex); - items[root_a].emplace_back (distributed); + items[request_a.root].emplace_back (distributed); } distributed->start (); error_l = false; @@ -48,7 +47,7 @@ void nano::distributed_work_factory::cancel (nano::root const & root_a, bool con if (auto distributed_l = distributed_w.lock ()) { // Send work_cancel to work peers and stop local work generation - distributed_l->cancel_once (); + distributed_l->cancel (); } } items.erase (existing_l); diff --git a/nano/node/distributed_work_factory.hpp b/nano/node/distributed_work_factory.hpp index 29782f30..7bf7e035 100644 --- a/nano/node/distributed_work_factory.hpp +++ b/nano/node/distributed_work_factory.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include @@ -22,7 +23,7 @@ public: distributed_work_factory (nano::node &); ~distributed_work_factory (); bool make (nano::root const &, std::vector> const &, std::function)> const &, uint64_t, boost::optional const & = boost::none); - bool make (unsigned int, nano::root const &, std::vector> const &, std::function)> const &, uint64_t, boost::optional const & = boost::none); + bool make (std::chrono::seconds const &, nano::work_request const &); void cancel (nano::root const &, bool const local_stop = false); void cleanup_finished (); void stop (); diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 5ade717e..7a052ee6 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -3038,7 +3038,7 @@ TEST (rpc, work_peer_many) for (auto i (0); i < 10; ++i) { nano::keypair key1; - uint64_t work (0); + std::atomic work (0); node1.work_generate (key1.pub, [&work](boost::optional work_a) { ASSERT_TRUE (work_a.is_initialized ()); work = *work_a; @@ -3051,6 +3051,7 @@ TEST (rpc, work_peer_many) system4.poll (); } } + node1.stop (); } TEST (rpc, block_count)