From 060e3b2f45da42601bfa7450032734174e532f99 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Thu, 29 Aug 2019 18:39:33 +0100 Subject: [PATCH] Distributed work refactor (#2255) * Distributed work refactor With a factory that also saves distributed work objects and a work cancel observer, work can now be properly cancelled to work peers * Cleanup the map itself, not only the vectors; use public class members * Ability to force stop local generation * Adding distributed_work tests * Test for multiple requests * Change to cleanup after adding a new work * Extract prepared request method * Merge if * Finish up test * Dont retry if stopped (cancelled) * (unrelated) increasing deadline in a test * Review items * Another deadline fix * Work can complete before the test --- nano/core_test/CMakeLists.txt | 1 + nano/core_test/distributed_work.cpp | 119 +++++++++ nano/core_test/wallet.cpp | 2 +- nano/node/CMakeLists.txt | 2 + nano/node/distributed_work.cpp | 372 ++++++++++++++++++++++++++++ nano/node/distributed_work.hpp | 79 ++++++ nano/node/json_handler.cpp | 2 +- nano/node/node.cpp | 358 +------------------------- nano/node/node.hpp | 6 +- nano/node/node_observers.cpp | 1 + nano/node/node_observers.hpp | 1 + nano/node/rocksdb/rocksdb_txn.hpp | 4 +- nano/node/wallet.cpp | 2 +- nano/rpc_test/rpc.cpp | 15 +- 14 files changed, 605 insertions(+), 359 deletions(-) create mode 100644 nano/core_test/distributed_work.cpp create mode 100644 nano/node/distributed_work.cpp create mode 100644 nano/node/distributed_work.hpp diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index 41a0da0d..b7b19914 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -6,6 +6,7 @@ add_executable (core_test block_store.cpp conflicts.cpp difficulty.cpp + distributed_work.cpp entry.cpp gap_cache.cpp ipc.cpp diff --git a/nano/core_test/distributed_work.cpp b/nano/core_test/distributed_work.cpp new file mode 100644 index 00000000..e3104558 --- /dev/null +++ b/nano/core_test/distributed_work.cpp @@ -0,0 +1,119 @@ +#include +#include + +#include + +using namespace std::chrono_literals; + +TEST (distributed_work, no_peers) +{ + nano::system system (24000, 1); + auto node (system.nodes[0]); + nano::block_hash hash; + boost::optional work; + auto callback = [&work](boost::optional work_a) { + ASSERT_TRUE (work_a.is_initialized ()); + work = work_a; + }; + node->distributed_work.make (hash, callback, node->network_params.network.publish_threshold); + system.deadline_set (5s); + while (!work.is_initialized ()) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_FALSE (nano::work_validate (hash, *work)); + // should only be removed after cleanup + ASSERT_EQ (1, node->distributed_work.work.size ()); + node->distributed_work.cleanup_finished (); + ASSERT_EQ (0, node->distributed_work.work.size ()); +} + +TEST (distributed_work, no_peers_cancel) +{ + nano::system system (24000, 1); + auto node (system.nodes[0]); + nano::block_hash hash; + bool done{ false }; + auto callback_to_cancel = [&done](boost::optional work_a) { + ASSERT_FALSE (work_a.is_initialized ()); + done = true; + }; + node->distributed_work.make (hash, callback_to_cancel, nano::difficulty::from_multiplier (1000000, node->network_params.network.publish_threshold)); + ASSERT_EQ (1, node->distributed_work.work.size ()); + // cleanup should not cancel or remove an ongoing work + node->distributed_work.cleanup_finished (); + ASSERT_EQ (1, node->distributed_work.work.size ()); + + // manually cancel + node->distributed_work.cancel (hash, true); // forces local stop + system.deadline_set (5s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_TRUE (node->distributed_work.work.empty ()); + + // now using observer + done = false; + node->distributed_work.make (hash, callback_to_cancel, nano::difficulty::from_multiplier (1000000, node->network_params.network.publish_threshold)); + ASSERT_EQ (1, node->distributed_work.work.size ()); + node->observers.work_cancel.notify (hash); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_TRUE (node->distributed_work.work.empty ()); +} + +TEST (distributed_work, no_peers_multi) +{ + nano::system system (24000, 1); + auto node (system.nodes[0]); + nano::block_hash hash; + unsigned total{ 10 }; + std::atomic count{ 0 }; + auto callback = [&count](boost::optional work_a) { + ASSERT_TRUE (work_a.is_initialized ()); + ++count; + }; + // Test many works for the same root + for (unsigned i{ 0 }; i < total; ++i) + { + node->distributed_work.make (hash, callback, nano::difficulty::from_multiplier (10, node->network_params.network.publish_threshold)); + } + // 1 root, and _total_ requests for that root are expected + ASSERT_EQ (1, node->distributed_work.work.size ()); + { + auto requests (node->distributed_work.work.begin ()); + ASSERT_EQ (hash, requests->first); + ASSERT_EQ (total, requests->second.size ()); + } + system.deadline_set (5s); + while (count < total) + { + ASSERT_NO_ERROR (system.poll ()); + } + node->distributed_work.cleanup_finished (); + ASSERT_EQ (0, node->distributed_work.work.size ()); + count = 0; + // Test many works for different roots + for (unsigned i{ 0 }; i < total; ++i) + { + nano::block_hash hash_i (i + 1); + node->distributed_work.make (hash_i, callback, node->network_params.network.publish_threshold); + } + // 10 roots expected with 1 work each, but some may have completed so test for some + ASSERT_GT (node->distributed_work.work.size (), 5); + for (auto & requests : node->distributed_work.work) + { + ASSERT_EQ (1, requests.second.size ()); + } + system.deadline_set (5s); + while (count < total) + { + ASSERT_NO_ERROR (system.poll ()); + } + node->distributed_work.cleanup_finished (); + ASSERT_EQ (0, node->distributed_work.work.size ()); + count = 0; +} diff --git a/nano/core_test/wallet.cpp b/nano/core_test/wallet.cpp index b128e43e..81cd7d4c 100644 --- a/nano/core_test/wallet.cpp +++ b/nano/core_test/wallet.cpp @@ -1196,7 +1196,7 @@ TEST (wallet, work_watcher_cancel) node.active.update_active_difficulty (lock); } // Wait for work generation to start - system.deadline_set (3s); + system.deadline_set (5s); while (node.work.pending.empty ()) { ASSERT_NO_ERROR (system.poll ()); diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 3d57d86e..8f4ae19a 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -37,6 +37,8 @@ add_library (node confirmation_height_processor.cpp daemonconfig.hpp daemonconfig.cpp + distributed_work.hpp + distributed_work.cpp election.hpp election.cpp gap_cache.hpp diff --git a/nano/node/distributed_work.cpp b/nano/node/distributed_work.cpp new file mode 100644 index 00000000..b0d95cb1 --- /dev/null +++ b/nano/node/distributed_work.cpp @@ -0,0 +1,372 @@ +#include +#include + +std::shared_ptr nano::work_peer_request::get_prepared_json_request (std::string const & request_string_a) const +{ + auto request (std::make_shared> ()); + request->method (boost::beast::http::verb::post); + request->set (boost::beast::http::field::content_type, "application/json"); + request->target ("/"); + request->version (11); + request->body () = request_string_a; + request->prepare_payload (); + return request; +} + +nano::distributed_work::distributed_work (unsigned int backoff_a, nano::node & node_a, nano::block_hash const & root_a, std::function)> const & callback_a, uint64_t difficulty_a) : +callback (callback_a), +backoff (backoff_a), +node (node_a), +root (root_a), +need_resolve (node.config.work_peers), +difficulty (difficulty_a) +{ + assert (!completed); +} + +nano::distributed_work::~distributed_work () +{ + stop (true); +} + +void nano::distributed_work::start () +{ + if (need_resolve.empty ()) + { + start_work (); + } + else + { + auto current (need_resolve.back ()); + need_resolve.pop_back (); + auto this_l (shared_from_this ()); + boost::system::error_code ec; + auto parsed_address (boost::asio::ip::address_v6::from_string (current.first, ec)); + if (!ec) + { + outstanding[parsed_address] = current.second; + start (); + } + else + { + node.network.resolver.async_resolve (boost::asio::ip::udp::resolver::query (current.first, std::to_string (current.second)), [current, this_l](boost::system::error_code const & ec, boost::asio::ip::udp::resolver::iterator i_a) { + if (!ec) + { + for (auto i (i_a), n (boost::asio::ip::udp::resolver::iterator{}); i != n; ++i) + { + auto endpoint (i->endpoint ()); + this_l->outstanding[endpoint.address ()] = endpoint.port (); + } + } + else + { + this_l->node.logger.try_log (boost::str (boost::format ("Error resolving work peer: %1%:%2%: %3%") % current.first % current.second % ec.message ())); + } + this_l->start (); + }); + } + } +} + +void nano::distributed_work::start_work () +{ + auto this_l (shared_from_this ()); + + // Start work generation if peers are not acting correctly, or if there are no peers configured + if ((outstanding.empty () || node.unresponsive_work_peers) && (node.config.work_threads != 0 || node.work.opencl)) + { + local_generation_started = true; + node.work.generate ( + root, [this_l](boost::optional const & work_a) { + this_l->set_once (work_a); + this_l->stop (false); + }, + difficulty); + } + + if (!outstanding.empty ()) + { + std::lock_guard guard (mutex); + for (auto const & i : outstanding) + { + auto host (i.first); + auto service (i.second); + auto connection (std::make_shared (this_l->node.io_ctx, host, service)); + connections.emplace_back (connection); + connection->socket.async_connect (nano::tcp_endpoint (host, service), [this_l, connection](boost::system::error_code const & ec) { + if (!ec) + { + std::string request_string; + { + boost::property_tree::ptree request; + request.put ("action", "work_generate"); + request.put ("hash", this_l->root.to_string ()); + request.put ("difficulty", nano::to_string_hex (this_l->difficulty)); + std::stringstream ostream; + boost::property_tree::write_json (ostream, request); + request_string = ostream.str (); + } + auto request (connection->get_prepared_json_request (request_string)); + boost::beast::http::async_write (connection->socket, *request, [this_l, connection, request](boost::system::error_code const & ec, size_t bytes_transferred) { + if (!ec) + { + boost::beast::http::async_read (connection->socket, connection->buffer, connection->response, [this_l, connection](boost::system::error_code const & ec, size_t bytes_transferred) { + if (!ec) + { + if (connection->response.result () == boost::beast::http::status::ok) + { + this_l->success (connection->response.body (), connection->address); + } + else + { + this_l->node.logger.try_log (boost::str (boost::format ("Work peer responded with an error %1% %2%: %3%") % connection->address % connection->port % connection->response.result ())); + this_l->failure (connection->address); + } + } + else if (ec == boost::system::errc::operation_canceled) + { + // The only case where we send a cancel is if we preempt stopped waiting for the response + this_l->cancel (connection); + this_l->failure (connection->address); + } + else + { + this_l->node.logger.try_log (boost::str (boost::format ("Unable to read from work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ())); + this_l->failure (connection->address); + } + }); + } + else + { + this_l->node.logger.try_log (boost::str (boost::format ("Unable to write to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ())); + this_l->failure (connection->address); + } + }); + } + else + { + this_l->node.logger.try_log (boost::str (boost::format ("Unable to connect to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ())); + this_l->failure (connection->address); + } + }); + } + } +} + +void nano::distributed_work::cancel (std::shared_ptr connection) +{ + auto this_l (shared_from_this ()); + auto cancelling_l (std::make_shared (node.io_ctx, connection->address, connection->port)); + cancelling_l->socket.async_connect (nano::tcp_endpoint (cancelling_l->address, cancelling_l->port), [this_l, cancelling_l](boost::system::error_code const & ec) { + if (!ec) + { + std::string request_string; + { + boost::property_tree::ptree request; + request.put ("action", "work_cancel"); + request.put ("hash", this_l->root.to_string ()); + std::stringstream ostream; + boost::property_tree::write_json (ostream, request); + request_string = ostream.str (); + } + auto request (cancelling_l->get_prepared_json_request (request_string)); + boost::beast::http::async_write (cancelling_l->socket, *request, [this_l, request, cancelling_l](boost::system::error_code const & ec, size_t bytes_transferred) { + if (ec) + { + this_l->node.logger.try_log (boost::str (boost::format ("Unable to send work_cancel to work_peer %1% %2%: %3% (%4%)") % cancelling_l->address % cancelling_l->port % ec.message () % ec.value ())); + } + }); + } + }); +} + +void nano::distributed_work::stop (bool const local_stop_a) +{ + if (!stopped.exchange (true)) + { + std::lock_guard lock (mutex); + if (local_stop_a && (node.config.work_threads != 0 || node.work.opencl)) + { + node.work.cancel (root); + } + for (auto & connection_w : connections) + { + if (auto connection_l = connection_w.lock ()) + { + boost::system::error_code ec; + connection_l->socket.cancel (ec); + if (ec) + { + node.logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection_l->address % connection_l->port % ec.message () % ec.value ())); + } + try + { + connection_l->socket.close (); + } + catch (const boost::system::system_error & ec) + { + node.logger.try_log (boost::str (boost::format ("Error closing socket with work_peer %1% %2%: %3%") % connection_l->address % connection_l->port % ec.what () % ec.code ())); + } + } + } + connections.clear (); + outstanding.clear (); + } +} + +void nano::distributed_work::success (std::string const & body_a, boost::asio::ip::address const & address) +{ + auto last (remove (address)); + std::stringstream istream (body_a); + try + { + boost::property_tree::ptree result; + boost::property_tree::read_json (istream, result); + auto work_text (result.get ("work")); + uint64_t work; + if (!nano::from_string_hex (work_text, work)) + { + uint64_t result_difficulty (0); + if (!nano::work_validate (root, work, &result_difficulty) && result_difficulty >= difficulty) + { + node.unresponsive_work_peers = false; + set_once (work); + stop (true); + } + else + { + node.logger.try_log (boost::str (boost::format ("Incorrect work response from %1% for root %2% with diffuculty %3%: %4%") % address % root.to_string () % nano::to_string_hex (difficulty) % work_text)); + handle_failure (last); + } + } + else + { + node.logger.try_log (boost::str (boost::format ("Work response from %1% wasn't a number: %2%") % address % work_text)); + handle_failure (last); + } + } + catch (...) + { + node.logger.try_log (boost::str (boost::format ("Work response from %1% wasn't parsable: %2%") % address % body_a)); + handle_failure (last); + } +} + +void nano::distributed_work::set_once (boost::optional work_a) +{ + if (!completed.exchange (true)) + { + callback (work_a); + } +} + +void nano::distributed_work::failure (boost::asio::ip::address const & address) +{ + auto last (remove (address)); + handle_failure (last); +} + +void nano::distributed_work::handle_failure (bool const last) +{ + if (last && !completed) + { + node.unresponsive_work_peers = true; + if (!local_generation_started) + { + if (stopped) + { + callback (boost::none); + } + else + { + if (backoff == 1 && node.config.logging.work_generation_time ()) + { + node.logger.always_log ("Work peer(s) failed to generate work for root ", root.to_string (), ", retrying..."); + } + auto now (std::chrono::steady_clock::now ()); + auto root_l (root); + auto callback_l (callback); + std::weak_ptr node_w (node.shared ()); + auto next_backoff (std::min (backoff * 2, (unsigned int)60 * 5)); + // clang-format off + node.alarm.add (now + std::chrono::seconds (backoff), [ node_w, root_l, callback_l, next_backoff, difficulty = difficulty ] { + if (auto node_l = node_w.lock ()) + { + node_l->distributed_work.make (next_backoff, root_l, callback_l, difficulty); + } + }); + // clang-format on + } + } + } +} + +bool nano::distributed_work::remove (boost::asio::ip::address const & address) +{ + std::lock_guard guard (mutex); + outstanding.erase (address); + return outstanding.empty (); +} + +nano::distributed_work_factory::distributed_work_factory (nano::node & node_a) : +node (node_a) +{ +} + +void nano::distributed_work_factory::make (nano::block_hash const & root_a, std::function)> const & callback_a, uint64_t difficulty_a) +{ + make (1, root_a, callback_a, difficulty_a); +} + +void nano::distributed_work_factory::make (unsigned int backoff_a, nano::block_hash const & root_a, std::function)> const & callback_a, uint64_t difficulty_a) +{ + cleanup_finished (); + auto distributed (std::make_shared (backoff_a, node, root_a, callback_a, difficulty_a)); + { + std::lock_guard guard (mutex); + work[root_a].emplace_back (distributed); + } + distributed->start (); +} + +void nano::distributed_work_factory::cancel (nano::block_hash const & root_a, bool const local_stop) +{ + { + std::lock_guard guard (mutex); + auto existing_l (work.find (root_a)); + if (existing_l != work.end ()) + { + for (auto & distributed_w : existing_l->second) + { + if (auto distributed_l = distributed_w.lock ()) + { + // Send work_cancel to work peers + // Cancels local generation if local_stop is true, but usually should be done by the work pool + distributed_l->stop (local_stop); + } + } + work.erase (existing_l); + } + } +} + +void nano::distributed_work_factory::cleanup_finished () +{ + std::lock_guard guard (mutex); + for (auto it (work.begin ()), end (work.end ()); it != end;) + { + it->second.erase (std::remove_if (it->second.begin (), it->second.end (), [](auto distributed_a) { + return distributed_a.expired (); + }), + it->second.end ()); + + if (it->second.empty ()) + { + it = work.erase (it); + } + else + { + ++it; + } + } +} diff --git a/nano/node/distributed_work.hpp b/nano/node/distributed_work.hpp new file mode 100644 index 00000000..edc7c94d --- /dev/null +++ b/nano/node/distributed_work.hpp @@ -0,0 +1,79 @@ +#pragma once + +#include +#include +#include + +#include + +#include + +using request_type = boost::beast::http::request; + +namespace nano +{ +class node; + +class work_peer_request final +{ +public: + work_peer_request (boost::asio::io_context & io_ctx_a, boost::asio::ip::address address_a, uint16_t port_a) : + address (address_a), + port (port_a), + socket (io_ctx_a) + { + } + std::shared_ptr get_prepared_json_request (std::string const &) const; + boost::asio::ip::address address; + uint16_t port; + boost::beast::flat_buffer buffer; + boost::beast::http::response response; + boost::asio::ip::tcp::socket socket; +}; + +/** + * distributed_work cancels local and peer work requests when going out of scope + */ +class distributed_work final : public std::enable_shared_from_this +{ +public: + distributed_work (unsigned int, nano::node &, nano::block_hash const &, std::function)> const &, uint64_t); + ~distributed_work (); + void start (); + void start_work (); + void cancel (std::shared_ptr); + void stop (bool const); + void success (std::string const &, boost::asio::ip::address const &); + void set_once (boost::optional); + void failure (boost::asio::ip::address const &); + void handle_failure (bool const); + bool remove (boost::asio::ip::address const &); + + std::function)> callback; + unsigned int backoff; // in seconds + nano::node & node; + nano::block_hash root; + std::mutex mutex; + std::map outstanding; + std::vector> connections; + std::vector> need_resolve; + uint64_t difficulty; + std::atomic completed{ false }; + std::atomic local_generation_started{ false }; + std::atomic stopped{ false }; +}; + +class distributed_work_factory final +{ +public: + distributed_work_factory (nano::node &); + void make (nano::block_hash const &, std::function)> const &, uint64_t); + void make (unsigned int, nano::block_hash const &, std::function)> const &, uint64_t); + void cancel (nano::block_hash const &, bool const local_stop = false); + void cleanup_finished (); + + std::unordered_map>> work; + std::mutex mutex; + nano::node & node; +}; +} \ No newline at end of file diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 3a011fc1..69faddf1 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -4490,7 +4490,7 @@ void nano::json_handler::work_cancel () auto hash (hash_impl ()); if (!ec) { - node.work.cancel (hash); + node.observers.work_cancel.notify (hash); } response_errors (); } diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 0d8d287e..412bf06c 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -121,6 +121,7 @@ stats (config.stat_config), flags (flags_a), alarm (alarm_a), work (work_a), +distributed_work (*this), logger (config_a.logging.min_time_between_log_output), store_impl (nano::make_store (logger, application_path_a, flags.read_only, true, config_a.diagnostics_config.txn_tracking, config_a.block_processor_batch_max_time, config_a.lmdb_max_dbs, !flags.disable_unchecked_drop, flags.sideband_batch_size, config_a.backup_before_upgrade)), store (*store_impl), @@ -346,7 +347,7 @@ startup_time (std::chrono::steady_clock::now ()) } } }); - if (this->websocket_server) + if (websocket_server) { observers.vote.add ([this](nano::transaction const & transaction, std::shared_ptr vote_a, std::shared_ptr channel_a) { if (this->websocket_server->any_subscriber (nano::websocket::topic::vote)) @@ -357,6 +358,11 @@ startup_time (std::chrono::steady_clock::now ()) } }); } + // Cancelling local work generation + observers.work_cancel.add ([this](nano::block_hash const & root_a) { + this->work.cancel (root_a); + this->distributed_work.cancel (root_a); + }); logger.always_log ("Node starting, version: ", NANO_VERSION_STRING); logger.always_log ("Build information: ", BUILD_INFO); @@ -943,345 +949,6 @@ int nano::node::price (nano::uint128_t const & balance_a, int amount_a) return static_cast (result * 100.0); } -namespace -{ -class work_request -{ -public: - work_request (boost::asio::io_context & io_ctx_a, boost::asio::ip::address address_a, uint16_t port_a) : - address (address_a), - port (port_a), - socket (io_ctx_a) - { - } - boost::asio::ip::address address; - uint16_t port; - boost::beast::flat_buffer buffer; - boost::beast::http::response response; - boost::asio::ip::tcp::socket socket; -}; -class distributed_work : public std::enable_shared_from_this -{ -public: - distributed_work (std::shared_ptr const & node_a, nano::block_hash const & root_a, std::function const & callback_a, uint64_t difficulty_a) : - distributed_work (1, node_a, root_a, callback_a, difficulty_a) - { - assert (node_a != nullptr); - } - ~distributed_work () - { - stop (true); - } - distributed_work (unsigned int backoff_a, std::shared_ptr const & node_a, nano::block_hash const & root_a, std::function const & callback_a, uint64_t difficulty_a) : - callback (callback_a), - backoff (backoff_a), - node (node_a), - root (root_a), - need_resolve (node_a->config.work_peers), - difficulty (difficulty_a) - { - assert (node_a != nullptr); - assert (!completed); - } - void start () - { - if (need_resolve.empty ()) - { - start_work (); - } - else - { - auto current (need_resolve.back ()); - need_resolve.pop_back (); - auto this_l (shared_from_this ()); - boost::system::error_code ec; - auto parsed_address (boost::asio::ip::address_v6::from_string (current.first, ec)); - if (!ec) - { - outstanding[parsed_address] = current.second; - start (); - } - else - { - node->network.resolver.async_resolve (boost::asio::ip::udp::resolver::query (current.first, std::to_string (current.second)), [current, this_l](boost::system::error_code const & ec, boost::asio::ip::udp::resolver::iterator i_a) { - if (!ec) - { - for (auto i (i_a), n (boost::asio::ip::udp::resolver::iterator{}); i != n; ++i) - { - auto endpoint (i->endpoint ()); - this_l->outstanding[endpoint.address ()] = endpoint.port (); - } - } - else - { - this_l->node->logger.try_log (boost::str (boost::format ("Error resolving work peer: %1%:%2%: %3%") % current.first % current.second % ec.message ())); - } - this_l->start (); - }); - } - } - } - void start_work () - { - auto this_l (shared_from_this ()); - - // Start work generation if peers are not acting correctly, or if there are no peers configured - if ((outstanding.empty () || node->unresponsive_work_peers) && (node->config.work_threads != 0 || node->work.opencl)) - { - local_generation_started = true; - node->work.generate ( - this_l->root, [this_l](boost::optional const & work_a) { - if (work_a) - { - this_l->set_once (work_a.value ()); - this_l->stop (false); - } - }, - difficulty); - } - - if (!outstanding.empty ()) - { - std::lock_guard guard (mutex); - for (auto const & i : outstanding) - { - auto host (i.first); - auto service (i.second); - auto connection (std::make_shared (this_l->node->io_ctx, host, service)); - connections.push_back (connection); - connection->socket.async_connect (nano::tcp_endpoint (host, service), [this_l, connection](boost::system::error_code const & ec) { - if (!ec) - { - std::string request_string; - { - boost::property_tree::ptree request; - request.put ("action", "work_generate"); - request.put ("hash", this_l->root.to_string ()); - request.put ("difficulty", nano::to_string_hex (this_l->difficulty)); - std::stringstream ostream; - boost::property_tree::write_json (ostream, request); - request_string = ostream.str (); - } - auto request (std::make_shared> ()); - request->method (boost::beast::http::verb::post); - request->set (boost::beast::http::field::content_type, "application/json"); - request->target ("/"); - request->version (11); - request->body () = request_string; - request->prepare_payload (); - boost::beast::http::async_write (connection->socket, *request, [this_l, connection, request](boost::system::error_code const & ec, size_t bytes_transferred) { - if (!ec) - { - boost::beast::http::async_read (connection->socket, connection->buffer, connection->response, [this_l, connection](boost::system::error_code const & ec, size_t bytes_transferred) { - if (!ec) - { - if (connection->response.result () == boost::beast::http::status::ok) - { - this_l->success (connection->response.body (), connection->address); - } - else - { - this_l->node->logger.try_log (boost::str (boost::format ("Work peer responded with an error %1% %2%: %3%") % connection->address % connection->port % connection->response.result ())); - this_l->failure (connection->address); - } - } - else if (ec == boost::system::errc::operation_canceled) - { - // The only case where we send a cancel is if we preempt stopped waiting for the response - this_l->cancel (connection); - this_l->failure (connection->address); - } - else - { - this_l->node->logger.try_log (boost::str (boost::format ("Unable to read from work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ())); - this_l->failure (connection->address); - } - }); - } - else - { - this_l->node->logger.try_log (boost::str (boost::format ("Unable to write to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ())); - this_l->failure (connection->address); - } - }); - } - else - { - this_l->node->logger.try_log (boost::str (boost::format ("Unable to connect to work_peer %1% %2%: %3% (%4%)") % connection->address % connection->port % ec.message () % ec.value ())); - this_l->failure (connection->address); - } - }); - } - } - } - void cancel (std::shared_ptr connection) - { - auto this_l (shared_from_this ()); - auto cancelling (std::make_shared (node->io_ctx, connection->address, connection->port)); - cancelling->socket.async_connect (nano::tcp_endpoint (cancelling->address, cancelling->port), [this_l, cancelling](boost::system::error_code const & ec) { - if (!ec) - { - std::string request_string; - { - boost::property_tree::ptree request; - request.put ("action", "work_cancel"); - request.put ("hash", this_l->root.to_string ()); - std::stringstream ostream; - boost::property_tree::write_json (ostream, request); - request_string = ostream.str (); - } - auto request (std::make_shared> ()); - request->method (boost::beast::http::verb::post); - request->set (boost::beast::http::field::content_type, "application/json"); - request->target ("/"); - request->version (11); - request->body () = request_string; - request->prepare_payload (); - - boost::beast::http::async_write (cancelling->socket, *request, [this_l, request, cancelling](boost::system::error_code const & ec, size_t bytes_transferred) { - if (ec) - { - this_l->node->logger.try_log (boost::str (boost::format ("Unable to send work_cancel to work_peer %1% %2%: %3% (%4%)") % cancelling->address % cancelling->port % ec.message () % ec.value ())); - } - }); - } - }); - } - void stop (bool const local_stop) - { - if (!stopped.exchange (true)) - { - std::lock_guard lock (mutex); - if (local_stop && (node->config.work_threads != 0 || node->work.opencl)) - { - node->work.cancel (root); - } - for (auto & i : connections) - { - auto connection = i.lock (); - if (connection) - { - boost::system::error_code ec; - connection->socket.cancel (ec); - if (ec) - { - node->logger.try_log (boost::str (boost::format ("Error cancelling operation with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.message () % ec.value ())); - } - try - { - connection->socket.close (); - } - catch (const boost::system::system_error & ec) - { - node->logger.try_log (boost::str (boost::format ("Error closing socket with work_peer %1% %2%: %3%") % connection->address % connection->port % ec.what () % ec.code ())); - } - } - } - connections.clear (); - outstanding.clear (); - } - } - void success (std::string const & body_a, boost::asio::ip::address const & address) - { - auto last (remove (address)); - std::stringstream istream (body_a); - try - { - boost::property_tree::ptree result; - boost::property_tree::read_json (istream, result); - auto work_text (result.get ("work")); - uint64_t work; - if (!nano::from_string_hex (work_text, work)) - { - uint64_t result_difficulty (0); - if (!nano::work_validate (root, work, &result_difficulty) && result_difficulty >= difficulty) - { - node->unresponsive_work_peers = false; - set_once (work); - stop (true); - } - else - { - node->logger.try_log (boost::str (boost::format ("Incorrect work response from %1% for root %2% with diffuculty %3%: %4%") % address % root.to_string () % nano::to_string_hex (difficulty) % work_text)); - handle_failure (last); - } - } - else - { - node->logger.try_log (boost::str (boost::format ("Work response from %1% wasn't a number: %2%") % address % work_text)); - handle_failure (last); - } - } - catch (...) - { - node->logger.try_log (boost::str (boost::format ("Work response from %1% wasn't parsable: %2%") % address % body_a)); - handle_failure (last); - } - } - void set_once (uint64_t work_a) - { - if (!completed.exchange (true)) - { - callback (work_a); - } - } - void failure (boost::asio::ip::address const & address) - { - auto last (remove (address)); - handle_failure (last); - } - void handle_failure (bool last) - { - if (last) - { - if (!completed) - { - node->unresponsive_work_peers = true; - if (!local_generation_started) - { - if (backoff == 1 && node->config.logging.work_generation_time ()) - { - node->logger.always_log ("Work peer(s) failed to generate work for root ", root.to_string (), ", retrying..."); - } - auto now (std::chrono::steady_clock::now ()); - auto root_l (root); - auto callback_l (callback); - std::weak_ptr node_w (node); - auto next_backoff (std::min (backoff * 2, (unsigned int)60 * 5)); - // clang-format off - node->alarm.add (now + std::chrono::seconds (backoff), [ node_w, root_l, callback_l, next_backoff, difficulty = difficulty ] { - if (auto node_l = node_w.lock ()) - { - auto work_generation (std::make_shared (next_backoff, node_l, root_l, callback_l, difficulty)); - work_generation->start (); - } - }); - // clang-format on - } - } - } - } - bool remove (boost::asio::ip::address const & address) - { - std::lock_guard lock (mutex); - outstanding.erase (address); - return outstanding.empty (); - } - std::function callback; - unsigned int backoff; // in seconds - std::shared_ptr node; - nano::block_hash root; - std::mutex mutex; - std::map outstanding; - std::vector> connections; - std::vector> need_resolve; - uint64_t difficulty; - std::atomic completed{ false }; - std::atomic local_generation_started{ false }; - std::atomic stopped{ false }; -}; -} - void nano::node::work_generate_blocking (nano::block & block_a) { work_generate_blocking (block_a, network_params.network.publish_threshold); @@ -1292,15 +959,14 @@ void nano::node::work_generate_blocking (nano::block & block_a, uint64_t difficu block_a.block_work_set (work_generate_blocking (block_a.root (), difficulty_a)); } -void nano::node::work_generate (nano::uint256_union const & hash_a, std::function callback_a) +void nano::node::work_generate (nano::uint256_union const & hash_a, std::function)> callback_a) { work_generate (hash_a, callback_a, network_params.network.publish_threshold); } -void nano::node::work_generate (nano::uint256_union const & hash_a, std::function callback_a, uint64_t difficulty_a) +void nano::node::work_generate (nano::uint256_union const & hash_a, std::function)> callback_a, uint64_t difficulty_a) { - auto work_generation (std::make_shared (shared (), hash_a, callback_a, difficulty_a)); - work_generation->start (); + distributed_work.make (hash_a, callback_a, difficulty_a); } uint64_t nano::node::work_generate_blocking (nano::uint256_union const & block_a) @@ -1313,8 +979,8 @@ uint64_t nano::node::work_generate_blocking (nano::uint256_union const & hash_a, std::promise promise; std::future future = promise.get_future (); // clang-format off - work_generate (hash_a, [&promise](uint64_t work_a) { - promise.set_value (work_a); + work_generate (hash_a, [&promise](boost::optional work_a) { + promise.set_value (work_a.value_or (0)); }, difficulty_a); // clang-format on diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 56d2a228..79847422 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -122,8 +123,8 @@ public: void work_generate_blocking (nano::block &); uint64_t work_generate_blocking (nano::uint256_union const &, uint64_t); uint64_t work_generate_blocking (nano::uint256_union const &); - void work_generate (nano::uint256_union const &, std::function, uint64_t); - void work_generate (nano::uint256_union const &, std::function); + void work_generate (nano::uint256_union const &, std::function)>, uint64_t); + void work_generate (nano::uint256_union const &, std::function)>); void add_initial_peers (); void block_confirm (std::shared_ptr); bool block_confirmed_or_being_confirmed (nano::transaction const &, nano::block_hash const &); @@ -146,6 +147,7 @@ public: nano::node_flags flags; nano::alarm & alarm; nano::work_pool & work; + nano::distributed_work_factory distributed_work; nano::logger_mt logger; std::unique_ptr store_impl; nano::block_store & store; diff --git a/nano/node/node_observers.cpp b/nano/node/node_observers.cpp index 3c78812a..e4d074b9 100644 --- a/nano/node/node_observers.cpp +++ b/nano/node/node_observers.cpp @@ -10,5 +10,6 @@ std::unique_ptr nano::collect_seq_con_info (nano:: composite->add_component (collect_seq_con_info (node_observers.account_balance, "account_balance")); composite->add_component (collect_seq_con_info (node_observers.endpoint, "endpoint")); composite->add_component (collect_seq_con_info (node_observers.disconnect, "disconnect")); + composite->add_component (collect_seq_con_info (node_observers.work_cancel, "work_cancel")); return composite; } diff --git a/nano/node/node_observers.hpp b/nano/node/node_observers.hpp index 33e6c533..ab9a8a07 100644 --- a/nano/node/node_observers.hpp +++ b/nano/node/node_observers.hpp @@ -21,6 +21,7 @@ public: nano::observer_set> endpoint; nano::observer_set<> disconnect; nano::observer_set difficulty; + nano::observer_set work_cancel; }; std::unique_ptr collect_seq_con_info (node_observers & node_observers, const std::string & name); diff --git a/nano/node/rocksdb/rocksdb_txn.hpp b/nano/node/rocksdb/rocksdb_txn.hpp index f642ee56..ce48b2f0 100644 --- a/nano/node/rocksdb/rocksdb_txn.hpp +++ b/nano/node/rocksdb/rocksdb_txn.hpp @@ -1,5 +1,7 @@ #pragma once +#include + #include #include #include @@ -7,8 +9,6 @@ #include #include -#include - namespace nano { class read_rocksdb_txn final : public read_transaction_impl diff --git a/nano/node/wallet.cpp b/nano/node/wallet.cpp index 221685c9..25443adb 100644 --- a/nano/node/wallet.cpp +++ b/nano/node/wallet.cpp @@ -1508,7 +1508,7 @@ void nano::work_watcher::remove (std::shared_ptr block_a) if (existing != watched.end () && existing->second->hash () == block_a->hash ()) { watched.erase (existing); - node.work.cancel (block_a->root ()); + node.observers.work_cancel.notify (block_a->root ()); } } diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 86720061..7db38413 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -2842,8 +2842,9 @@ TEST (rpc, work_peer_bad) node2.config.work_peers.push_back (std::make_pair (boost::asio::ip::address_v6::any ().to_string (), 0)); nano::block_hash hash1 (1); std::atomic work (0); - node2.work_generate (hash1, [&work](uint64_t work_a) { - work = work_a; + node2.work_generate (hash1, [&work](boost::optional work_a) { + ASSERT_TRUE (work_a.is_initialized ()); + work = *work_a; }); system.deadline_set (5s); while (nano::work_validate (hash1, work)) @@ -2871,8 +2872,9 @@ TEST (rpc, work_peer_one) node2.config.work_peers.push_back (std::make_pair (node1.network.endpoint ().address ().to_string (), rpc.config.port)); nano::keypair key1; uint64_t work (0); - node2.work_generate (key1.pub, [&work](uint64_t work_a) { - work = work_a; + node2.work_generate (key1.pub, [&work](boost::optional work_a) { + ASSERT_TRUE (work_a.is_initialized ()); + work = *work_a; }); system.deadline_set (5s); while (nano::work_validate (key1.pub, work)) @@ -2923,8 +2925,9 @@ TEST (rpc, work_peer_many) { nano::keypair key1; uint64_t work (0); - node1.work_generate (key1.pub, [&work](uint64_t work_a) { - work = work_a; + node1.work_generate (key1.pub, [&work](boost::optional work_a) { + ASSERT_TRUE (work_a.is_initialized ()); + work = *work_a; }); while (nano::work_validate (key1.pub, work)) {