Move all request aggregator operations out of the mutex hold scope (#2662)

* Move all request aggregator operations out of the mutex hold scope

* Accidentally removed test assert (Wes review)
This commit is contained in:
Guilherme Lawless 2020-03-18 20:35:16 +00:00 committed by GitHub
commit 87a04ad409
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 74 additions and 121 deletions

View file

@ -21,38 +21,25 @@ TEST (request_aggregator, one)
auto channel (node.network.udp_channels.create (node.network.endpoint ()));
node.aggregator.add (channel, request);
ASSERT_EQ (1, node.aggregator.size ());
system.deadline_set (3s);
while (!node.aggregator.empty ())
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TIMELY (3s, node.aggregator.empty ());
// Not yet in the ledger
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code);
node.aggregator.add (channel, request);
ASSERT_EQ (1, node.aggregator.size ());
// In the ledger but no vote generated yet
// Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TIMELY (3s, 0 < node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_TRUE (node.aggregator.empty ());
node.aggregator.add (channel, request);
ASSERT_EQ (1, node.aggregator.size ());
// Already cached
system.deadline_set (3s);
while (!node.aggregator.empty ())
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TIMELY (3s, node.aggregator.empty ());
ASSERT_EQ (3, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
ASSERT_EQ (0, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes));
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 2);
ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes));
ASSERT_TIMELY (3s, 2 == node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
}
TEST (request_aggregator, one_update)
@ -77,21 +64,16 @@ TEST (request_aggregator, one_update)
node.aggregator.add (channel, request);
ASSERT_EQ (1, node.aggregator.size ());
// In the ledger but no vote generated yet
// Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TIMELY (3s, 0 < node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes))
ASSERT_TRUE (node.aggregator.empty ());
ASSERT_EQ (2, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
ASSERT_EQ (0, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes));
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 1);
ASSERT_TIMELY (3s, 0 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_TIMELY (3s, 2 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_TIMELY (3s, 0 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes));
ASSERT_TIMELY (3s, 0 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes));
ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
}
TEST (request_aggregator, two)
@ -114,29 +96,20 @@ TEST (request_aggregator, two)
node.aggregator.add (channel, request);
ASSERT_EQ (1, node.aggregator.size ());
// One vote should be generated for both blocks
// Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TIMELY (3s, 0 < node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_TRUE (node.aggregator.empty ());
// The same request should now send the cached vote
node.aggregator.add (channel, request);
ASSERT_EQ (1, node.aggregator.size ());
system.deadline_set (3s);
while (!node.aggregator.empty ())
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TIMELY (3s, node.aggregator.empty ());
ASSERT_EQ (2, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
ASSERT_EQ (0, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes));
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 2);
ASSERT_TIMELY (3s, 0 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_TIMELY (3s, 2 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_TIMELY (3s, 2 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes));
ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes));
ASSERT_TIMELY (3s, 2 == node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
// Make sure the cached vote is for both hashes
auto vote1 (node.votes_cache.find (send1->hash ()));
auto vote2 (node.votes_cache.find (send2->hash ()));
@ -168,19 +141,15 @@ TEST (request_aggregator, two_endpoints)
node1.aggregator.add (channel1, request);
node1.aggregator.add (channel2, request);
ASSERT_EQ (2, node1.aggregator.size ());
system.deadline_set (3s);
// For the first request it generates the vote, for the second it uses the generated vote
while (!node1.aggregator.empty ())
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TIMELY (3s, node1.aggregator.empty ());
ASSERT_EQ (2, node1.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
ASSERT_EQ (0, node1.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
ASSERT_EQ (0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes));
ASSERT_TIMELY (3s, 0 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes));
ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes));
}
TEST (request_aggregator, split)
@ -217,21 +186,16 @@ TEST (request_aggregator, split)
node.aggregator.add (channel, request);
ASSERT_EQ (1, node.aggregator.size ());
// In the ledger but no vote generated yet
// Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) < 2)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TIMELY (3s, 2 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_TRUE (node.aggregator.empty ());
// Two votes were sent, the first one for 12 hashes and the second one for 1 hash
ASSERT_EQ (1, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
ASSERT_EQ (0, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
ASSERT_EQ (13, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes));
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 2);
ASSERT_TIMELY (3s, 13 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
ASSERT_TIMELY (3s, 2 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_TIMELY (3s, 0 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_TIMELY (3s, 0 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes));
ASSERT_TIMELY (3s, 2 == node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
}
TEST (request_aggregator, channel_lifetime)
@ -252,11 +216,7 @@ TEST (request_aggregator, channel_lifetime)
node.aggregator.add (channel, request);
}
ASSERT_EQ (1, node.aggregator.size ());
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TIMELY (3s, 0 < node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
}
TEST (request_aggregator, channel_update)
@ -284,11 +244,7 @@ TEST (request_aggregator, channel_update)
ASSERT_EQ (1, node.aggregator.size ());
// channel1 is not being held anymore
ASSERT_EQ (nullptr, channel1_w.lock ());
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TIMELY (3s, 0 < node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 0);
}
TEST (request_aggregator, channel_max_queue)
@ -307,11 +263,7 @@ TEST (request_aggregator, channel_max_queue)
auto channel (node.network.udp_channels.create (node.network.endpoint ()));
node.aggregator.add (channel, request);
node.aggregator.add (channel, request);
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped) < 1)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
}
TEST (request_aggregator, unique)
@ -331,10 +283,6 @@ TEST (request_aggregator, unique)
node.aggregator.add (channel, request);
node.aggregator.add (channel, request);
node.aggregator.add (channel, request);
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) < 1)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
}

View file

@ -75,25 +75,22 @@ void nano::request_aggregator::run ()
auto front (requests_by_deadline.begin ());
if (front->deadline < std::chrono::steady_clock::now ())
{
auto pool (*front);
auto transaction (store.tx_begin_read ());
// Aggregate the current pool of requests, sending cached votes
// Store a local copy of the remaining hashes and the channel
auto remaining = aggregate (transaction, pool);
auto channel = pool.channel;
// Safely erase this pool
// Store the channel and requests for processing after erasing this pool
decltype (front->channel) channel{};
decltype (front->hashes_roots) hashes_roots{};
requests_by_deadline.modify (front, [&channel, &hashes_roots](channel_pool & pool) {
channel.swap (pool.channel);
hashes_roots.swap (pool.hashes_roots);
});
requests_by_deadline.erase (front);
lock.unlock ();
// Send cached votes
for (auto const & vote : remaining.first)
{
nano::confirm_ack confirm (vote);
channel->send (confirm);
}
if (!remaining.second.empty ())
erase_duplicates (hashes_roots);
auto transaction (store.tx_begin_read ());
auto remaining = aggregate (transaction, hashes_roots, channel);
if (!remaining.empty ())
{
// Generate votes for the remaining hashes
generate (transaction, std::move (remaining.second), channel);
generate (transaction, remaining, channel);
}
lock.lock ();
}
@ -134,22 +131,23 @@ bool nano::request_aggregator::empty ()
return size () == 0;
}
std::pair<std::vector<std::shared_ptr<nano::vote>>, std::vector<nano::block_hash>> nano::request_aggregator::aggregate (nano::transaction const & transaction_a, channel_pool & pool_a) const
void nano::request_aggregator::erase_duplicates (std::vector<std::pair<nano::block_hash, nano::root>> & requests_a) const
{
// Unique hashes
using pair = decltype (pool_a.hashes_roots)::value_type;
std::sort (pool_a.hashes_roots.begin (), pool_a.hashes_roots.end (), [](pair const & pair1, pair const & pair2) {
std::sort (requests_a.begin (), requests_a.end (), [](auto const & pair1, auto const & pair2) {
return pair1.first < pair2.first;
});
pool_a.hashes_roots.erase (std::unique (pool_a.hashes_roots.begin (), pool_a.hashes_roots.end (), [](pair const & pair1, pair const & pair2) {
requests_a.erase (std::unique (requests_a.begin (), requests_a.end (), [](auto const & pair1, auto const & pair2) {
return pair1.first == pair2.first;
}),
pool_a.hashes_roots.end ());
requests_a.end ());
}
std::vector<nano::block_hash> nano::request_aggregator::aggregate (nano::transaction const & transaction_a, std::vector<std::pair<nano::block_hash, nano::root>> const & requests_a, std::shared_ptr<nano::transport::channel> & channel_a) const
{
size_t cached_hashes = 0;
std::vector<nano::block_hash> to_generate;
std::vector<std::shared_ptr<nano::vote>> cached_votes;
for (auto const & hash_root : pool_a.hashes_roots)
for (auto const & hash_root : requests_a)
{
auto find_votes (votes_cache.find (hash_root.first));
if (!find_votes.empty ())
@ -189,7 +187,7 @@ std::pair<std::vector<std::shared_ptr<nano::vote>>, std::vector<nano::block_hash
auto successor_block (store.block_get (transaction_a, successor));
debug_assert (successor_block != nullptr);
nano::publish publish (successor_block);
pool_a.channel->send (publish);
channel_a->send (publish);
}
else
{
@ -200,12 +198,17 @@ std::pair<std::vector<std::shared_ptr<nano::vote>>, std::vector<nano::block_hash
// Unique votes
std::sort (cached_votes.begin (), cached_votes.end ());
cached_votes.erase (std::unique (cached_votes.begin (), cached_votes.end ()), cached_votes.end ());
for (auto const & vote : cached_votes)
{
nano::confirm_ack confirm (vote);
channel_a->send (confirm);
}
stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes, stat::dir::in, cached_hashes);
stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached_votes, stat::dir::in, cached_votes.size ());
return { cached_votes, to_generate };
return to_generate;
}
void nano::request_aggregator::generate (nano::transaction const & transaction_a, std::vector<nano::block_hash> hashes_a, std::shared_ptr<nano::transport::channel> & channel_a) const
void nano::request_aggregator::generate (nano::transaction const & transaction_a, std::vector<nano::block_hash> const & hashes_a, std::shared_ptr<nano::transport::channel> & channel_a) const
{
size_t generated_l = 0;
auto i (hashes_a.begin ());

View file

@ -73,10 +73,12 @@ public:
private:
void run ();
/** Aggregate and send cached votes for \p pool_a, returning the leftovers that were not found in cached votes **/
std::pair<std::vector<std::shared_ptr<nano::vote>>, std::vector<nano::block_hash>> aggregate (nano::transaction const &, channel_pool & pool_a) const;
/** Generate and send votes from \p hashes_a to \p channel_a, does not need a lock on the mutex **/
void generate (nano::transaction const &, std::vector<nano::block_hash> hashes_a, std::shared_ptr<nano::transport::channel> & channel_a) const;
/** Remove duplicate requests **/
void erase_duplicates (std::vector<std::pair<nano::block_hash, nano::root>> &) const;
/** Aggregate \p requests_a and send cached votes to \p channel_a . Return the remaining hashes that need vote generation **/
std::vector<nano::block_hash> aggregate (nano::transaction const &, std::vector<std::pair<nano::block_hash, nano::root>> const & requests_a, std::shared_ptr<nano::transport::channel> & channel_a) const;
/** Generate votes from \p hashes_a and send to \p channel_a **/
void generate (nano::transaction const &, std::vector<nano::block_hash> const & hashes_a, std::shared_ptr<nano::transport::channel> & channel_a) const;
nano::stat & stats;
nano::votes_cache & votes_cache;