Improve distributed_work_factory item management (#2736)

* Improve distributed_work_factory

- Using an unordered_multimap container of unordered_map of vectors to simplify methods.
- Use ASSERT_TIMELY in tests

* Fix intermittent failure in distributed_work.peer_malicious

* Fix broken tests on develop

* Disable system.generate_send_new

Co-authored-by: Wesley Shillingford <wezrule@hotmail.com>
This commit is contained in:
Guilherme Lawless 2020-06-19 19:41:31 +01:00 committed by GitHub
commit 1a1979f738
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 62 additions and 80 deletions

View file

@ -29,8 +29,8 @@ TEST (distributed_work, no_peers)
ASSERT_TIMELY (5s, done);
ASSERT_GE (nano::work_difficulty (nano::work_version::work_1, hash, *work), node->network_params.network.publish_thresholds.base);
// should only be removed after cleanup
ASSERT_EQ (1, node->distributed_work.items.size ());
while (!node->distributed_work.items.empty ())
ASSERT_EQ (1, node->distributed_work.size ());
while (node->distributed_work.size () > 0)
{
node->distributed_work.cleanup_finished ();
ASSERT_NO_ERROR (system.poll ());
@ -59,21 +59,21 @@ TEST (distributed_work, no_peers_cancel)
done = true;
};
ASSERT_FALSE (node.distributed_work.make (nano::work_version::work_1, hash, node.config.work_peers, nano::difficulty::from_multiplier (1e6, node.network_params.network.publish_thresholds.base), callback_to_cancel));
ASSERT_EQ (1, node.distributed_work.items.size ());
ASSERT_EQ (1, node.distributed_work.size ());
// cleanup should not cancel or remove an ongoing work
node.distributed_work.cleanup_finished ();
ASSERT_EQ (1, node.distributed_work.items.size ());
ASSERT_EQ (1, node.distributed_work.size ());
// manually cancel
node.distributed_work.cancel (hash, true); // forces local stop
ASSERT_TIMELY (20s, done && node.distributed_work.items.empty ());
node.distributed_work.cancel (hash);
ASSERT_TIMELY (20s, done && node.distributed_work.size () == 0);
// now using observer
done = false;
ASSERT_FALSE (node.distributed_work.make (nano::work_version::work_1, hash, node.config.work_peers, nano::difficulty::from_multiplier (1e6, node.network_params.network.publish_thresholds.base), callback_to_cancel));
ASSERT_EQ (1, node.distributed_work.items.size ());
ASSERT_EQ (1, node.distributed_work.size ());
node.observers.work_cancel.notify (hash);
ASSERT_TIMELY (20s, done && node.distributed_work.items.empty ());
ASSERT_TIMELY (20s, done && node.distributed_work.size () == 0);
}
TEST (distributed_work, no_peers_multi)
@ -92,16 +92,9 @@ TEST (distributed_work, no_peers_multi)
{
ASSERT_FALSE (node->distributed_work.make (nano::work_version::work_1, hash, node->config.work_peers, nano::difficulty::from_multiplier (10, node->network_params.network.publish_thresholds.base), callback));
}
// 1 root, and _total_ requests for that root are expected, but some may have already finished
ASSERT_EQ (1, node->distributed_work.items.size ());
{
auto requests (node->distributed_work.items.begin ());
ASSERT_EQ (hash, requests->first);
ASSERT_GE (requests->second.size (), total - 4);
}
ASSERT_TIMELY (5s, count == total);
system.deadline_set (5s);
while (!node->distributed_work.items.empty ())
while (node->distributed_work.size () > 0)
{
node->distributed_work.cleanup_finished ();
ASSERT_NO_ERROR (system.poll ());
@ -113,20 +106,13 @@ TEST (distributed_work, no_peers_multi)
nano::block_hash hash_i (i + 1);
ASSERT_FALSE (node->distributed_work.make (nano::work_version::work_1, hash_i, node->config.work_peers, node->network_params.network.publish_thresholds.base, callback));
}
// 10 roots expected with 1 work each, but some may have completed so test for some
ASSERT_GT (node->distributed_work.items.size (), 5);
for (auto & requests : node->distributed_work.items)
{
ASSERT_EQ (1, requests.second.size ());
}
ASSERT_TIMELY (5s, count == total);
system.deadline_set (5s);
while (!node->distributed_work.items.empty ())
while (node->distributed_work.size () > 0)
{
node->distributed_work.cleanup_finished ();
ASSERT_NO_ERROR (system.poll ());
}
count = 0;
}
TEST (distributed_work, peer)

View file

@ -573,7 +573,7 @@ TEST (system, DISABLED_generate_send_existing)
runner.join ();
}
TEST (system, generate_send_new)
TEST (system, DISABLED_generate_send_new)
{
nano::system system (1);
auto & node1 (*system.nodes[0]);

View file

@ -155,7 +155,7 @@ TEST (network, last_contacted)
// Make sure last_contact gets updated on receiving a non-handshake message
auto timestamp_before_keepalive = channel2->get_last_packet_received ();
node1->network.send_keepalive (channel1);
ASSERT_TIMELY (10s, node0->stats.count (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in) == 2);
ASSERT_TIMELY (10s, node0->stats.count (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in) >= 2);
ASSERT_EQ (node0->network.size (), 1);
auto timestamp_after_keepalive = channel2->get_last_packet_received ();
ASSERT_GT (timestamp_after_keepalive, timestamp_before_keepalive);

View file

@ -1168,7 +1168,7 @@ TEST (work_watcher, generation_disabled)
updated_multiplier = existing->multiplier;
}
ASSERT_EQ (updated_multiplier, multiplier);
ASSERT_TRUE (node.distributed_work.items.empty ());
ASSERT_EQ (0, node.distributed_work.size ());
}
TEST (work_watcher, cancel)

View file

@ -1,3 +1,4 @@
#include <nano/node/distributed_work.hpp>
#include <nano/node/distributed_work_factory.hpp>
#include <nano/node/node.hpp>
@ -27,7 +28,7 @@ bool nano::distributed_work_factory::make (std::chrono::seconds const & backoff_
auto distributed (std::make_shared<nano::distributed_work> (node, request_a, backoff_a));
{
nano::lock_guard<std::mutex> guard (mutex);
items[request_a.root].emplace_back (distributed);
items.emplace (request_a.root, distributed);
}
distributed->start ();
error_l = false;
@ -36,43 +37,38 @@ bool nano::distributed_work_factory::make (std::chrono::seconds const & backoff_
return error_l;
}
void nano::distributed_work_factory::cancel (nano::root const & root_a, bool const local_stop)
void nano::distributed_work_factory::cancel (nano::root const & root_a)
{
nano::lock_guard<std::mutex> guard_l (mutex);
auto existing_l (items.find (root_a));
if (existing_l != items.end ())
{
for (auto & distributed_w : existing_l->second)
auto root_items_l = items.equal_range (root_a);
std::for_each (root_items_l.first, root_items_l.second, [](auto item_l) {
if (auto distributed_l = item_l.second.lock ())
{
if (auto distributed_l = distributed_w.lock ())
{
// Send work_cancel to work peers and stop local work generation
distributed_l->cancel ();
}
// Send work_cancel to work peers and stop local work generation
distributed_l->cancel ();
}
items.erase (existing_l);
}
});
items.erase (root_items_l.first, root_items_l.second);
}
void nano::distributed_work_factory::cleanup_finished ()
{
nano::lock_guard<std::mutex> guard (mutex);
for (auto it (items.begin ()), end (items.end ()); it != end;)
{
it->second.erase (std::remove_if (it->second.begin (), it->second.end (), [](auto distributed_a) {
return distributed_a.expired ();
}),
it->second.end ());
if (it->second.empty ())
// std::erase_if in c++20
auto erase_if = [](decltype (items) & container, auto pred) {
for (auto it = container.begin (), end = container.end (); it != end;)
{
it = items.erase (it);
if (pred (*it))
{
it = container.erase (it);
}
else
{
++it;
}
}
else
{
++it;
}
}
};
erase_if (items, [](decltype (items)::value_type item) { return item.second.expired (); });
}
void nano::distributed_work_factory::stop ()
@ -80,31 +76,28 @@ void nano::distributed_work_factory::stop ()
if (!stopped.exchange (true))
{
// Cancel any ongoing work
std::unordered_set<nano::root> roots_l;
nano::unique_lock<std::mutex> lock_l (mutex);
for (auto const & item_l : items)
nano::lock_guard<std::mutex> guard (mutex);
for (auto & item_l : items)
{
roots_l.insert (item_l.first);
if (auto distributed_l = item_l.second.lock ())
{
distributed_l->cancel ();
}
}
lock_l.unlock ();
for (auto const & root_l : roots_l)
{
cancel (root_l, true);
}
lock_l.lock ();
items.clear ();
}
}
size_t nano::distributed_work_factory::size () const
{
nano::lock_guard<std::mutex> guard_l (mutex);
return items.size ();
}
std::unique_ptr<nano::container_info_component> nano::collect_container_info (distributed_work_factory & distributed_work, const std::string & name)
{
size_t item_count;
{
nano::lock_guard<std::mutex> guard (distributed_work.mutex);
item_count = distributed_work.items.size ();
}
auto sizeof_item_element = sizeof (decltype (distributed_work.items)::value_type);
auto item_count = distributed_work.size ();
auto sizeof_item_element = sizeof (decltype (nano::distributed_work_factory::items)::value_type);
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "items", item_count, sizeof_item_element }));
return composite;

View file

@ -1,9 +1,6 @@
#pragma once
#include <nano/lib/numbers.hpp>
#include <nano/node/distributed_work.hpp>
#include <boost/optional/optional.hpp>
#include <atomic>
#include <functional>
@ -13,9 +10,11 @@
namespace nano
{
class node;
class container_info_component;
class distributed_work;
class node;
class root;
struct work_request;
class distributed_work_factory final
{
@ -24,16 +23,20 @@ public:
~distributed_work_factory ();
bool make (nano::work_version const, nano::root const &, std::vector<std::pair<std::string, uint16_t>> const &, uint64_t, std::function<void(boost::optional<uint64_t>)> const &, boost::optional<nano::account> const & = boost::none);
bool make (std::chrono::seconds const &, nano::work_request const &);
void cancel (nano::root const &, bool const local_stop = false);
void cancel (nano::root const &);
void cleanup_finished ();
void stop ();
size_t size () const;
private:
std::unordered_multimap<nano::root, std::weak_ptr<nano::distributed_work>> items;
nano::node & node;
std::unordered_map<nano::root, std::vector<std::weak_ptr<nano::distributed_work>>> items;
std::mutex mutex;
mutable std::mutex mutex;
std::atomic<bool> stopped{ false };
friend std::unique_ptr<container_info_component> collect_container_info (distributed_work_factory &, const std::string &);
};
class container_info_component;
std::unique_ptr<container_info_component> collect_container_info (distributed_work_factory & distributed_work, const std::string & name);
}

View file

@ -5961,7 +5961,7 @@ TEST (rpc, online_reps)
scoped_thread_name_io.renew ();
ASSERT_NE (nullptr, change);
ASSERT_TIMELY (5s, node2->block (change->hash ()));
ASSERT_TIMELY (5s, node2->online_reps.list ().size () != 2);
ASSERT_TIMELY (5s, node2->online_reps.list ().size () == 2);
boost::property_tree::ptree child_rep;
child_rep.put ("", new_rep.to_account ());
boost::property_tree::ptree filtered_accounts;