diff --git a/nano/core_test/request_aggregator.cpp b/nano/core_test/request_aggregator.cpp index a0e7125c..1ca73fa5 100644 --- a/nano/core_test/request_aggregator.cpp +++ b/nano/core_test/request_aggregator.cpp @@ -222,20 +222,34 @@ TEST (request_aggregator, two_endpoints) auto dummy_channel1 = std::make_shared (node1, node1); auto dummy_channel2 = std::make_shared (node2, node2); ASSERT_NE (nano::transport::map_endpoint_to_v6 (dummy_channel1->get_endpoint ()), nano::transport::map_endpoint_to_v6 (dummy_channel2->get_endpoint ())); - // Use the aggregator from node1 only, making requests from both nodes + + // For the first request, aggregator should generate a new vote node1.aggregator.add (dummy_channel1, request); - node1.aggregator.add (dummy_channel2, request); - ASSERT_EQ (2, node1.aggregator.size ()); - // For the first request it generates the vote, for the second it uses the generated vote - ASSERT_TIMELY (3s, node1.aggregator.empty ()); - ASSERT_EQ (2, node1.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted)); + ASSERT_TIMELY (5s, node1.aggregator.empty ()); + + ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted)); ASSERT_EQ (0, node1.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped)); - ASSERT_TIMELY (3s, 0 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown)); - ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes)); - ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)); - ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes) + node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_late_hashes)); - ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes) + node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_late_votes)); - ASSERT_TIMELY (3s, 0 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cannot_vote)); + + ASSERT_TIMELY_EQ (5s, 0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown)); + ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes)); + ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)); + ASSERT_TIMELY_EQ (3s, 0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes)); + ASSERT_TIMELY_EQ (3s, 0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes)); + ASSERT_TIMELY_EQ (3s, 0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cannot_vote)); + + // For the second request, aggregator should use the cache + node1.aggregator.add (dummy_channel1, request); + ASSERT_TIMELY (5s, node1.aggregator.empty ()); + + ASSERT_TIMELY_EQ (5s, 2, node1.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted)); + ASSERT_EQ (0, node1.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped)); + + ASSERT_TIMELY_EQ (5s, 0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown)); + ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes)); + ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes)); + ASSERT_TIMELY_EQ (3s, 1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes)); + ASSERT_TIMELY_EQ (3s, 1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes)); + ASSERT_TIMELY_EQ (3s, 0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cannot_vote)); } TEST (request_aggregator, split) diff --git a/nano/node/request_aggregator.cpp b/nano/node/request_aggregator.cpp index 10c82506..8c9eb38a 100644 --- a/nano/node/request_aggregator.cpp +++ b/nano/node/request_aggregator.cpp @@ -169,18 +169,30 @@ void nano::request_aggregator::erase_duplicates (std::vector>, std::vector>> nano::request_aggregator::aggregate (std::vector> const & requests_a, std::shared_ptr & channel_a) const { auto transaction (ledger.store.tx_begin_read ()); - std::size_t cached_hashes = 0; std::vector> to_generate; std::vector> to_generate_final; std::vector> cached_votes; + std::unordered_set cached_hashes; for (auto const & [hash, root] : requests_a) { + // 0. Hashes already sent + if (cached_hashes.count (hash) > 0) + { + continue; + } + // 1. Votes in cache auto find_votes (local_votes.votes (root, hash)); if (!find_votes.empty ()) { - ++cached_hashes; - cached_votes.insert (cached_votes.end (), find_votes.begin (), find_votes.end ()); + for (auto & found_vote : find_votes) + { + cached_votes.push_back (found_vote); + for (auto & found_hash : found_vote->hashes) + { + cached_hashes.insert (found_hash); + } + } } else { @@ -188,7 +200,7 @@ std::pair>, std::vector block; - //2. Final votes + // 2. Final votes auto final_vote_hashes (ledger.store.final_vote.get (transaction, root)); if (!final_vote_hashes.empty ()) { @@ -295,7 +307,7 @@ std::pair>, std::vectorqualified_root (), hash_a); + debug_assert (block == nullptr || root_a == block->root ()); } else { - auto should_vote (false); - if (is_final) + auto block (ledger.store.block.get (transaction, hash_a)); + should_vote = block != nullptr && ledger.dependents_confirmed (transaction, *block); + } + if (should_vote) + { + nano::unique_lock lock (mutex); + candidates.emplace_back (root_a, hash_a); + if (candidates.size () >= nano::network::confirm_ack_hashes_max) { - auto block (ledger.store.block.get (transaction, hash_a)); - should_vote = block != nullptr && ledger.dependents_confirmed (transaction, *block) && ledger.store.final_vote.put (transaction, block->qualified_root (), hash_a); - debug_assert (block == nullptr || root_a == block->root ()); - } - else - { - auto block (ledger.store.block.get (transaction, hash_a)); - should_vote = block != nullptr && ledger.dependents_confirmed (transaction, *block); - } - if (should_vote) - { - nano::unique_lock lock (mutex); - candidates.emplace_back (root_a, hash_a); - if (candidates.size () >= nano::network::confirm_ack_hashes_max) - { - lock.unlock (); - condition.notify_all (); - } + lock.unlock (); + condition.notify_all (); } } } @@ -294,7 +283,7 @@ void nano::vote_generator::set_reply_action (std::function & lock_a) { debug_assert (lock_a.owns_lock ()); - std::unordered_set> cached_sent; + std::vector hashes; std::vector roots; hashes.reserve (nano::network::confirm_ack_hashes_max); @@ -302,15 +291,7 @@ void nano::vote_generator::broadcast (nano::unique_lock & lock_a) while (!candidates.empty () && hashes.size () < nano::network::confirm_ack_hashes_max) { auto const & [root, hash] = candidates.front (); - auto cached_votes = history.votes (root, hash, is_final); - for (auto const & cached_vote : cached_votes) - { - if (cached_sent.insert (cached_vote).second) - { - broadcast_action (cached_vote); - } - } - if (cached_votes.empty () && std::find (roots.begin (), roots.end (), root) == roots.end ()) + if (std::find (roots.begin (), roots.end (), root) == roots.end ()) { if (spacing.votable (root, hash)) { @@ -338,7 +319,6 @@ void nano::vote_generator::broadcast (nano::unique_lock & lock_a) void nano::vote_generator::reply (nano::unique_lock & lock_a, request_t && request_a) { lock_a.unlock (); - std::unordered_set> cached_sent; auto i (request_a.first.cbegin ()); auto n (request_a.first.cend ()); while (i != n && !stopped) @@ -350,17 +330,7 @@ void nano::vote_generator::reply (nano::unique_lock & lock_a, reque for (; i != n && hashes.size () < nano::network::confirm_ack_hashes_max; ++i) { auto const & [root, hash] = *i; - auto cached_votes = history.votes (root, hash, is_final); - for (auto const & cached_vote : cached_votes) - { - if (cached_sent.insert (cached_vote).second) - { - stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached_late_hashes, stat::dir::in, cached_vote->hashes.size ()); - stats.inc (nano::stat::type::requests, nano::stat::detail::requests_cached_late_votes, stat::dir::in); - reply_action (cached_vote, request_a.second); - } - } - if (cached_votes.empty () && std::find (roots.begin (), roots.end (), root) == roots.end ()) + if (std::find (roots.begin (), roots.end (), root) == roots.end ()) { if (spacing.votable (root, hash)) {