From 2d4a1ff72d9a8cc25d61a87293087bd3a4856e5c Mon Sep 17 00:00:00 2001
From: Guilherme Lawless
Date: Thu, 10 Oct 2019 16:58:30 +0100
Subject: [PATCH] Secondary list of peers for generating work (#2330)

* Secondary list of peers for distributed_work

This secondary list of peers can be used only via RPC work generate by
setting the option "new_work_peers" to "true" along with "use_peers"

- Adds new_work_peers config, which works similarly to the current work_peers config
- Changes distributed_work to receive a list of peers
- Some fixes to distributed_work where it wouldn't call the failure callback in very rare scenarios
- Collect object stats for distributed_work

* Extract node_config::deserialize_address (const)

* Fix function signature

* Rename to secondary_work_peers, place under [node.experimental] and use 127.0.0.1:8076 as default

* Fix tests
---
 nano/core_test/distributed_work.cpp | 38 ++++++++++----------
 nano/core_test/toml.cpp             |  5 +++
 nano/core_test/wallet.cpp           |  2 +-
 nano/node/distributed_work.cpp      | 55 +++++++++++++++++++++--------
 nano/node/distributed_work.hpp      | 14 +++++---
 nano/node/json_handler.cpp          |  8 +++--
 nano/node/node.cpp                  | 13 +++++--
 nano/node/node.hpp                  |  3 +-
 nano/node/nodeconfig.cpp            | 55 +++++++++++++++++++++--------
 nano/node/nodeconfig.hpp            |  5 ++-
 10 files changed, 137 insertions(+), 61 deletions(-)

diff --git a/nano/core_test/distributed_work.cpp b/nano/core_test/distributed_work.cpp
index c4bea79f..e05567fb 100644
--- a/nano/core_test/distributed_work.cpp
+++ b/nano/core_test/distributed_work.cpp
@@ -17,7 +17,7 @@ TEST (distributed_work, no_peers)
 		work = work_a;
 		done = true;
 	};
-	node->distributed_work.make (hash, callback, node->network_params.network.publish_threshold, nano::account ());
+	node->distributed_work.make (hash, node->config.work_peers, callback, node->network_params.network.publish_threshold, nano::account ());
 	system.deadline_set (5s);
 	while (!done)
 	{
@@ -25,8 +25,8 @@ TEST (distributed_work, no_peers)
 	}
 	ASSERT_FALSE (nano::work_validate (hash, *work));
 	// should only be removed after cleanup
-	ASSERT_EQ (1, node->distributed_work.work.size ());
-	while (!node->distributed_work.work.empty ())
+	ASSERT_EQ (1, node->distributed_work.items.size ());
+	while (!node->distributed_work.items.empty ())
 	{
 		node->distributed_work.cleanup_finished ();
 		ASSERT_NO_ERROR (system.poll ());
@@ -44,7 +44,7 @@ TEST (distributed_work, no_peers_disabled)
 		ASSERT_FALSE (work_a.is_initialized ());
 		done = true;
 	};
-	node.distributed_work.make (nano::block_hash (), callback_failure, nano::network_constants::publish_test_threshold);
+	node.distributed_work.make (nano::block_hash (), node.config.work_peers, callback_failure, nano::network_constants::publish_test_threshold);
 	while (!done)
 	{
 		ASSERT_NO_ERROR (system.poll ());
@@ -64,27 +64,27 @@ TEST (distributed_work, no_peers_cancel)
 		ASSERT_FALSE (work_a.is_initialized ());
 		done = true;
 	};
-	node.distributed_work.make (hash, callback_to_cancel, nano::difficulty::from_multiplier (1e6, node.network_params.network.publish_threshold));
-	ASSERT_EQ (1, node.distributed_work.work.size ());
+	node.distributed_work.make (hash, node.config.work_peers, callback_to_cancel, nano::difficulty::from_multiplier (1e6, node.network_params.network.publish_threshold));
+	ASSERT_EQ (1, node.distributed_work.items.size ());
 	// cleanup should not cancel or remove an ongoing work
 	node.distributed_work.cleanup_finished ();
-	ASSERT_EQ (1, node.distributed_work.work.size ());
+	ASSERT_EQ (1, node.distributed_work.items.size ());
 	// manually cancel
 	node.distributed_work.cancel (hash, true); // forces local stop
 	system.deadline_set (20s);
-	while (!done && !node.distributed_work.work.empty ())
+	while (!done && !node.distributed_work.items.empty ())
 	{
 		ASSERT_NO_ERROR (system.poll ());
 	}
 	// now using observer
 	done = false;
-	node.distributed_work.make (hash, callback_to_cancel, nano::difficulty::from_multiplier (1000000, node.network_params.network.publish_threshold));
-	ASSERT_EQ (1, node.distributed_work.work.size ());
+	node.distributed_work.make (hash, node.config.work_peers, callback_to_cancel, nano::difficulty::from_multiplier (1000000, node.network_params.network.publish_threshold));
+	ASSERT_EQ (1, node.distributed_work.items.size ());
 	node.observers.work_cancel.notify (hash);
 	system.deadline_set (20s);
-	while (!done && !node.distributed_work.work.empty ())
+	while (!done && !node.distributed_work.items.empty ())
 	{
 		ASSERT_NO_ERROR (system.poll ());
 	}
@@ -104,12 +104,12 @@ TEST (distributed_work, no_peers_multi)
 	// Test many works for the same root
 	for (unsigned i{ 0 }; i < total; ++i)
 	{
-		node->distributed_work.make (hash, callback, nano::difficulty::from_multiplier (10, node->network_params.network.publish_threshold));
+		node->distributed_work.make (hash, node->config.work_peers, callback, nano::difficulty::from_multiplier (10, node->network_params.network.publish_threshold));
 	}
 	// 1 root, and _total_ requests for that root are expected, but some may have already finished
-	ASSERT_EQ (1, node->distributed_work.work.size ());
+	ASSERT_EQ (1, node->distributed_work.items.size ());
 	{
-		auto requests (node->distributed_work.work.begin ());
+		auto requests (node->distributed_work.items.begin ());
 		ASSERT_EQ (hash, requests->first);
 		ASSERT_GE (requests->second.size (), total - 4);
 	}
@@ -119,7 +119,7 @@
 		ASSERT_NO_ERROR (system.poll ());
 	}
 	system.deadline_set (5s);
-	while (node->distributed_work.work.empty ())
+	while (node->distributed_work.items.empty ())
 	{
 		node->distributed_work.cleanup_finished ();
 		ASSERT_NO_ERROR (system.poll ());
@@ -129,11 +129,11 @@
 	for (unsigned i{ 0 }; i < total; ++i)
 	{
 		nano::block_hash hash_i (i + 1);
-		node->distributed_work.make (hash_i, callback, node->network_params.network.publish_threshold);
+		node->distributed_work.make (hash_i, node->config.work_peers, callback, node->network_params.network.publish_threshold);
 	}
 	// 10 roots expected with 1 work each, but some may have completed so test for some
-	ASSERT_GT (node->distributed_work.work.size (), 5);
-	for (auto & requests : node->distributed_work.work)
+	ASSERT_GT (node->distributed_work.items.size (), 5);
+	for (auto & requests : node->distributed_work.items)
 	{
 		ASSERT_EQ (1, requests.second.size ());
 	}
@@ -143,7 +143,7 @@
 		ASSERT_NO_ERROR (system.poll ());
 	}
 	system.deadline_set (5s);
-	while (node->distributed_work.work.empty ())
+	while (node->distributed_work.items.empty ())
 	{
 		node->distributed_work.cleanup_finished ();
 		ASSERT_NO_ERROR (system.poll ());
diff --git a/nano/core_test/toml.cpp b/nano/core_test/toml.cpp
index a975e731..e403d0b4 100644
--- a/nano/core_test/toml.cpp
+++ b/nano/core_test/toml.cpp
@@ -160,6 +160,7 @@ TEST (toml, daemon_config_deserialize_defaults)
 	ASSERT_EQ (conf.node.lmdb_max_dbs, defaults.node.lmdb_max_dbs);
 	ASSERT_EQ (conf.node.max_work_generate_multiplier, defaults.node.max_work_generate_multiplier);
 	ASSERT_EQ (conf.node.network_threads, defaults.node.network_threads);
+	ASSERT_EQ (conf.node.secondary_work_peers, defaults.node.secondary_work_peers);
 	ASSERT_EQ (conf.node.work_watcher_period, defaults.node.work_watcher_period);
 	ASSERT_EQ (conf.node.online_weight_minimum, defaults.node.online_weight_minimum);
 	ASSERT_EQ (conf.node.online_weight_quorum, defaults.node.online_weight_quorum);
@@ -492,6 +493,9 @@ TEST (toml, daemon_config_deserialize_no_defaults)
 	num_memtables = 3
 	total_memtable_size = 0
 
+	[node.experimental]
+	secondary_work_peers = ["test.org:998"]
+
 	[opencl]
 	device = 999
 	enable = true
@@ -542,6 +546,7 @@
 	ASSERT_NE (conf.node.max_work_generate_multiplier, defaults.node.max_work_generate_multiplier);
 	ASSERT_NE (conf.node.frontiers_confirmation, defaults.node.frontiers_confirmation);
 	ASSERT_NE (conf.node.network_threads, defaults.node.network_threads);
+	ASSERT_NE (conf.node.secondary_work_peers, defaults.node.secondary_work_peers);
 	ASSERT_NE (conf.node.work_watcher_period, defaults.node.work_watcher_period);
 	ASSERT_NE (conf.node.online_weight_minimum, defaults.node.online_weight_minimum);
 	ASSERT_NE (conf.node.online_weight_quorum, defaults.node.online_weight_quorum);
diff --git a/nano/core_test/wallet.cpp b/nano/core_test/wallet.cpp
index e80fb396..414b6e72 100644
--- a/nano/core_test/wallet.cpp
+++ b/nano/core_test/wallet.cpp
@@ -1214,7 +1214,7 @@ TEST (wallet, work_watcher_generation_disabled)
 		updated_difficulty = existing->difficulty;
 	}
 	ASSERT_EQ (updated_difficulty, difficulty);
-	ASSERT_TRUE (node.distributed_work.work.empty ());
+	ASSERT_TRUE (node.distributed_work.items.empty ());
 }
 
 TEST (wallet, work_watcher_removed)
diff --git a/nano/node/distributed_work.cpp b/nano/node/distributed_work.cpp
index dd626f36..cf159812 100644
--- a/nano/node/distributed_work.cpp
+++ b/nano/node/distributed_work.cpp
@@ -14,13 +14,14 @@ std::shared_ptr nano::work_peer_request::get_prepared_json_request
 	return request;
 }
 
-nano::distributed_work::distributed_work (unsigned int backoff_a, nano::node & node_a, nano::root const & root_a, std::function)> const & callback_a, uint64_t difficulty_a, boost::optional const & account_a) :
+nano::distributed_work::distributed_work (nano::node & node_a, nano::root const & root_a, std::vector> const & peers_a, unsigned int backoff_a, std::function)> const & callback_a, uint64_t difficulty_a, boost::optional const & account_a) :
 callback (callback_a),
 backoff (backoff_a),
 node (node_a),
 root (root_a),
 account (account_a),
-need_resolve (node.config.work_peers),
+peers (peers_a),
+need_resolve (peers_a),
 difficulty (difficulty_a),
 elapsed (nano::timer_state::started, "distributed work generation timer")
 {
@@ -185,6 +186,11 @@ void nano::distributed_work::start_work ()
 		});
 		}
 	}
+
+	if (!local_generation_started && outstanding.empty ())
+	{
+		callback (boost::none);
+	}
 }
 
 void nano::distributed_work::cancel_connection (std::shared_ptr connection_a)
@@ -341,10 +347,14 @@ void nano::distributed_work::handle_failure (bool const last_a)
 		std::weak_ptr node_w (node.shared ());
 		auto next_backoff (std::min (backoff * 2, (unsigned int)60 * 5));
 		// clang-format off
-		node.alarm.add (now + std::chrono::seconds (backoff), [ node_w, root_l = root, callback_l = callback, next_backoff, difficulty = difficulty, account_l = account ] {
+		node.alarm.add (now + std::chrono::seconds (backoff), [ node_w, root_l = root, peers_l = peers, callback_l = callback, next_backoff, difficulty = difficulty, account_l = account ] {
 			if (auto node_l = node_w.lock ())
 			{
-				node_l->distributed_work.make (next_backoff, root_l, callback_l, difficulty, account_l);
+				node_l->distributed_work.make (next_backoff, root_l, peers_l, callback_l, difficulty, account_l);
+			}
+			else if (callback_l)
+			{
+				callback_l (boost::none);
 			}
 		});
 		// clang-format on
@@ -374,20 +384,20 @@
 node (node_a)
 {
 }
 
-void nano::distributed_work_factory::make (nano::root const & root_a, std::function)> const & callback_a, uint64_t difficulty_a, boost::optional const & account_a)
+void nano::distributed_work_factory::make (nano::root const & root_a, std::vector> const & peers_a, std::function)> const & callback_a, uint64_t difficulty_a, boost::optional const & account_a)
 {
-	make (1, root_a, callback_a, difficulty_a, account_a);
+	make (1, root_a, peers_a, callback_a, difficulty_a, account_a);
 }
 
-void nano::distributed_work_factory::make (unsigned int backoff_a, nano::root const & root_a, std::function)> const & callback_a, uint64_t difficulty_a, boost::optional const & account_a)
+void nano::distributed_work_factory::make (unsigned int backoff_a, nano::root const & root_a, std::vector> const & peers_a, std::function)> const & callback_a, uint64_t difficulty_a, boost::optional const & account_a)
 {
 	cleanup_finished ();
 	if (node.work_generation_enabled ())
 	{
-		auto distributed (std::make_shared (backoff_a, node, root_a, callback_a, difficulty_a, account_a));
+		auto distributed (std::make_shared (node, root_a, peers_a, backoff_a, callback_a, difficulty_a, account_a));
 		{
 			nano::lock_guard guard (mutex);
-			work[root_a].emplace_back (distributed);
+			items[root_a].emplace_back (distributed);
 		}
 		distributed->start ();
 	}
@@ -401,8 +411,8 @@ void nano::distributed_work_factory::cancel (nano::root const & root_a, bool con
 {
 	{
 		nano::lock_guard guard (mutex);
-		auto existing_l (work.find (root_a));
-		if (existing_l != work.end ())
+		auto existing_l (items.find (root_a));
+		if (existing_l != items.end ())
 		{
 			for (auto & distributed_w : existing_l->second)
 			{
@@ -412,7 +422,7 @@ void nano::distributed_work_factory::cancel (nano::root const & root_a, bool con
 					distributed_l->cancel_once ();
 				}
 			}
-			work.erase (existing_l);
+			items.erase (existing_l);
 		}
 	}
 }
@@ -420,7 +430,7 @@
 void nano::distributed_work_factory::cleanup_finished ()
 {
 	nano::lock_guard guard (mutex);
-	for (auto it (work.begin ()), end (work.end ()); it != end;)
+	for (auto it (items.begin ()), end (items.end ()); it != end;)
 	{
 		it->second.erase (std::remove_if (it->second.begin (), it->second.end (), [](auto distributed_a) {
 			return distributed_a.expired ();
@@ -429,7 +439,7 @@ void nano::distributed_work_factory::cleanup_finished ()
 		if (it->second.empty ())
 		{
-			it = work.erase (it);
+			it = items.erase (it);
 		}
 		else
 		{
@@ -437,3 +447,20 @@ void nano::distributed_work_factory::cleanup_finished ()
 		}
 	}
 }
+
+namespace nano
+{
+std::unique_ptr collect_seq_con_info (distributed_work_factory & distributed_work, const std::string & name)
+{
+	size_t item_count = 0;
+	{
+		nano::lock_guard guard (distributed_work.mutex);
+		item_count = distributed_work.items.size ();
+	}
+
+	auto composite = std::make_unique (name);
+	auto sizeof_item_element = sizeof (decltype (distributed_work.items)::value_type);
+	composite->add_component (std::make_unique (seq_con_info{ "items", item_count, sizeof_item_element }));
+	return composite;
+}
+}
\ No newline at end of file
diff --git a/nano/node/distributed_work.hpp b/nano/node/distributed_work.hpp
index e3d932d8..4943d26a 100644
--- a/nano/node/distributed_work.hpp
+++ b/nano/node/distributed_work.hpp
@@ -38,7 +38,7 @@ public:
 class distributed_work final : public std::enable_shared_from_this
 {
 public:
-	distributed_work (unsigned int, nano::node &, nano::root const &, std::function)> const &, uint64_t, boost::optional const & = boost::none);
+	distributed_work (nano::node &, nano::root const &, std::vector> const & peers_a, unsigned int, std::function)> const &, uint64_t, boost::optional const & = boost::none);
 	~distributed_work ();
 	void start ();
 	void start_work ();
@@ -60,6 +60,7 @@ public:
 	std::mutex mutex;
 	std::map outstanding;
 	std::vector> connections;
+	std::vector> const peers;
 	std::vector> need_resolve;
 	uint64_t difficulty;
 	uint64_t work_result{ 0 };
@@ -76,13 +77,16 @@
 class distributed_work_factory final
 {
 public:
 	distributed_work_factory (nano::node &);
-	void make (nano::root const &, std::function)> const &, uint64_t, boost::optional const & = boost::none);
-	void make (unsigned int, nano::root const &, std::function)> const &, uint64_t, boost::optional const & = boost::none);
+	void make (nano::root const &, std::vector> const &, std::function)> const &, uint64_t, boost::optional const & = boost::none);
+	void make (unsigned int, nano::root const &, std::vector> const &, std::function)> const &, uint64_t, boost::optional const & = boost::none);
 	void cancel (nano::root const &, bool const local_stop = false);
 	void cleanup_finished ();
-	std::unordered_map>> work;
-	std::mutex mutex;
 	nano::node & node;
+	std::unordered_map>> items;
+	std::mutex mutex;
 };
+
+class seq_con_info_component;
+std::unique_ptr collect_seq_con_info (distributed_work_factory & distributed_work, const std::string & name);
 }
\ No newline at end of file
diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp
index 5f9bb22d..1b51bf1d 100644
--- a/nano/node/json_handler.cpp
+++ b/nano/node/json_handler.cpp
@@ -4530,7 +4530,7 @@ void nano::json_handler::work_generate ()
 	}
 	if (!ec)
 	{
-		bool use_peers (request.get_optional ("use_peers") == true);
+		auto use_peers (request.get ("use_peers", false));
 		auto rpc_l (shared_from_this ());
 		auto callback = [rpc_l, hash, this](boost::optional const & work_a) {
 			if (work_a)
@@ -4566,9 +4566,11 @@ void nano::json_handler::work_generate ()
 		}
 		else
 		{
-			if (node.work_generation_enabled ())
+			auto secondary_work_peers_l (request.get ("secondary_work_peers", false));
+			auto const & peers_l (secondary_work_peers_l ? node.config.secondary_work_peers : node.config.work_peers);
+			if (node.work_generation_enabled (peers_l))
 			{
-				node.work_generate (hash, callback, difficulty, account);
+				node.work_generate (hash, callback, difficulty, account, secondary_work_peers_l);
 			}
 			else
 			{
diff --git a/nano/node/node.cpp b/nano/node/node.cpp
index 662f7a7a..ac1f74c1 100644
--- a/nano/node/node.cpp
+++ b/nano/node/node.cpp
@@ -612,6 +612,7 @@ std::unique_ptr collect_seq_con_info (node & node, const
 	composite->add_component (collect_seq_con_info (node.confirmation_height_processor, "confirmation_height_processor"));
 	composite->add_component (collect_seq_con_info (node.pending_confirmation_height, "pending_confirmation_height"));
 	composite->add_component (collect_seq_con_info (node.worker, "worker"));
+	composite->add_component (collect_seq_con_info (node.distributed_work, "distributed_work"));
 	return composite;
 }
 }
@@ -967,7 +968,12 @@ bool nano::node::local_work_generation_enabled () const
 
 bool nano::node::work_generation_enabled () const
 {
-	return !config.work_peers.empty () || local_work_generation_enabled ();
+	return work_generation_enabled (config.work_peers);
+}
+
+bool nano::node::work_generation_enabled (std::vector> const & peers_a) const
+{
+	return !peers_a.empty () || local_work_generation_enabled ();
 }
 
 boost::optional nano::node::work_generate_blocking (nano::block & block_a)
@@ -990,9 +996,10 @@
-void nano::node::work_generate (nano::root const & root_a, std::function)> callback_a, uint64_t difficulty_a, boost::optional const & account_a)
+void nano::node::work_generate (nano::root const & root_a, std::function)> callback_a, uint64_t difficulty_a, boost::optional const & account_a, bool secondary_work_peers_a)
 {
-	distributed_work.make (root_a, callback_a, difficulty_a, account_a);
+	auto const & peers_l (secondary_work_peers_a ? config.secondary_work_peers : config.work_peers);
+	distributed_work.make (root_a, peers_l, callback_a, difficulty_a, account_a);
 }
 
 boost::optional nano::node::work_generate_blocking (nano::root const & root_a, boost::optional const & account_a)
diff --git a/nano/node/node.hpp b/nano/node/node.hpp
index 4e42db04..983394ab 100644
--- a/nano/node/node.hpp
+++ b/nano/node/node.hpp
@@ -125,11 +125,12 @@ public:
 	int price (nano::uint128_t const &, int);
 	bool local_work_generation_enabled () const;
 	bool work_generation_enabled () const;
+	bool work_generation_enabled (std::vector> const &) const;
 	boost::optional work_generate_blocking (nano::block &, uint64_t);
 	boost::optional work_generate_blocking (nano::block &);
 	boost::optional work_generate_blocking (nano::root const &, uint64_t, boost::optional const & = boost::none);
 	boost::optional work_generate_blocking (nano::root const &, boost::optional const & = boost::none);
-	void work_generate (nano::root const &, std::function)>, uint64_t, boost::optional const & = boost::none);
+	void work_generate (nano::root const &, std::function)>, uint64_t, boost::optional const & = boost::none, bool const = false);
 	void work_generate (nano::root const &, std::function)>, boost::optional const & = boost::none);
 	void add_initial_peers ();
 	void block_confirm (std::shared_ptr);
diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp
index 68c62483..6fec0a23 100644
--- a/nano/node/nodeconfig.cpp
+++ b/nano/node/nodeconfig.cpp
@@ -116,6 +116,16 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const
 	{
 		preconfigured_representatives_l->push_back (i->to_account ());
 	}
+
+	/** Experimental node entries */
+	nano::tomlconfig experimental_l;
+	auto secondary_work_peers_l (experimental_l.create_array ("secondary_work_peers", "A list of \"address:port\" entries to identify work peers for secondary work generation"));
+	for (auto i (secondary_work_peers.begin ()), n (secondary_work_peers.end ()); i != n; ++i)
+	{
+		secondary_work_peers_l->push_back (boost::str (boost::format ("%1%:%2%") % i->first % i->second));
+	}
+	toml.put_child ("experimental", experimental_l);
+
 	nano::tomlconfig callback_l;
 	callback_l.put ("address", callback_address, "Callback address\ntype:string,ip");
 	callback_l.put ("port", callback_port, "Callback port number\ntype:uint16");
@@ -200,20 +210,8 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml)
 	if (toml.has_key ("work_peers"))
 	{
 		work_peers.clear ();
-		toml.array_entries_required ("work_peers", [this](std::string const & entry) {
-			auto port_position (entry.rfind (':'));
-			bool result = port_position == -1;
-			if (!result)
-			{
-				auto port_str (entry.substr (port_position + 1));
-				uint16_t port;
-				result |= parse_port (port_str, port);
-				if (!result)
-				{
-					auto address (entry.substr (0, port_position));
-					this->work_peers.emplace_back (address, port);
-				}
-			}
+		toml.array_entries_required ("work_peers", [this](std::string const & entry_a) {
+			this->deserialize_address (entry_a, this->work_peers);
 		});
 	}
 
@@ -335,6 +333,18 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml)
 		frontiers_confirmation = deserialize_frontiers_confirmation (frontiers_confirmation_l);
 	}
 
+	if (toml.has_key ("experimental"))
+	{
+		auto experimental_config_l (toml.get_required_child ("experimental"));
+		if (experimental_config_l.has_key ("secondary_work_peers"))
+		{
+			secondary_work_peers.clear ();
+			experimental_config_l.array_entries_required ("secondary_work_peers", [this](std::string const & entry_a) {
+				this->deserialize_address (entry_a, this->secondary_work_peers);
+			});
+		}
+	}
+
 	// Validate ranges
 	if (online_weight_quorum > 100)
 	{
@@ -816,6 +826,23 @@ nano::frontiers_confirmation_mode nano::node_config::deserialize_frontiers_confi
 	}
 }
 
+void nano::node_config::deserialize_address (std::string const & entry_a, std::vector> & container_a) const
+{
+	auto port_position (entry_a.rfind (':'));
+	bool result = (port_position == -1);
+	if (!result)
+	{
+		auto port_str (entry_a.substr (port_position + 1));
+		uint16_t port;
+		result |= parse_port (port_str, port);
+		if (!result)
+		{
+			auto address (entry_a.substr (0, port_position));
+			container_a.emplace_back (address, port);
+		}
+	}
+}
+
 nano::account nano::node_config::random_representative () const
 {
 	assert (!preconfigured_representatives.empty ());
diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp
index 9d5e2909..f44c166a 100644
--- a/nano/node/nodeconfig.hpp
+++ b/nano/node/nodeconfig.hpp
@@ -45,6 +45,7 @@ public:
 	uint16_t peering_port{ 0 };
 	nano::logging logging;
 	std::vector> work_peers;
+	std::vector> secondary_work_peers{ { "127.0.0.1", 8076 } }; /* Default of nano-pow-server */
 	std::vector preconfigured_peers;
 	std::vector preconfigured_representatives;
 	unsigned bootstrap_fraction_numerator{ 1 };
@@ -93,10 +94,12 @@ public:
 	double max_work_generate_multiplier{ 64. };
 	uint64_t max_work_generate_difficulty{ nano::network_constants::publish_full_threshold };
 	nano::rocksdb_config rocksdb_config;
-	nano::frontiers_confirmation_mode frontiers_confirmation{ nano::frontiers_confirmation_mode::automatic };
 	std::string serialize_frontiers_confirmation (nano::frontiers_confirmation_mode) const;
 	nano::frontiers_confirmation_mode deserialize_frontiers_confirmation (std::string const &);
+	/** Entry is ignored if it cannot be parsed as a valid address:port */
+	void deserialize_address (std::string const &, std::vector> &) const;
+
 	static unsigned json_version ()
 	{
 		return 18;
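
For readers following the patch, the short standalone sketch below (not part of the diff, and not the node's real types) illustrates the two pieces this change revolves around: the node_config::deserialize_address helper, which splits an "address:port" entry on the last ':' and silently ignores entries whose port does not parse, and the work_generate-style choice between the regular work_peers list and the new secondary_work_peers list, whose default entry is 127.0.0.1:8076 (the nano-pow-server default). The peer_list alias, the std::stoul-based port parsing, and the main driver are illustrative assumptions only.

// Standalone illustration; assumes nothing from the nano codebase.
#include <cstdint>
#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

using peer_list = std::vector<std::pair<std::string, uint16_t>>;

// Mirrors the intent of node_config::deserialize_address: split "address:port"
// on the last ':' and drop entries whose port cannot be parsed.
void deserialize_address (std::string const & entry, peer_list & container)
{
	auto port_position = entry.rfind (':');
	if (port_position == std::string::npos)
	{
		return; // no port separator; entry is ignored
	}
	try
	{
		unsigned long port = std::stoul (entry.substr (port_position + 1));
		if (port <= 65535)
		{
			container.emplace_back (entry.substr (0, port_position), static_cast<uint16_t> (port));
		}
	}
	catch (std::exception const &)
	{
		// invalid port; entry is ignored
	}
}

int main ()
{
	peer_list work_peers;
	peer_list secondary_work_peers{ { "127.0.0.1", 8076 } }; // default of nano-pow-server

	deserialize_address ("example.org:7076", work_peers); // hypothetical peer
	deserialize_address ("bad-entry", work_peers); // ignored: no port

	// work_generate-style selection between the primary and secondary lists
	bool use_secondary (true);
	auto const & peers (use_secondary ? secondary_work_peers : work_peers);
	for (auto const & peer : peers)
	{
		std::cout << peer.first << ":" << peer.second << std::endl;
	}
	return 0;
}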