Adjust request aggregator stats (#2507)

* Adjust request aggregator stats

And ensure unique hashes before generating votes

* Move hash duplicate removal to before aggregation, and add a test

* Change stats to be more obvious without reading documentation

* Fix a rare failure in a test due to rep crawler sending an extra request
This commit is contained in:
Guilherme Lawless 2020-01-25 13:24:25 +00:00 committed by GitHub
commit d1f04a3df7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 121 additions and 67 deletions

View file

@ -26,15 +26,15 @@ TEST (request_aggregator, one)
{
ASSERT_NO_ERROR (system.poll ());
}
// Not yet in the ledger, should be ignored
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
// Not yet in the ledger
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code);
node.aggregator.add (channel, request);
ASSERT_EQ (1, node.aggregator.size ());
// In the ledger but no vote generated yet
// Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) == 0)
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
@ -47,11 +47,11 @@ TEST (request_aggregator, one)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (3, node.stats.count (nano::stat::type::requests, nano::stat::detail::all));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped));
ASSERT_EQ (3, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
ASSERT_EQ (0, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes));
ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
}
@ -79,16 +79,18 @@ TEST (request_aggregator, one_update)
// In the ledger but no vote generated yet
// Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) == 0)
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TRUE (node.aggregator.empty ());
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::all));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped));
ASSERT_EQ (2, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
ASSERT_EQ (0, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes));
ASSERT_EQ (1, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
}
@ -114,12 +116,11 @@ TEST (request_aggregator, two)
// One vote should be generated for both blocks
// Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) == 0)
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TRUE (node.aggregator.empty ());
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
// The same request should now send the cached vote
node.aggregator.add (channel, request);
ASSERT_EQ (1, node.aggregator.size ());
@ -128,11 +129,13 @@ TEST (request_aggregator, two)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::all));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped));
ASSERT_EQ (2, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
ASSERT_EQ (0, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes));
ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
// Make sure the cached vote is for both hashes
auto vote1 (node.votes_cache.find (send1->hash ()));
@ -147,9 +150,11 @@ TEST (request_aggregator, two_endpoints)
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto & node1 (*system.add_node (node_config));
nano::node_flags node_flags;
node_flags.disable_rep_crawler = true;
auto & node1 (*system.add_node (node_config, node_flags));
node_config.peering_port = nano::get_available_port ();
auto & node2 (*system.add_node (node_config));
auto & node2 (*system.add_node (node_config, node_flags));
nano::genesis genesis;
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node1.work_generate_blocking (genesis.hash ())));
@ -169,11 +174,13 @@ TEST (request_aggregator, two_endpoints)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (2, node1.stats.count (nano::stat::type::requests, nano::stat::detail::all));
ASSERT_EQ (0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached));
ASSERT_EQ (0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped));
ASSERT_EQ (2, node1.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
ASSERT_EQ (0, node1.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
ASSERT_EQ (0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes));
}
TEST (request_aggregator, split)
@ -212,26 +219,19 @@ TEST (request_aggregator, split)
// In the ledger but no vote generated yet
// Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) < 2)
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) < 2)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TRUE (node.aggregator.empty ());
// Two votes were sent, the first one for 12 hashes and the second one for 1 hash
ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::all));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped));
ASSERT_EQ (1, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
ASSERT_EQ (0, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
ASSERT_EQ (13, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes));
ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
auto transaction (node.store.tx_begin_read ());
auto pre_last_hash (node.store.block_get (transaction, previous)->previous ());
auto vote1 (node.votes_cache.find (pre_last_hash));
ASSERT_EQ (1, vote1.size ());
ASSERT_EQ (max_vbh, vote1[0]->blocks.size ());
auto vote2 (node.votes_cache.find (previous));
ASSERT_EQ (1, vote2.size ());
ASSERT_EQ (1, vote2[0]->blocks.size ());
}
TEST (request_aggregator, channel_lifetime)
@ -253,7 +253,7 @@ TEST (request_aggregator, channel_lifetime)
}
ASSERT_EQ (1, node.aggregator.size ());
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) == 0)
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
@ -285,7 +285,7 @@ TEST (request_aggregator, channel_update)
// channel1 is not being held anymore
ASSERT_EQ (nullptr, channel1_w.lock ());
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) == 0)
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
@ -308,8 +308,33 @@ TEST (request_aggregator, channel_max_queue)
node.aggregator.add (channel, request);
node.aggregator.add (channel, request);
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped) < 1)
while (node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped) < 1)
{
ASSERT_NO_ERROR (system.poll ());
}
}
}
TEST (request_aggregator, unique)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto & node (*system.add_node (node_config));
nano::genesis genesis;
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node.work_generate_blocking (genesis.hash ())));
ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code);
std::vector<std::pair<nano::block_hash, nano::root>> request;
request.emplace_back (send1->hash (), send1->root ());
auto channel (node.network.udp_channels.create (node.network.endpoint ()));
node.aggregator.add (channel, request);
node.aggregator.add (channel, request);
node.aggregator.add (channel, request);
node.aggregator.add (channel, request);
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) < 1)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
}

View file

@ -438,6 +438,9 @@ std::string nano::stat::type_to_string (uint32_t key)
case nano::stat::type::drop:
res = "drop";
break;
case nano::stat::type::aggregator:
res = "aggregator";
break;
case nano::stat::type::requests:
res = "requests";
break;
@ -655,17 +658,26 @@ std::string nano::stat::detail_to_string (uint32_t key)
case nano::stat::detail::blocks_confirmed:
res = "blocks_confirmed";
break;
case nano::stat::detail::requests_cached:
res = "requests_votes_cached";
case nano::stat::detail::aggregator_accepted:
res = "aggregator_accepted";
break;
case nano::stat::detail::requests_generated:
res = "requests_votes_generated";
case nano::stat::detail::aggregator_dropped:
res = "aggregator_dropped";
break;
case nano::stat::detail::requests_ignored:
res = "requests_votes_ignored";
case nano::stat::detail::requests_cached_hashes:
res = "requests_cached_hashes";
break;
case nano::stat::detail::requests_dropped:
res = "requests_dropped";
case nano::stat::detail::requests_generated_hashes:
res = "requests_generated_hashes";
break;
case nano::stat::detail::requests_cached_votes:
res = "requests_cached_votes";
break;
case nano::stat::detail::requests_generated_votes:
res = "requests_generated_votes";
break;
case nano::stat::detail::requests_unknown:
res = "requests_unknown";
break;
}
return res;

View file

@ -198,6 +198,7 @@ public:
observer,
confirmation_height,
drop,
aggregator,
requests
};
@ -299,11 +300,16 @@ public:
blocks_confirmed,
invalid_block,
// [request] aggregator
aggregator_accepted,
aggregator_dropped,
// requests
requests_cached,
requests_generated,
requests_ignored,
requests_dropped
requests_cached_hashes,
requests_generated_hashes,
requests_cached_votes,
requests_generated_votes,
requests_unknown
};
/** Direction of the stat. If the direction is irrelevant, use in */

View file

@ -57,10 +57,7 @@ void nano::request_aggregator::add (std::shared_ptr<nano::transport::channel> &
condition.notify_all ();
}
}
if (error)
{
stats.inc (nano::stat::type::requests, nano::stat::detail::requests_dropped);
}
stats.inc (nano::stat::type::aggregator, !error ? nano::stat::detail::aggregator_accepted : nano::stat::detail::aggregator_dropped);
}
void nano::request_aggregator::run ()
@ -144,6 +141,17 @@ bool nano::request_aggregator::empty ()
std::vector<nano::block_hash> nano::request_aggregator::aggregate (nano::transaction const & transaction_a, channel_pool & pool_a) const
{
// Unique hashes
using pair = decltype (pool_a.hashes_roots)::value_type;
std::sort (pool_a.hashes_roots.begin (), pool_a.hashes_roots.end (), [](pair const & pair1, pair const & pair2) {
return pair1.first < pair2.first;
});
pool_a.hashes_roots.erase (std::unique (pool_a.hashes_roots.begin (), pool_a.hashes_roots.end (), [](pair const & pair1, pair const & pair2) {
return pair1.first == pair2.first;
}),
pool_a.hashes_roots.end ());
size_t cached_hashes = 0;
std::vector<nano::block_hash> to_generate;
std::vector<std::shared_ptr<nano::vote>> cached_votes;
for (auto const & hash_root : pool_a.hashes_roots)
@ -151,6 +159,7 @@ std::vector<nano::block_hash> nano::request_aggregator::aggregate (nano::transac
auto find_votes (votes_cache.find (hash_root.first));
if (!find_votes.empty ())
{
++cached_hashes;
cached_votes.insert (cached_votes.end (), find_votes.begin (), find_votes.end ());
}
else if (!hash_root.first.is_zero () && store.block_exists (transaction_a, hash_root.first))
@ -189,7 +198,7 @@ std::vector<nano::block_hash> nano::request_aggregator::aggregate (nano::transac
}
else
{
stats.inc (nano::stat::type::requests, nano::stat::detail::requests_ignored);
stats.inc (nano::stat::type::requests, nano::stat::detail::requests_unknown, stat::dir::in);
}
}
}
@ -201,11 +210,12 @@ std::vector<nano::block_hash> nano::request_aggregator::aggregate (nano::transac
nano::confirm_ack confirm (vote);
pool_a.channel->send (confirm);
}
stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached, stat::dir::in, cached_votes.size ());
stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes, stat::dir::in, cached_hashes);
stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached_votes, stat::dir::in, cached_votes.size ());
return to_generate;
}
void nano::request_aggregator::generate (nano::transaction const & transaction_a, std::vector<nano::block_hash> const hashes_a, std::shared_ptr<nano::transport::channel> & channel_a) const
void nano::request_aggregator::generate (nano::transaction const & transaction_a, std::vector<nano::block_hash> hashes_a, std::shared_ptr<nano::transport::channel> & channel_a) const
{
size_t generated_l = 0;
auto i (hashes_a.begin ());
@ -225,7 +235,8 @@ void nano::request_aggregator::generate (nano::transaction const & transaction_a
this->votes_cache.add (vote);
});
}
stats.add (nano::stat::type::requests, nano::stat::detail::requests_generated, stat::dir::in, generated_l);
stats.add (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes, stat::dir::in, hashes_a.size ());
stats.add (nano::stat::type::requests, nano::stat::detail::requests_generated_votes, stat::dir::in, generated_l);
}
std::unique_ptr<nano::container_info_component> nano::collect_container_info (nano::request_aggregator & aggregator, const std::string & name)

View file

@ -76,7 +76,7 @@ private:
/** Aggregate and send cached votes for \p pool_a, returning the leftovers that were not found in cached votes **/
std::vector<nano::block_hash> aggregate (nano::transaction const &, channel_pool & pool_a) const;
/** Generate and send votes from \p hashes_a to \p channel_a, does not need a lock on the mutex **/
void generate (nano::transaction const &, std::vector<nano::block_hash> const hashes_a, std::shared_ptr<nano::transport::channel> & channel_a) const;
void generate (nano::transaction const &, std::vector<nano::block_hash> hashes_a, std::shared_ptr<nano::transport::channel> & channel_a) const;
unsigned const max_consecutive_requests;