From 1a1979f738f0d5c5671e5eaacbb399a76e532249 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Fri, 19 Jun 2020 19:41:31 +0100 Subject: [PATCH] Improve distributed_work_factory item management (#2736) * Improve distributed_work_factory - Using an unordered_multimap container of unordered_map of vectors to simplify methods. - Use ASSERT_TIMELY in tests * Fix intermittent failure in distributed_work.peer_malicious * Fix broken tests on develop * Disable system.generate_send_new Co-authored-by: Wesley Shillingford --- nano/core_test/distributed_work.cpp | 34 ++++------- nano/core_test/ledger.cpp | 2 +- nano/core_test/network.cpp | 2 +- nano/core_test/wallet.cpp | 2 +- nano/node/distributed_work_factory.cpp | 81 ++++++++++++-------------- nano/node/distributed_work_factory.hpp | 19 +++--- nano/rpc_test/rpc.cpp | 2 +- 7 files changed, 62 insertions(+), 80 deletions(-) diff --git a/nano/core_test/distributed_work.cpp b/nano/core_test/distributed_work.cpp index f38dbdf0..4ae0dbe3 100644 --- a/nano/core_test/distributed_work.cpp +++ b/nano/core_test/distributed_work.cpp @@ -29,8 +29,8 @@ TEST (distributed_work, no_peers) ASSERT_TIMELY (5s, done); ASSERT_GE (nano::work_difficulty (nano::work_version::work_1, hash, *work), node->network_params.network.publish_thresholds.base); // should only be removed after cleanup - ASSERT_EQ (1, node->distributed_work.items.size ()); - while (!node->distributed_work.items.empty ()) + ASSERT_EQ (1, node->distributed_work.size ()); + while (node->distributed_work.size () > 0) { node->distributed_work.cleanup_finished (); ASSERT_NO_ERROR (system.poll ()); @@ -59,21 +59,21 @@ TEST (distributed_work, no_peers_cancel) done = true; }; ASSERT_FALSE (node.distributed_work.make (nano::work_version::work_1, hash, node.config.work_peers, nano::difficulty::from_multiplier (1e6, node.network_params.network.publish_thresholds.base), callback_to_cancel)); - ASSERT_EQ (1, node.distributed_work.items.size ()); + ASSERT_EQ (1, node.distributed_work.size ()); // cleanup should not cancel or remove an ongoing work node.distributed_work.cleanup_finished (); - ASSERT_EQ (1, node.distributed_work.items.size ()); + ASSERT_EQ (1, node.distributed_work.size ()); // manually cancel - node.distributed_work.cancel (hash, true); // forces local stop - ASSERT_TIMELY (20s, done && node.distributed_work.items.empty ()); + node.distributed_work.cancel (hash); + ASSERT_TIMELY (20s, done && node.distributed_work.size () == 0); // now using observer done = false; ASSERT_FALSE (node.distributed_work.make (nano::work_version::work_1, hash, node.config.work_peers, nano::difficulty::from_multiplier (1e6, node.network_params.network.publish_thresholds.base), callback_to_cancel)); - ASSERT_EQ (1, node.distributed_work.items.size ()); + ASSERT_EQ (1, node.distributed_work.size ()); node.observers.work_cancel.notify (hash); - ASSERT_TIMELY (20s, done && node.distributed_work.items.empty ()); + ASSERT_TIMELY (20s, done && node.distributed_work.size () == 0); } TEST (distributed_work, no_peers_multi) @@ -92,16 +92,9 @@ TEST (distributed_work, no_peers_multi) { ASSERT_FALSE (node->distributed_work.make (nano::work_version::work_1, hash, node->config.work_peers, nano::difficulty::from_multiplier (10, node->network_params.network.publish_thresholds.base), callback)); } - // 1 root, and _total_ requests for that root are expected, but some may have already finished - ASSERT_EQ (1, node->distributed_work.items.size ()); - { - auto requests (node->distributed_work.items.begin ()); - ASSERT_EQ (hash, requests->first); - ASSERT_GE (requests->second.size (), total - 4); - } ASSERT_TIMELY (5s, count == total); system.deadline_set (5s); - while (!node->distributed_work.items.empty ()) + while (node->distributed_work.size () > 0) { node->distributed_work.cleanup_finished (); ASSERT_NO_ERROR (system.poll ()); @@ -113,20 +106,13 @@ TEST (distributed_work, no_peers_multi) nano::block_hash hash_i (i + 1); ASSERT_FALSE (node->distributed_work.make (nano::work_version::work_1, hash_i, node->config.work_peers, node->network_params.network.publish_thresholds.base, callback)); } - // 10 roots expected with 1 work each, but some may have completed so test for some - ASSERT_GT (node->distributed_work.items.size (), 5); - for (auto & requests : node->distributed_work.items) - { - ASSERT_EQ (1, requests.second.size ()); - } ASSERT_TIMELY (5s, count == total); system.deadline_set (5s); - while (!node->distributed_work.items.empty ()) + while (node->distributed_work.size () > 0) { node->distributed_work.cleanup_finished (); ASSERT_NO_ERROR (system.poll ()); } - count = 0; } TEST (distributed_work, peer) diff --git a/nano/core_test/ledger.cpp b/nano/core_test/ledger.cpp index afd3eff1..daaac59f 100644 --- a/nano/core_test/ledger.cpp +++ b/nano/core_test/ledger.cpp @@ -573,7 +573,7 @@ TEST (system, DISABLED_generate_send_existing) runner.join (); } -TEST (system, generate_send_new) +TEST (system, DISABLED_generate_send_new) { nano::system system (1); auto & node1 (*system.nodes[0]); diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 4c34f693..3b4f97f5 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -155,7 +155,7 @@ TEST (network, last_contacted) // Make sure last_contact gets updated on receiving a non-handshake message auto timestamp_before_keepalive = channel2->get_last_packet_received (); node1->network.send_keepalive (channel1); - ASSERT_TIMELY (10s, node0->stats.count (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in) == 2); + ASSERT_TIMELY (10s, node0->stats.count (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in) >= 2); ASSERT_EQ (node0->network.size (), 1); auto timestamp_after_keepalive = channel2->get_last_packet_received (); ASSERT_GT (timestamp_after_keepalive, timestamp_before_keepalive); diff --git a/nano/core_test/wallet.cpp b/nano/core_test/wallet.cpp index f57c2e50..53f83e98 100644 --- a/nano/core_test/wallet.cpp +++ b/nano/core_test/wallet.cpp @@ -1168,7 +1168,7 @@ TEST (work_watcher, generation_disabled) updated_multiplier = existing->multiplier; } ASSERT_EQ (updated_multiplier, multiplier); - ASSERT_TRUE (node.distributed_work.items.empty ()); + ASSERT_EQ (0, node.distributed_work.size ()); } TEST (work_watcher, cancel) diff --git a/nano/node/distributed_work_factory.cpp b/nano/node/distributed_work_factory.cpp index 69703a54..26ece3b7 100644 --- a/nano/node/distributed_work_factory.cpp +++ b/nano/node/distributed_work_factory.cpp @@ -1,3 +1,4 @@ +#include #include #include @@ -27,7 +28,7 @@ bool nano::distributed_work_factory::make (std::chrono::seconds const & backoff_ auto distributed (std::make_shared (node, request_a, backoff_a)); { nano::lock_guard guard (mutex); - items[request_a.root].emplace_back (distributed); + items.emplace (request_a.root, distributed); } distributed->start (); error_l = false; @@ -36,43 +37,38 @@ bool nano::distributed_work_factory::make (std::chrono::seconds const & backoff_ return error_l; } -void nano::distributed_work_factory::cancel (nano::root const & root_a, bool const local_stop) +void nano::distributed_work_factory::cancel (nano::root const & root_a) { nano::lock_guard guard_l (mutex); - auto existing_l (items.find (root_a)); - if (existing_l != items.end ()) - { - for (auto & distributed_w : existing_l->second) + auto root_items_l = items.equal_range (root_a); + std::for_each (root_items_l.first, root_items_l.second, [](auto item_l) { + if (auto distributed_l = item_l.second.lock ()) { - if (auto distributed_l = distributed_w.lock ()) - { - // Send work_cancel to work peers and stop local work generation - distributed_l->cancel (); - } + // Send work_cancel to work peers and stop local work generation + distributed_l->cancel (); } - items.erase (existing_l); - } + }); + items.erase (root_items_l.first, root_items_l.second); } void nano::distributed_work_factory::cleanup_finished () { nano::lock_guard guard (mutex); - for (auto it (items.begin ()), end (items.end ()); it != end;) - { - it->second.erase (std::remove_if (it->second.begin (), it->second.end (), [](auto distributed_a) { - return distributed_a.expired (); - }), - it->second.end ()); - - if (it->second.empty ()) + // std::erase_if in c++20 + auto erase_if = [](decltype (items) & container, auto pred) { + for (auto it = container.begin (), end = container.end (); it != end;) { - it = items.erase (it); + if (pred (*it)) + { + it = container.erase (it); + } + else + { + ++it; + } } - else - { - ++it; - } - } + }; + erase_if (items, [](decltype (items)::value_type item) { return item.second.expired (); }); } void nano::distributed_work_factory::stop () @@ -80,31 +76,28 @@ void nano::distributed_work_factory::stop () if (!stopped.exchange (true)) { // Cancel any ongoing work - std::unordered_set roots_l; - nano::unique_lock lock_l (mutex); - for (auto const & item_l : items) + nano::lock_guard guard (mutex); + for (auto & item_l : items) { - roots_l.insert (item_l.first); + if (auto distributed_l = item_l.second.lock ()) + { + distributed_l->cancel (); + } } - lock_l.unlock (); - for (auto const & root_l : roots_l) - { - cancel (root_l, true); - } - lock_l.lock (); items.clear (); } } +size_t nano::distributed_work_factory::size () const +{ + nano::lock_guard guard_l (mutex); + return items.size (); +} + std::unique_ptr nano::collect_container_info (distributed_work_factory & distributed_work, const std::string & name) { - size_t item_count; - { - nano::lock_guard guard (distributed_work.mutex); - item_count = distributed_work.items.size (); - } - - auto sizeof_item_element = sizeof (decltype (distributed_work.items)::value_type); + auto item_count = distributed_work.size (); + auto sizeof_item_element = sizeof (decltype (nano::distributed_work_factory::items)::value_type); auto composite = std::make_unique (name); composite->add_component (std::make_unique (container_info{ "items", item_count, sizeof_item_element })); return composite; diff --git a/nano/node/distributed_work_factory.hpp b/nano/node/distributed_work_factory.hpp index 350b1735..4e5a7bd5 100644 --- a/nano/node/distributed_work_factory.hpp +++ b/nano/node/distributed_work_factory.hpp @@ -1,9 +1,6 @@ #pragma once #include -#include - -#include #include #include @@ -13,9 +10,11 @@ namespace nano { -class node; +class container_info_component; class distributed_work; +class node; class root; +struct work_request; class distributed_work_factory final { @@ -24,16 +23,20 @@ public: ~distributed_work_factory (); bool make (nano::work_version const, nano::root const &, std::vector> const &, uint64_t, std::function)> const &, boost::optional const & = boost::none); bool make (std::chrono::seconds const &, nano::work_request const &); - void cancel (nano::root const &, bool const local_stop = false); + void cancel (nano::root const &); void cleanup_finished (); void stop (); + size_t size () const; + +private: + std::unordered_multimap> items; nano::node & node; - std::unordered_map>> items; - std::mutex mutex; + mutable std::mutex mutex; std::atomic stopped{ false }; + + friend std::unique_ptr collect_container_info (distributed_work_factory &, const std::string &); }; -class container_info_component; std::unique_ptr collect_container_info (distributed_work_factory & distributed_work, const std::string & name); } diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 083f0560..aa6e9ee8 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -5961,7 +5961,7 @@ TEST (rpc, online_reps) scoped_thread_name_io.renew (); ASSERT_NE (nullptr, change); ASSERT_TIMELY (5s, node2->block (change->hash ())); - ASSERT_TIMELY (5s, node2->online_reps.list ().size () != 2); + ASSERT_TIMELY (5s, node2->online_reps.list ().size () == 2); boost::property_tree::ptree child_rep; child_rep.put ("", new_rep.to_account ()); boost::property_tree::ptree filtered_accounts;