From 87a04ad409f7752559cfea72640eabd53746e0c1 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Wed, 18 Mar 2020 20:35:16 +0000 Subject: [PATCH] Move all request aggregator operations out of the mutex hold scope (#2662) * Move all request aggregator operations out of the mutex hold scope * Accidentally removed test assert (Wes review) --- nano/core_test/request_aggregator.cpp | 132 ++++++++------------------ nano/node/request_aggregator.cpp | 53 ++++++----- nano/node/request_aggregator.hpp | 10 +- 3 files changed, 74 insertions(+), 121 deletions(-) diff --git a/nano/core_test/request_aggregator.cpp b/nano/core_test/request_aggregator.cpp index 8043c6d2..29a47340 100644 --- a/nano/core_test/request_aggregator.cpp +++ b/nano/core_test/request_aggregator.cpp @@ -21,38 +21,25 @@ TEST (request_aggregator, one) auto channel (node.network.udp_channels.create (node.network.endpoint ())); node.aggregator.add (channel, request); ASSERT_EQ (1, node.aggregator.size ()); - system.deadline_set (3s); - while (!node.aggregator.empty ()) - { - ASSERT_NO_ERROR (system.poll ()); - } + ASSERT_TIMELY (3s, node.aggregator.empty ()); // Not yet in the ledger - ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown)); + ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown)); ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code); node.aggregator.add (channel, request); ASSERT_EQ (1, node.aggregator.size ()); // In the ledger but no vote generated yet - // Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough - system.deadline_set (3s); - while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 0) - { - ASSERT_NO_ERROR (system.poll ()); - } + ASSERT_TIMELY (3s, 0 < node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)); ASSERT_TRUE (node.aggregator.empty ()); node.aggregator.add (channel, request); ASSERT_EQ (1, node.aggregator.size ()); // Already cached - system.deadline_set (3s); - while (!node.aggregator.empty ()) - { - ASSERT_NO_ERROR (system.poll ()); - } + ASSERT_TIMELY (3s, node.aggregator.empty ()); ASSERT_EQ (3, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted)); ASSERT_EQ (0, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped)); - ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown)); - ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)); - ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes)); - ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 2); + ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown)); + ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)); + ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes)); + ASSERT_TIMELY (3s, 2 == node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); } TEST (request_aggregator, one_update) @@ -77,21 +64,16 @@ TEST (request_aggregator, one_update) node.aggregator.add (channel, request); ASSERT_EQ (1, node.aggregator.size ()); // In the ledger but no vote generated yet - // Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough - system.deadline_set (3s); - while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 0) - { - ASSERT_NO_ERROR (system.poll ()); - } + ASSERT_TIMELY (3s, 0 < node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)) ASSERT_TRUE (node.aggregator.empty ()); ASSERT_EQ (2, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted)); ASSERT_EQ (0, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped)); - ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown)); - ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes)); - ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)); - ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes)); - ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes)); - ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 1); + ASSERT_TIMELY (3s, 0 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown)); + ASSERT_TIMELY (3s, 2 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes)); + ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)); + ASSERT_TIMELY (3s, 0 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes)); + ASSERT_TIMELY (3s, 0 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes)); + ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); } TEST (request_aggregator, two) @@ -114,29 +96,20 @@ TEST (request_aggregator, two) node.aggregator.add (channel, request); ASSERT_EQ (1, node.aggregator.size ()); // One vote should be generated for both blocks - // Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough - system.deadline_set (3s); - while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 0) - { - ASSERT_NO_ERROR (system.poll ()); - } + ASSERT_TIMELY (3s, 0 < node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)); ASSERT_TRUE (node.aggregator.empty ()); // The same request should now send the cached vote node.aggregator.add (channel, request); ASSERT_EQ (1, node.aggregator.size ()); - system.deadline_set (3s); - while (!node.aggregator.empty ()) - { - ASSERT_NO_ERROR (system.poll ()); - } + ASSERT_TIMELY (3s, node.aggregator.empty ()); ASSERT_EQ (2, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted)); ASSERT_EQ (0, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped)); - ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown)); - ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes)); - ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)); - ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes)); - ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes)); - ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 2); + ASSERT_TIMELY (3s, 0 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown)); + ASSERT_TIMELY (3s, 2 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes)); + ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)); + ASSERT_TIMELY (3s, 2 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes)); + ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes)); + ASSERT_TIMELY (3s, 2 == node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); // Make sure the cached vote is for both hashes auto vote1 (node.votes_cache.find (send1->hash ())); auto vote2 (node.votes_cache.find (send2->hash ())); @@ -168,19 +141,15 @@ TEST (request_aggregator, two_endpoints) node1.aggregator.add (channel1, request); node1.aggregator.add (channel2, request); ASSERT_EQ (2, node1.aggregator.size ()); - system.deadline_set (3s); // For the first request it generates the vote, for the second it uses the generated vote - while (!node1.aggregator.empty ()) - { - ASSERT_NO_ERROR (system.poll ()); - } + ASSERT_TIMELY (3s, node1.aggregator.empty ()); ASSERT_EQ (2, node1.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted)); ASSERT_EQ (0, node1.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped)); - ASSERT_EQ (0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown)); - ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes)); - ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)); - ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes)); - ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes)); + ASSERT_TIMELY (3s, 0 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown)); + ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes)); + ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)); + ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes)); + ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes)); } TEST (request_aggregator, split) @@ -217,21 +186,16 @@ TEST (request_aggregator, split) node.aggregator.add (channel, request); ASSERT_EQ (1, node.aggregator.size ()); // In the ledger but no vote generated yet - // Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough - system.deadline_set (3s); - while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) < 2) - { - ASSERT_NO_ERROR (system.poll ()); - } + ASSERT_TIMELY (3s, 2 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)); ASSERT_TRUE (node.aggregator.empty ()); // Two votes were sent, the first one for 12 hashes and the second one for 1 hash ASSERT_EQ (1, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted)); ASSERT_EQ (0, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped)); - ASSERT_EQ (13, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes)); - ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)); - ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown)); - ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes)); - ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 2); + ASSERT_TIMELY (3s, 13 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes)); + ASSERT_TIMELY (3s, 2 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)); + ASSERT_TIMELY (3s, 0 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown)); + ASSERT_TIMELY (3s, 0 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes)); + ASSERT_TIMELY (3s, 2 == node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); } TEST (request_aggregator, channel_lifetime) @@ -252,11 +216,7 @@ TEST (request_aggregator, channel_lifetime) node.aggregator.add (channel, request); } ASSERT_EQ (1, node.aggregator.size ()); - system.deadline_set (3s); - while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 0) - { - ASSERT_NO_ERROR (system.poll ()); - } + ASSERT_TIMELY (3s, 0 < node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)); } TEST (request_aggregator, channel_update) @@ -284,11 +244,7 @@ TEST (request_aggregator, channel_update) ASSERT_EQ (1, node.aggregator.size ()); // channel1 is not being held anymore ASSERT_EQ (nullptr, channel1_w.lock ()); - system.deadline_set (3s); - while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 0) - { - ASSERT_NO_ERROR (system.poll ()); - } + ASSERT_TIMELY (3s, 0 < node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 0); } TEST (request_aggregator, channel_max_queue) @@ -307,11 +263,7 @@ TEST (request_aggregator, channel_max_queue) auto channel (node.network.udp_channels.create (node.network.endpoint ())); node.aggregator.add (channel, request); node.aggregator.add (channel, request); - system.deadline_set (3s); - while (node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped) < 1) - { - ASSERT_NO_ERROR (system.poll ()); - } + ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped)); } TEST (request_aggregator, unique) @@ -331,10 +283,6 @@ TEST (request_aggregator, unique) node.aggregator.add (channel, request); node.aggregator.add (channel, request); node.aggregator.add (channel, request); - system.deadline_set (3s); - while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) < 1) - { - ASSERT_NO_ERROR (system.poll ()); - } - ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes)); + ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes)); + ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)); } diff --git a/nano/node/request_aggregator.cpp b/nano/node/request_aggregator.cpp index 9bbd3cdb..17daeff3 100644 --- a/nano/node/request_aggregator.cpp +++ b/nano/node/request_aggregator.cpp @@ -75,25 +75,22 @@ void nano::request_aggregator::run () auto front (requests_by_deadline.begin ()); if (front->deadline < std::chrono::steady_clock::now ()) { - auto pool (*front); - auto transaction (store.tx_begin_read ()); - // Aggregate the current pool of requests, sending cached votes - // Store a local copy of the remaining hashes and the channel - auto remaining = aggregate (transaction, pool); - auto channel = pool.channel; - // Safely erase this pool + // Store the channel and requests for processing after erasing this pool + decltype (front->channel) channel{}; + decltype (front->hashes_roots) hashes_roots{}; + requests_by_deadline.modify (front, [&channel, &hashes_roots](channel_pool & pool) { + channel.swap (pool.channel); + hashes_roots.swap (pool.hashes_roots); + }); requests_by_deadline.erase (front); lock.unlock (); - // Send cached votes - for (auto const & vote : remaining.first) - { - nano::confirm_ack confirm (vote); - channel->send (confirm); - } - if (!remaining.second.empty ()) + erase_duplicates (hashes_roots); + auto transaction (store.tx_begin_read ()); + auto remaining = aggregate (transaction, hashes_roots, channel); + if (!remaining.empty ()) { // Generate votes for the remaining hashes - generate (transaction, std::move (remaining.second), channel); + generate (transaction, remaining, channel); } lock.lock (); } @@ -134,22 +131,23 @@ bool nano::request_aggregator::empty () return size () == 0; } -std::pair>, std::vector> nano::request_aggregator::aggregate (nano::transaction const & transaction_a, channel_pool & pool_a) const +void nano::request_aggregator::erase_duplicates (std::vector> & requests_a) const { - // Unique hashes - using pair = decltype (pool_a.hashes_roots)::value_type; - std::sort (pool_a.hashes_roots.begin (), pool_a.hashes_roots.end (), [](pair const & pair1, pair const & pair2) { + std::sort (requests_a.begin (), requests_a.end (), [](auto const & pair1, auto const & pair2) { return pair1.first < pair2.first; }); - pool_a.hashes_roots.erase (std::unique (pool_a.hashes_roots.begin (), pool_a.hashes_roots.end (), [](pair const & pair1, pair const & pair2) { + requests_a.erase (std::unique (requests_a.begin (), requests_a.end (), [](auto const & pair1, auto const & pair2) { return pair1.first == pair2.first; }), - pool_a.hashes_roots.end ()); + requests_a.end ()); +} +std::vector nano::request_aggregator::aggregate (nano::transaction const & transaction_a, std::vector> const & requests_a, std::shared_ptr & channel_a) const +{ size_t cached_hashes = 0; std::vector to_generate; std::vector> cached_votes; - for (auto const & hash_root : pool_a.hashes_roots) + for (auto const & hash_root : requests_a) { auto find_votes (votes_cache.find (hash_root.first)); if (!find_votes.empty ()) @@ -189,7 +187,7 @@ std::pair>, std::vectorsend (publish); + channel_a->send (publish); } else { @@ -200,12 +198,17 @@ std::pair>, std::vectorsend (confirm); + } stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes, stat::dir::in, cached_hashes); stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached_votes, stat::dir::in, cached_votes.size ()); - return { cached_votes, to_generate }; + return to_generate; } -void nano::request_aggregator::generate (nano::transaction const & transaction_a, std::vector hashes_a, std::shared_ptr & channel_a) const +void nano::request_aggregator::generate (nano::transaction const & transaction_a, std::vector const & hashes_a, std::shared_ptr & channel_a) const { size_t generated_l = 0; auto i (hashes_a.begin ()); diff --git a/nano/node/request_aggregator.hpp b/nano/node/request_aggregator.hpp index a94cd875..b0988aa9 100644 --- a/nano/node/request_aggregator.hpp +++ b/nano/node/request_aggregator.hpp @@ -73,10 +73,12 @@ public: private: void run (); - /** Aggregate and send cached votes for \p pool_a, returning the leftovers that were not found in cached votes **/ - std::pair>, std::vector> aggregate (nano::transaction const &, channel_pool & pool_a) const; - /** Generate and send votes from \p hashes_a to \p channel_a, does not need a lock on the mutex **/ - void generate (nano::transaction const &, std::vector hashes_a, std::shared_ptr & channel_a) const; + /** Remove duplicate requests **/ + void erase_duplicates (std::vector> &) const; + /** Aggregate \p requests_a and send cached votes to \p channel_a . Return the remaining hashes that need vote generation **/ + std::vector aggregate (nano::transaction const &, std::vector> const & requests_a, std::shared_ptr & channel_a) const; + /** Generate votes from \p hashes_a and send to \p channel_a **/ + void generate (nano::transaction const &, std::vector const & hashes_a, std::shared_ptr & channel_a) const; nano::stat & stats; nano::votes_cache & votes_cache;