Request aggregator (#2485)

* Request aggregator

Adds a class that runs in a new thread to pool confirmation requests by endpoint.
This allows a reduction of bandwidth, vote generation, and moves some vote generation out of the I/O threads.

* Use a constant for confirm_ack_hashes_max

* Small code simplification

* Use const transaction

* Disable clang-format for lambdas

* Add missing deadlines and update deadline before poll block

* Use a scoped lock_guard pattern and initialize start in-class

* Misc. fixes and documentation
This commit is contained in:
Guilherme Lawless 2020-01-16 20:05:38 +00:00 committed by GitHub
commit 7bd7995c30
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 702 additions and 112 deletions

View file

@ -25,6 +25,7 @@ add_executable (core_test
memory_pool.cpp
processor_service.cpp
peer_container.cpp
request_aggregator.cpp
signing.cpp
socket.cpp
toml.cpp

View file

@ -1,4 +1,5 @@
#include <nano/node/common.hpp>
#include <nano/node/network.hpp>
#include <nano/secure/buffer.hpp>
#include <gtest/gtest.h>
@ -92,7 +93,7 @@ TEST (message, confirm_ack_serialization)
TEST (message, confirm_ack_hash_serialization)
{
std::vector<nano::block_hash> hashes;
for (auto i (hashes.size ()); i < 12; i++)
for (auto i (hashes.size ()); i < nano::network::confirm_ack_hashes_max; i++)
{
nano::keypair key1;
nano::block_hash previous;
@ -120,7 +121,7 @@ TEST (message, confirm_ack_hash_serialization)
vote_blocks.push_back (boost::get<nano::block_hash> (block));
}
ASSERT_EQ (hashes, vote_blocks);
// Check overflow with 12 hashes
// Check overflow with max hashes
ASSERT_EQ (header.count_get (), hashes.size ());
ASSERT_EQ (header.block_type (), nano::block_type::not_a_block);
}

View file

@ -2424,21 +2424,46 @@ TEST (node, local_votes_cache_batch)
auto channel (node.network.udp_channels.create (node.network.endpoint ()));
// Generates and sends one vote for both hashes which is then cached
node.network.process_message (message, channel);
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) < 1)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (1, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
ASSERT_FALSE (node.votes_cache.find (send1->hash ()).empty ());
ASSERT_FALSE (node.votes_cache.find (send2->hash ()).empty ());
// Only one confirm_ack should be sent if all hashes are part of the same vote
node.network.process_message (message, channel);
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) < 2)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
// Test when votes are different
node.votes_cache.remove (send1->hash ());
node.votes_cache.remove (send2->hash ());
node.network.process_message (nano::confirm_req (send1->hash (), send1->root ()), channel);
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) < 3)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (3, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
node.network.process_message (nano::confirm_req (send2->hash (), send2->root ()), channel);
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) < 4)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (4, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
// There are two different votes, so both should be sent in response
node.network.process_message (message, channel);
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) < 6)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (6, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
}
@ -2459,9 +2484,11 @@ TEST (node, local_votes_cache_generate_new_vote)
// Repsond with cached vote
nano::confirm_req message1 (send1);
auto channel (node.network.udp_channels.create (node.network.endpoint ()));
for (auto i (0); i < 100; ++i)
node.network.process_message (message1, channel);
system.deadline_set (3s);
while (node.votes_cache.find (send1->hash ()).empty ())
{
node.network.process_message (message1, channel);
ASSERT_NO_ERROR (system.poll ());
}
auto votes1 (node.votes_cache.find (send1->hash ()));
ASSERT_EQ (1, votes1.size ());
@ -2478,16 +2505,18 @@ TEST (node, local_votes_cache_generate_new_vote)
auto transaction (node.store.tx_begin_write ());
ASSERT_EQ (nano::process_result::progress, node.ledger.process (transaction, *send2).code);
}
// Generate new vote for request with 2 hashes (one of hashes is cached)
// One of the hashes is cached
std::vector<std::pair<nano::block_hash, nano::root>> roots_hashes{ std::make_pair (send1->hash (), send1->root ()), std::make_pair (send2->hash (), send2->root ()) };
nano::confirm_req message2 (roots_hashes);
for (auto i (0); i < 100; ++i)
node.network.process_message (message2, channel);
system.deadline_set (3s);
while (node.votes_cache.find (send2->hash ()).empty ())
{
node.network.process_message (message2, channel);
ASSERT_NO_ERROR (system.poll ());
}
auto votes2 (node.votes_cache.find (send1->hash ()));
auto votes2 (node.votes_cache.find (send2->hash ()));
ASSERT_EQ (1, votes2.size ());
ASSERT_EQ (2, votes2[0]->blocks.size ());
ASSERT_EQ (1, votes2[0]->blocks.size ());
{
nano::lock_guard<std::mutex> lock (node.store.get_cache_mutex ());
auto transaction (node.store.tx_begin_read ());
@ -2497,6 +2526,8 @@ TEST (node, local_votes_cache_generate_new_vote)
}
ASSERT_FALSE (node.votes_cache.find (send1->hash ()).empty ());
ASSERT_FALSE (node.votes_cache.find (send2->hash ()).empty ());
// First generated + again cached + new generated
ASSERT_EQ (3, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
}
// Tests that the max cache size is inversely proportional to the number of voting accounts

View file

@ -0,0 +1,287 @@
#include <nano/core_test/testutil.hpp>
#include <nano/lib/jsonconfig.hpp>
#include <nano/node/request_aggregator.hpp>
#include <nano/node/testing.hpp>
#include <gtest/gtest.h>
using namespace std::chrono_literals;
TEST (request_aggregator, one)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto & node (*system.add_node (node_config));
nano::genesis genesis;
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node.work_generate_blocking (genesis.hash ())));
std::vector<std::pair<nano::block_hash, nano::root>> request;
request.emplace_back (send1->hash (), send1->root ());
auto channel (node.network.udp_channels.create (node.network.endpoint ()));
node.aggregator.add (channel, request);
ASSERT_EQ (1, node.aggregator.size ());
system.deadline_set (3s);
while (!node.aggregator.empty ())
{
ASSERT_NO_ERROR (system.poll ());
}
// Not yet in the ledger, should be ignored
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code);
node.aggregator.add (channel, request);
ASSERT_EQ (1, node.aggregator.size ());
// In the ledger but no vote generated yet
// Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TRUE (node.aggregator.empty ());
node.aggregator.add (channel, request);
ASSERT_EQ (1, node.aggregator.size ());
// Already cached
system.deadline_set (3s);
while (!node.aggregator.empty ())
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (3, node.stats.count (nano::stat::type::requests, nano::stat::detail::all));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached));
ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
}
TEST (request_aggregator, one_update)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto & node (*system.add_node (node_config));
nano::genesis genesis;
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node.work_generate_blocking (genesis.hash ())));
auto send2 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, send1->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 2 * nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node.work_generate_blocking (send1->hash ())));
std::vector<std::pair<nano::block_hash, nano::root>> request;
request.emplace_back (send1->hash (), send1->root ());
auto channel (node.network.udp_channels.create (node.network.endpoint ()));
ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code);
ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send2).code);
node.aggregator.add (channel, request);
request.clear ();
request.emplace_back (send2->hash (), send2->root ());
// Update the pool of requests with another hash
node.aggregator.add (channel, request);
ASSERT_EQ (1, node.aggregator.size ());
// In the ledger but no vote generated yet
// Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TRUE (node.aggregator.empty ());
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::all));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached));
ASSERT_EQ (1, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
}
TEST (request_aggregator, two)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto & node (*system.add_node (node_config));
nano::genesis genesis;
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node.work_generate_blocking (genesis.hash ())));
auto send2 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, send1->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 2 * nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node.work_generate_blocking (send1->hash ())));
std::vector<std::pair<nano::block_hash, nano::root>> request;
request.emplace_back (send1->hash (), send1->root ());
request.emplace_back (send2->hash (), send2->root ());
auto channel (node.network.udp_channels.create (node.network.endpoint ()));
// Process both blocks
ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code);
ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send2).code);
node.aggregator.add (channel, request);
ASSERT_EQ (1, node.aggregator.size ());
// One vote should be generated for both blocks
// Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TRUE (node.aggregator.empty ());
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
// The same request should now send the cached vote
node.aggregator.add (channel, request);
ASSERT_EQ (1, node.aggregator.size ());
system.deadline_set (3s);
while (!node.aggregator.empty ())
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::all));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached));
ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
// Make sure the cached vote is for both hashes
auto vote1 (node.votes_cache.find (send1->hash ()));
auto vote2 (node.votes_cache.find (send2->hash ()));
ASSERT_EQ (1, vote1.size ());
ASSERT_EQ (1, vote2.size ());
ASSERT_EQ (vote1.front (), vote2.front ());
}
TEST (request_aggregator, two_endpoints)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto & node1 (*system.add_node (node_config));
node_config.peering_port = nano::get_available_port ();
auto & node2 (*system.add_node (node_config));
nano::genesis genesis;
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node1.work_generate_blocking (genesis.hash ())));
std::vector<std::pair<nano::block_hash, nano::root>> request;
request.emplace_back (send1->hash (), send1->root ());
ASSERT_EQ (nano::process_result::progress, node1.ledger.process (node1.store.tx_begin_write (), *send1).code);
auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ()));
auto channel2 (node2.network.udp_channels.create (node2.network.endpoint ()));
ASSERT_NE (nano::transport::map_endpoint_to_v6 (channel1->get_endpoint ()), nano::transport::map_endpoint_to_v6 (channel2->get_endpoint ()));
// Use the aggregator from node1 only, making requests from both nodes
node1.aggregator.add (channel1, request);
node1.aggregator.add (channel2, request);
ASSERT_EQ (2, node1.aggregator.size ());
system.deadline_set (3s);
// For the first request it generates the vote, for the second it uses the generated vote
while (!node1.aggregator.empty ())
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (2, node1.stats.count (nano::stat::type::requests, nano::stat::detail::all));
ASSERT_EQ (0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached));
}
TEST (request_aggregator, split)
{
constexpr size_t max_vbh = nano::network::confirm_ack_hashes_max;
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto & node (*system.add_node (node_config));
nano::genesis genesis;
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
std::vector<std::pair<nano::block_hash, nano::root>> request;
auto previous = genesis.hash ();
// Add max_vbh + 1 blocks and request votes for them
for (size_t i (0); i <= max_vbh; ++i)
{
nano::block_builder builder;
auto block = builder
.state ()
.account (nano::test_genesis_key.pub)
.previous (previous)
.representative (nano::test_genesis_key.pub)
.balance (nano::genesis_amount - (i + 1))
.link (nano::test_genesis_key.pub)
.sign (nano::test_genesis_key.prv, nano::test_genesis_key.pub)
.work (*system.work.generate (previous))
.build ();
previous = block->hash ();
ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *block).code);
request.emplace_back (block->hash (), block->root ());
}
ASSERT_EQ (max_vbh + 1, request.size ());
auto channel (node.network.udp_channels.create (node.network.endpoint ()));
node.aggregator.add (channel, request);
ASSERT_EQ (1, node.aggregator.size ());
// In the ledger but no vote generated yet
// Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) < 2)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TRUE (node.aggregator.empty ());
// Two votes were sent, the first one for 12 hashes and the second one for 1 hash
ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::all));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached));
ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
auto transaction (node.store.tx_begin_read ());
auto pre_last_hash (node.store.block_get (transaction, previous)->previous ());
auto vote1 (node.votes_cache.find (pre_last_hash));
ASSERT_EQ (1, vote1.size ());
ASSERT_EQ (max_vbh, vote1[0]->blocks.size ());
auto vote2 (node.votes_cache.find (previous));
ASSERT_EQ (1, vote2.size ());
ASSERT_EQ (1, vote2[0]->blocks.size ());
}
TEST (request_aggregator, channel_lifetime)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto & node (*system.add_node (node_config));
nano::genesis genesis;
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node.work_generate_blocking (genesis.hash ())));
ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code);
std::vector<std::pair<nano::block_hash, nano::root>> request;
request.emplace_back (send1->hash (), send1->root ());
{
// The aggregator should extend the lifetime of the channel
auto channel (node.network.udp_channels.create (node.network.endpoint ()));
node.aggregator.add (channel, request);
}
ASSERT_EQ (1, node.aggregator.size ());
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
}
TEST (request_aggregator, channel_update)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto & node (*system.add_node (node_config));
nano::genesis genesis;
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node.work_generate_blocking (genesis.hash ())));
ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code);
std::vector<std::pair<nano::block_hash, nano::root>> request;
request.emplace_back (send1->hash (), send1->root ());
std::weak_ptr<nano::transport::channel> channel1_w;
{
auto channel1 (node.network.udp_channels.create (node.network.endpoint ()));
channel1_w = channel1;
node.aggregator.add (channel1, request);
auto channel2 (node.network.udp_channels.create (node.network.endpoint ()));
// The aggregator then hold channel2 and drop channel1
node.aggregator.add (channel2, request);
}
// Both requests were for the same endpoint, so only one pool should exist
ASSERT_EQ (1, node.aggregator.size ());
// channel1 is not being held anymore
ASSERT_EQ (nullptr, channel1_w.lock ());
system.deadline_set (3s);
while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
}

View file

@ -437,6 +437,10 @@ std::string nano::stat::type_to_string (uint32_t key)
break;
case nano::stat::type::drop:
res = "drop";
break;
case nano::stat::type::requests:
res = "requests";
break;
}
return res;
}
@ -638,6 +642,16 @@ std::string nano::stat::detail_to_string (uint32_t key)
break;
case nano::stat::detail::blocks_confirmed:
res = "blocks_confirmed";
break;
case nano::stat::detail::requests_cached:
res = "requests_votes_cached";
break;
case nano::stat::detail::requests_generated:
res = "requests_votes_generated";
break;
case nano::stat::detail::requests_ignored:
res = "requests_votes_ignored";
break;
}
return res;
}

View file

@ -197,7 +197,8 @@ public:
udp,
observer,
confirmation_height,
drop
drop,
requests
};
/** Optional detail type */
@ -292,7 +293,12 @@ public:
// confirmation height
blocks_confirmed,
invalid_block
invalid_block,
// requests
requests_cached,
requests_generated,
requests_ignored
};
/** Direction of the stat. If the direction is irrelevant, use in */

View file

@ -69,6 +69,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::worker:
thread_role_name_string = "Worker";
break;
case nano::thread_role::name::request_aggregator:
thread_role_name_string = "Req aggregator";
break;
}
/*

View file

@ -31,7 +31,8 @@ namespace thread_role
rpc_process_container,
work_watcher,
confirmation_height_processing,
worker
worker,
request_aggregator
};
/*
* Get/Set the identifier for the current thread

View file

@ -98,6 +98,8 @@ add_library (node
node_pow_server_config.cpp
repcrawler.hpp
repcrawler.cpp
request_aggregator.hpp
request_aggregator.cpp
testing.hpp
testing.cpp
transport/tcp.hpp

View file

@ -194,24 +194,6 @@ bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a
return result;
}
void nano::network::confirm_hashes (nano::transaction const & transaction_a, std::shared_ptr<nano::transport::channel> channel_a, std::vector<nano::block_hash> blocks_bundle_a)
{
if (node.config.enable_voting)
{
node.wallets.foreach_representative ([this, &blocks_bundle_a, &channel_a, &transaction_a](nano::public_key const & pub_a, nano::raw_key const & prv_a) {
auto vote (this->node.store.vote_generate (transaction_a, pub_a, prv_a, blocks_bundle_a));
nano::confirm_ack confirm (vote);
std::shared_ptr<std::vector<uint8_t>> bytes (new std::vector<uint8_t>);
{
nano::vectorstream stream (*bytes);
confirm.serialize (stream);
}
channel_a->send (confirm);
this->node.votes_cache.add (vote);
});
}
}
bool nano::network::send_votes_cache (std::shared_ptr<nano::transport::channel> channel_a, nano::block_hash const & hash_a)
{
// Search in cache
@ -477,81 +459,7 @@ public:
}
else if (!message_a.roots_hashes.empty ())
{
auto transaction (node.store.tx_begin_read ());
std::vector<nano::block_hash> blocks_bundle;
std::vector<std::shared_ptr<nano::vote>> cached_votes;
size_t cached_count (0);
for (auto & root_hash : message_a.roots_hashes)
{
auto find_votes (node.votes_cache.find (root_hash.first));
if (!find_votes.empty ())
{
++cached_count;
for (auto const & vote : find_votes)
{
if (std::find (cached_votes.begin (), cached_votes.end (), vote) == cached_votes.end ())
{
cached_votes.push_back (vote);
}
}
}
if (!find_votes.empty () || (!root_hash.first.is_zero () && node.store.block_exists (transaction, root_hash.first)))
{
blocks_bundle.push_back (root_hash.first);
}
else if (!root_hash.second.is_zero ())
{
nano::block_hash successor (0);
// Search for block root
successor = node.store.block_successor (transaction, root_hash.second);
// Search for account root
if (successor.is_zero ())
{
nano::account_info info;
auto error (node.store.account_get (transaction, root_hash.second, info));
if (!error)
{
successor = info.open_block;
}
}
if (!successor.is_zero ())
{
auto find_successor_votes (node.votes_cache.find (successor));
if (!find_successor_votes.empty ())
{
++cached_count;
for (auto const & vote : find_successor_votes)
{
if (std::find (cached_votes.begin (), cached_votes.end (), vote) == cached_votes.end ())
{
cached_votes.push_back (vote);
}
}
}
blocks_bundle.push_back (successor);
auto successor_block (node.store.block_get (transaction, successor));
assert (successor_block != nullptr);
nano::publish publish (successor_block);
channel->send (publish);
}
}
}
/* Decide to send cached votes or to create new vote
If there is at least one new hash to confirm, then create new batch vote
Otherwise use more bandwidth & save local resources required to sign vote */
if (!blocks_bundle.empty () && cached_count < blocks_bundle.size ())
{
node.network.confirm_hashes (transaction, channel, blocks_bundle);
}
else
{
// Send from cache
for (auto & vote : cached_votes)
{
nano::confirm_ack confirm (vote);
channel->send (confirm);
}
}
node.aggregator.add (channel, message_a.roots_hashes);
}
}
}

View file

@ -166,6 +166,7 @@ public:
static unsigned const broadcast_interval_ms = 10;
static size_t const buffer_size = 512;
static size_t const confirm_req_hashes_max = 7;
static size_t const confirm_ack_hashes_max = 12;
};
std::unique_ptr<container_info_component> collect_container_info (network & network, const std::string & name);
}

View file

@ -134,13 +134,16 @@ rep_crawler (*this),
warmed_up (0),
votes_cache (wallets),
block_processor (*this, write_database_queue),
// clang-format off
block_processor_thread ([this]() {
nano::thread_role::set (nano::thread_role::name::block_processing);
this->block_processor.process_blocks ();
}),
// clang-format on
online_reps (ledger, network_params, config.online_weight_minimum.number ()),
vote_uniquer (block_uniquer),
active (*this),
aggregator (stats, network_params.network, votes_cache, store, wallets),
confirmation_height_processor (pending_confirmation_height, ledger, active, write_database_queue, config.conf_height_processor_batch_min_time, logger),
payment_observer_processor (observers.blocks),
wallets (wallets_store.init_error (), *this),
@ -608,6 +611,7 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
composite->add_component (collect_container_info (node.pending_confirmation_height, "pending_confirmation_height"));
composite->add_component (collect_container_info (node.worker, "worker"));
composite->add_component (collect_container_info (node.distributed_work, "distributed_work"));
composite->add_component (collect_container_info (node.aggregator, "request_aggregator"));
return composite;
}
@ -697,6 +701,7 @@ void nano::node::stop ()
{
block_processor_thread.join ();
}
aggregator.stop ();
vote_processor.stop ();
confirmation_height_processor.stop ();
active.stop ();

View file

@ -19,6 +19,7 @@
#include <nano/node/payment_observer_processor.hpp>
#include <nano/node/portmapping.hpp>
#include <nano/node/repcrawler.hpp>
#include <nano/node/request_aggregator.hpp>
#include <nano/node/signatures.hpp>
#include <nano/node/vote_processor.hpp>
#include <nano/node/wallet.hpp>
@ -183,6 +184,7 @@ public:
nano::vote_uniquer vote_uniquer;
nano::pending_confirmation_height pending_confirmation_height; // Used by both active and confirmation height processor
nano::active_transactions active;
nano::request_aggregator aggregator;
nano::confirmation_height_processor confirmation_height_processor;
nano::payment_observer_processor payment_observer_processor;
nano::wallets wallets;

View file

@ -0,0 +1,221 @@
#include <nano/lib/stats.hpp>
#include <nano/lib/threading.hpp>
#include <nano/node/common.hpp>
#include <nano/node/network.hpp>
#include <nano/node/request_aggregator.hpp>
#include <nano/node/transport/udp.hpp>
#include <nano/node/voting.hpp>
#include <nano/node/wallet.hpp>
#include <nano/secure/blockstore.hpp>
nano::request_aggregator::request_aggregator (nano::stat & stats_a, nano::network_constants const & network_constants_a, nano::votes_cache & cache_a, nano::block_store & store_a, nano::wallets & wallets_a) :
max_delay (network_constants_a.is_test_network () ? 50 : 300),
small_delay (network_constants_a.is_test_network () ? 10 : 50),
stats (stats_a),
votes_cache (cache_a),
store (store_a),
wallets (wallets_a),
// clang-format off
thread ([this]() { run (); })
{
nano::unique_lock<std::mutex> lock (mutex);
condition.wait (lock, [& started = started] { return started; });
}
// clang-format on
void nano::request_aggregator::add (std::shared_ptr<nano::transport::channel> & channel_a, std::vector<std::pair<nano::block_hash, nano::root>> const & hashes_roots_a)
{
assert (wallets.rep_counts ().voting > 0);
auto const endpoint (nano::transport::map_endpoint_to_v6 (channel_a->get_endpoint ()));
nano::unique_lock<std::mutex> lock (mutex);
auto & requests_by_endpoint (requests.get<tag_endpoint> ());
auto existing (requests_by_endpoint.find (endpoint));
if (existing == requests_by_endpoint.end ())
{
existing = requests_by_endpoint.emplace (channel_a).first;
}
// clang-format off
requests_by_endpoint.modify (existing, [&hashes_roots_a, &channel_a, this](channel_pool & pool_a) {
// This extends the lifetime of the channel, which is acceptable up to max_delay
pool_a.channel = channel_a;
auto new_deadline (std::min (pool_a.start + this->max_delay, std::chrono::steady_clock::now () + this->small_delay));
pool_a.deadline = new_deadline;
pool_a.hashes_roots.insert (pool_a.hashes_roots.begin (), hashes_roots_a.begin (), hashes_roots_a.end ());
});
// clang-format on
if (requests.size () == 1)
{
lock.unlock ();
condition.notify_all ();
}
}
void nano::request_aggregator::run ()
{
nano::thread_role::set (nano::thread_role::name::request_aggregator);
nano::unique_lock<std::mutex> lock (mutex);
started = true;
lock.unlock ();
condition.notify_all ();
lock.lock ();
while (!stopped)
{
if (!requests.empty ())
{
auto & requests_by_deadline (requests.get<tag_deadline> ());
auto front (requests_by_deadline.begin ());
if (front->deadline < std::chrono::steady_clock::now ())
{
auto pool (*front);
auto transaction (store.tx_begin_read ());
// Aggregate the current pool of requests, sending cached votes
// Store a local copy of the remaining hashes and the channel
auto remaining = aggregate (transaction, pool);
auto channel = pool.channel;
// Safely erase this pool
requests_by_deadline.erase (front);
if (!remaining.empty ())
{
lock.unlock ();
// Generate votes for the remaining hashes
generate (transaction, std::move (remaining), channel);
lock.lock ();
}
}
else
{
auto deadline = front->deadline;
// clang-format off
condition.wait_until (lock, deadline, [this, &deadline]() { return this->stopped || deadline < std::chrono::steady_clock::now (); });
// clang-format on
}
}
else
{
// clang-format off
condition.wait_for (lock, small_delay, [this]() { return this->stopped || !this->requests.empty (); });
// clang-format on
}
}
}
void nano::request_aggregator::stop ()
{
{
nano::lock_guard<std::mutex> guard (mutex);
stopped = true;
}
condition.notify_all ();
if (thread.joinable ())
{
thread.join ();
}
}
std::size_t nano::request_aggregator::size ()
{
nano::unique_lock<std::mutex> lock (mutex);
return requests.size ();
}
bool nano::request_aggregator::empty ()
{
return size () == 0;
}
std::vector<nano::block_hash> nano::request_aggregator::aggregate (nano::transaction const & transaction_a, channel_pool & pool_a) const
{
std::vector<nano::block_hash> to_generate;
std::vector<std::shared_ptr<nano::vote>> cached_votes;
for (auto const & hash_root : pool_a.hashes_roots)
{
auto find_votes (votes_cache.find (hash_root.first));
if (!find_votes.empty ())
{
cached_votes.insert (cached_votes.end (), find_votes.begin (), find_votes.end ());
}
else if (!hash_root.first.is_zero () && store.block_exists (transaction_a, hash_root.first))
{
to_generate.push_back (hash_root.first);
}
else if (!hash_root.second.is_zero ())
{
// Search for block root
auto successor (store.block_successor (transaction_a, hash_root.second));
// Search for account root
if (successor.is_zero ())
{
nano::account_info info;
auto error (store.account_get (transaction_a, hash_root.second, info));
if (!error)
{
successor = info.open_block;
}
}
if (!successor.is_zero ())
{
auto find_successor_votes (votes_cache.find (successor));
if (!find_successor_votes.empty ())
{
cached_votes.insert (cached_votes.end (), find_successor_votes.begin (), find_successor_votes.end ());
}
else
{
to_generate.push_back (successor);
}
auto successor_block (store.block_get (transaction_a, successor));
assert (successor_block != nullptr);
nano::publish publish (successor_block);
pool_a.channel->send (publish);
}
else
{
stats.inc (nano::stat::type::requests, nano::stat::detail::requests_ignored);
}
}
}
// Unique votes
std::sort (cached_votes.begin (), cached_votes.end ());
cached_votes.erase (std::unique (cached_votes.begin (), cached_votes.end ()), cached_votes.end ());
for (auto const & vote : cached_votes)
{
nano::confirm_ack confirm (vote);
pool_a.channel->send (confirm);
}
stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached, stat::dir::in, cached_votes.size ());
return to_generate;
}
void nano::request_aggregator::generate (nano::transaction const & transaction_a, std::vector<nano::block_hash> const hashes_a, std::shared_ptr<nano::transport::channel> & channel_a) const
{
size_t generated_l = 0;
auto i (hashes_a.begin ());
auto n (hashes_a.end ());
while (i != n)
{
std::vector<nano::block_hash> hashes_l;
for (; i != n && hashes_l.size () < nano::network::confirm_ack_hashes_max; ++i)
{
hashes_l.push_back (*i);
}
// clang-format off
wallets.foreach_representative ([this, &generated_l, &hashes_l, &channel_a, &transaction_a](nano::public_key const & pub_a, nano::raw_key const & prv_a) {
auto vote (this->store.vote_generate (transaction_a, pub_a, prv_a, hashes_l));
++generated_l;
nano::confirm_ack confirm (vote);
channel_a->send (confirm);
this->votes_cache.add (vote);
});
// clang-format on
}
stats.add (nano::stat::type::requests, nano::stat::detail::requests_generated, stat::dir::in, generated_l);
}
std::unique_ptr<nano::container_info_component> nano::collect_container_info (nano::request_aggregator & aggregator, const std::string & name)
{
auto pools_count = aggregator.size ();
auto sizeof_element = sizeof (decltype (aggregator.requests)::value_type);
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "pools", pools_count, sizeof_element }));
return composite;
}

View file

@ -0,0 +1,103 @@
#pragma once
#include <nano/lib/locks.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/node/transport/transport.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index_container.hpp>
#include <condition_variable>
#include <thread>
#include <unordered_map>
namespace mi = boost::multi_index;
namespace nano
{
class votes_cache;
class block_store;
class wallets;
class stat;
/**
* Pools together confirmation requests, separately for each endpoint.
* Requests are added from network messages, and aggregated to minimize bandwidth and vote generation. Example:
* * Two votes are cached, one for hashes {1,2,3} and another for hashes {4,5,6}
* * A request arrives for hashes {1,4,5}. Another request arrives soon afterwards for hashes {2,3,6}
* * The aggregator will reply with the two cached votes
* Votes are generated for uncached hashes.
*/
class request_aggregator final
{
/**
* Holds a buffer of incoming requests from an endpoint.
* Extends the lifetime of the corresponding channel. The channel is updated on a new request arriving from the same endpoint, such that only the newest channel is held
*/
struct channel_pool final
{
channel_pool () = delete;
explicit channel_pool (std::shared_ptr<nano::transport::channel> & channel_a) :
channel (channel_a),
endpoint (nano::transport::map_endpoint_to_v6 (channel_a->get_endpoint ()))
{
}
std::vector<std::pair<nano::block_hash, nano::root>> hashes_roots;
std::shared_ptr<nano::transport::channel> channel;
nano::endpoint endpoint;
std::chrono::steady_clock::time_point const start{ std::chrono::steady_clock::now () };
std::chrono::steady_clock::time_point deadline;
};
// clang-format off
class tag_endpoint {};
class tag_deadline {};
// clang-format on
public:
request_aggregator () = delete;
request_aggregator (nano::stat &, nano::network_constants const &, nano::votes_cache &, nano::block_store &, nano::wallets &);
/** Add a new request by \p channel_a for hashes \p hashes_roots_a */
void add (std::shared_ptr<nano::transport::channel> & channel_a, std::vector<std::pair<nano::block_hash, nano::root>> const & hashes_roots_a);
void stop ();
/** Returns the number of currently queued request pools */
size_t size ();
bool empty ();
const std::chrono::milliseconds max_delay;
const std::chrono::milliseconds small_delay;
private:
void run ();
/** Aggregate and send cached votes for \p pool_a, returning the leftovers that were not found in cached votes **/
std::vector<nano::block_hash> aggregate (nano::transaction const &, channel_pool & pool_a) const;
/** Generate and send votes from \p hashes_a to \p channel_a, does not need a lock on the mutex **/
void generate (nano::transaction const &, std::vector<nano::block_hash> const hashes_a, std::shared_ptr<nano::transport::channel> & channel_a) const;
nano::stat & stats;
nano::votes_cache & votes_cache;
nano::block_store & store;
nano::wallets & wallets;
// clang-format off
boost::multi_index_container<channel_pool,
mi::indexed_by<
mi::hashed_unique<mi::tag<tag_endpoint>,
mi::member<channel_pool, nano::endpoint, &channel_pool::endpoint>>,
mi::ordered_non_unique<mi::tag<tag_deadline>,
mi::member<channel_pool, std::chrono::steady_clock::time_point, &channel_pool::deadline>>>>
requests;
// clang-format on
bool stopped{ false };
bool started{ false };
nano::condition_variable condition;
std::mutex mutex;
std::thread thread;
friend std::unique_ptr<container_info_component> collect_container_info (request_aggregator &, const std::string &);
};
std::unique_ptr<container_info_component> collect_container_info (request_aggregator &, const std::string &);
}

View file

@ -29,7 +29,7 @@ void nano::vote_generator::add (nano::block_hash const & hash_a)
{
nano::unique_lock<std::mutex> lock (mutex);
hashes.push_back (hash_a);
if (hashes.size () >= 12)
if (hashes.size () >= nano::network::confirm_ack_hashes_max)
{
lock.unlock ();
condition.notify_all ();
@ -53,8 +53,8 @@ void nano::vote_generator::stop ()
void nano::vote_generator::send (nano::unique_lock<std::mutex> & lock_a)
{
std::vector<nano::block_hash> hashes_l;
hashes_l.reserve (12);
while (!hashes.empty () && hashes_l.size () < 12)
hashes_l.reserve (nano::network::confirm_ack_hashes_max);
while (!hashes.empty () && hashes_l.size () < nano::network::confirm_ack_hashes_max)
{
hashes_l.push_back (hashes.front ());
hashes.pop_front ();
@ -81,16 +81,20 @@ void nano::vote_generator::run ()
lock.lock ();
while (!stopped)
{
if (hashes.size () >= 12)
if (hashes.size () >= nano::network::confirm_ack_hashes_max)
{
send (lock);
}
else
{
condition.wait_for (lock, config.vote_generator_delay, [this]() { return this->hashes.size () >= 12; });
if (hashes.size () >= config.vote_generator_threshold && hashes.size () < 12)
// clang-format off
condition.wait_for (lock, config.vote_generator_delay, [this]() { return this->hashes.size () >= nano::network::confirm_ack_hashes_max; });
// clang-format on
if (hashes.size () >= config.vote_generator_threshold && hashes.size () < nano::network::confirm_ack_hashes_max)
{
condition.wait_for (lock, config.vote_generator_delay, [this]() { return this->hashes.size () >= 12; });
// clang-format off
condition.wait_for (lock, config.vote_generator_delay, [this]() { return this->hashes.size () >= nano::network::confirm_ack_hashes_max; });
// clang-format on
}
if (!hashes.empty ())
{