diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index 69e8b945..e6a12799 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -25,6 +25,7 @@ add_executable (core_test memory_pool.cpp processor_service.cpp peer_container.cpp + request_aggregator.cpp signing.cpp socket.cpp toml.cpp diff --git a/nano/core_test/message.cpp b/nano/core_test/message.cpp index 2eff4917..08dbc916 100644 --- a/nano/core_test/message.cpp +++ b/nano/core_test/message.cpp @@ -1,4 +1,5 @@ #include +#include #include #include @@ -92,7 +93,7 @@ TEST (message, confirm_ack_serialization) TEST (message, confirm_ack_hash_serialization) { std::vector hashes; - for (auto i (hashes.size ()); i < 12; i++) + for (auto i (hashes.size ()); i < nano::network::confirm_ack_hashes_max; i++) { nano::keypair key1; nano::block_hash previous; @@ -120,7 +121,7 @@ TEST (message, confirm_ack_hash_serialization) vote_blocks.push_back (boost::get (block)); } ASSERT_EQ (hashes, vote_blocks); - // Check overflow with 12 hashes + // Check overflow with max hashes ASSERT_EQ (header.count_get (), hashes.size ()); ASSERT_EQ (header.block_type (), nano::block_type::not_a_block); } diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 535a4f42..5fd63493 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -2424,21 +2424,46 @@ TEST (node, local_votes_cache_batch) auto channel (node.network.udp_channels.create (node.network.endpoint ())); // Generates and sends one vote for both hashes which is then cached node.network.process_message (message, channel); + system.deadline_set (3s); + while (node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) < 1) + { + ASSERT_NO_ERROR (system.poll ()); + } ASSERT_EQ (1, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); ASSERT_FALSE (node.votes_cache.find (send1->hash ()).empty ()); ASSERT_FALSE (node.votes_cache.find (send2->hash ()).empty ()); // Only one confirm_ack should be sent if all hashes are part of the same vote node.network.process_message (message, channel); + system.deadline_set (3s); + while (node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) < 2) + { + ASSERT_NO_ERROR (system.poll ()); + } ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); // Test when votes are different node.votes_cache.remove (send1->hash ()); node.votes_cache.remove (send2->hash ()); node.network.process_message (nano::confirm_req (send1->hash (), send1->root ()), channel); + system.deadline_set (3s); + while (node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) < 3) + { + ASSERT_NO_ERROR (system.poll ()); + } ASSERT_EQ (3, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); node.network.process_message (nano::confirm_req (send2->hash (), send2->root ()), channel); + system.deadline_set (3s); + while (node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) < 4) + { + ASSERT_NO_ERROR (system.poll ()); + } ASSERT_EQ (4, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); // There are two different votes, so both should be sent in response node.network.process_message (message, channel); + system.deadline_set (3s); + while (node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) < 6) + { + ASSERT_NO_ERROR (system.poll ()); + } ASSERT_EQ (6, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); } @@ -2459,9 +2484,11 @@ TEST (node, local_votes_cache_generate_new_vote) // Repsond with cached vote nano::confirm_req message1 (send1); auto channel (node.network.udp_channels.create (node.network.endpoint ())); - for (auto i (0); i < 100; ++i) + node.network.process_message (message1, channel); + system.deadline_set (3s); + while (node.votes_cache.find (send1->hash ()).empty ()) { - node.network.process_message (message1, channel); + ASSERT_NO_ERROR (system.poll ()); } auto votes1 (node.votes_cache.find (send1->hash ())); ASSERT_EQ (1, votes1.size ()); @@ -2478,16 +2505,18 @@ TEST (node, local_votes_cache_generate_new_vote) auto transaction (node.store.tx_begin_write ()); ASSERT_EQ (nano::process_result::progress, node.ledger.process (transaction, *send2).code); } - // Generate new vote for request with 2 hashes (one of hashes is cached) + // One of the hashes is cached std::vector> roots_hashes{ std::make_pair (send1->hash (), send1->root ()), std::make_pair (send2->hash (), send2->root ()) }; nano::confirm_req message2 (roots_hashes); - for (auto i (0); i < 100; ++i) + node.network.process_message (message2, channel); + system.deadline_set (3s); + while (node.votes_cache.find (send2->hash ()).empty ()) { - node.network.process_message (message2, channel); + ASSERT_NO_ERROR (system.poll ()); } - auto votes2 (node.votes_cache.find (send1->hash ())); + auto votes2 (node.votes_cache.find (send2->hash ())); ASSERT_EQ (1, votes2.size ()); - ASSERT_EQ (2, votes2[0]->blocks.size ()); + ASSERT_EQ (1, votes2[0]->blocks.size ()); { nano::lock_guard lock (node.store.get_cache_mutex ()); auto transaction (node.store.tx_begin_read ()); @@ -2497,6 +2526,8 @@ TEST (node, local_votes_cache_generate_new_vote) } ASSERT_FALSE (node.votes_cache.find (send1->hash ()).empty ()); ASSERT_FALSE (node.votes_cache.find (send2->hash ()).empty ()); + // First generated + again cached + new generated + ASSERT_EQ (3, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); } // Tests that the max cache size is inversely proportional to the number of voting accounts diff --git a/nano/core_test/request_aggregator.cpp b/nano/core_test/request_aggregator.cpp new file mode 100644 index 00000000..9192f080 --- /dev/null +++ b/nano/core_test/request_aggregator.cpp @@ -0,0 +1,287 @@ +#include +#include +#include +#include + +#include + +using namespace std::chrono_literals; + +TEST (request_aggregator, one) +{ + nano::system system; + nano::node_config node_config (nano::get_available_port (), system.logging); + node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; + auto & node (*system.add_node (node_config)); + nano::genesis genesis; + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); + auto send1 (std::make_shared (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node.work_generate_blocking (genesis.hash ()))); + std::vector> request; + request.emplace_back (send1->hash (), send1->root ()); + auto channel (node.network.udp_channels.create (node.network.endpoint ())); + node.aggregator.add (channel, request); + ASSERT_EQ (1, node.aggregator.size ()); + system.deadline_set (3s); + while (!node.aggregator.empty ()) + { + ASSERT_NO_ERROR (system.poll ()); + } + // Not yet in the ledger, should be ignored + ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored)); + ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code); + node.aggregator.add (channel, request); + ASSERT_EQ (1, node.aggregator.size ()); + // In the ledger but no vote generated yet + // Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough + system.deadline_set (3s); + while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_TRUE (node.aggregator.empty ()); + node.aggregator.add (channel, request); + ASSERT_EQ (1, node.aggregator.size ()); + // Already cached + system.deadline_set (3s); + while (!node.aggregator.empty ()) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_EQ (3, node.stats.count (nano::stat::type::requests, nano::stat::detail::all)); + ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored)); + ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated)); + ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached)); + ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); +} + +TEST (request_aggregator, one_update) +{ + nano::system system; + nano::node_config node_config (nano::get_available_port (), system.logging); + node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; + auto & node (*system.add_node (node_config)); + nano::genesis genesis; + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); + auto send1 (std::make_shared (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node.work_generate_blocking (genesis.hash ()))); + auto send2 (std::make_shared (nano::test_genesis_key.pub, send1->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 2 * nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node.work_generate_blocking (send1->hash ()))); + std::vector> request; + request.emplace_back (send1->hash (), send1->root ()); + auto channel (node.network.udp_channels.create (node.network.endpoint ())); + ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code); + ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send2).code); + node.aggregator.add (channel, request); + request.clear (); + request.emplace_back (send2->hash (), send2->root ()); + // Update the pool of requests with another hash + node.aggregator.add (channel, request); + ASSERT_EQ (1, node.aggregator.size ()); + // In the ledger but no vote generated yet + // Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough + system.deadline_set (3s); + while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_TRUE (node.aggregator.empty ()); + ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::all)); + ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored)); + ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated)); + ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached)); + ASSERT_EQ (1, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); +} + +TEST (request_aggregator, two) +{ + nano::system system; + nano::node_config node_config (nano::get_available_port (), system.logging); + node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; + auto & node (*system.add_node (node_config)); + nano::genesis genesis; + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); + auto send1 (std::make_shared (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node.work_generate_blocking (genesis.hash ()))); + auto send2 (std::make_shared (nano::test_genesis_key.pub, send1->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 2 * nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node.work_generate_blocking (send1->hash ()))); + std::vector> request; + request.emplace_back (send1->hash (), send1->root ()); + request.emplace_back (send2->hash (), send2->root ()); + auto channel (node.network.udp_channels.create (node.network.endpoint ())); + // Process both blocks + ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code); + ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send2).code); + node.aggregator.add (channel, request); + ASSERT_EQ (1, node.aggregator.size ()); + // One vote should be generated for both blocks + // Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough + system.deadline_set (3s); + while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_TRUE (node.aggregator.empty ()); + ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated)); + // The same request should now send the cached vote + node.aggregator.add (channel, request); + ASSERT_EQ (1, node.aggregator.size ()); + system.deadline_set (3s); + while (!node.aggregator.empty ()) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::all)); + ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored)); + ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated)); + ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached)); + ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); + // Make sure the cached vote is for both hashes + auto vote1 (node.votes_cache.find (send1->hash ())); + auto vote2 (node.votes_cache.find (send2->hash ())); + ASSERT_EQ (1, vote1.size ()); + ASSERT_EQ (1, vote2.size ()); + ASSERT_EQ (vote1.front (), vote2.front ()); +} + +TEST (request_aggregator, two_endpoints) +{ + nano::system system; + nano::node_config node_config (nano::get_available_port (), system.logging); + node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; + auto & node1 (*system.add_node (node_config)); + node_config.peering_port = nano::get_available_port (); + auto & node2 (*system.add_node (node_config)); + nano::genesis genesis; + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); + auto send1 (std::make_shared (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node1.work_generate_blocking (genesis.hash ()))); + std::vector> request; + request.emplace_back (send1->hash (), send1->root ()); + ASSERT_EQ (nano::process_result::progress, node1.ledger.process (node1.store.tx_begin_write (), *send1).code); + auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ())); + auto channel2 (node2.network.udp_channels.create (node2.network.endpoint ())); + ASSERT_NE (nano::transport::map_endpoint_to_v6 (channel1->get_endpoint ()), nano::transport::map_endpoint_to_v6 (channel2->get_endpoint ())); + // Use the aggregator from node1 only, making requests from both nodes + node1.aggregator.add (channel1, request); + node1.aggregator.add (channel2, request); + ASSERT_EQ (2, node1.aggregator.size ()); + system.deadline_set (3s); + // For the first request it generates the vote, for the second it uses the generated vote + while (!node1.aggregator.empty ()) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_EQ (2, node1.stats.count (nano::stat::type::requests, nano::stat::detail::all)); + ASSERT_EQ (0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored)); + ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated)); + ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached)); +} + +TEST (request_aggregator, split) +{ + constexpr size_t max_vbh = nano::network::confirm_ack_hashes_max; + nano::system system; + nano::node_config node_config (nano::get_available_port (), system.logging); + node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; + auto & node (*system.add_node (node_config)); + nano::genesis genesis; + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); + std::vector> request; + auto previous = genesis.hash (); + // Add max_vbh + 1 blocks and request votes for them + for (size_t i (0); i <= max_vbh; ++i) + { + nano::block_builder builder; + auto block = builder + .state () + .account (nano::test_genesis_key.pub) + .previous (previous) + .representative (nano::test_genesis_key.pub) + .balance (nano::genesis_amount - (i + 1)) + .link (nano::test_genesis_key.pub) + .sign (nano::test_genesis_key.prv, nano::test_genesis_key.pub) + .work (*system.work.generate (previous)) + .build (); + previous = block->hash (); + ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *block).code); + request.emplace_back (block->hash (), block->root ()); + } + ASSERT_EQ (max_vbh + 1, request.size ()); + auto channel (node.network.udp_channels.create (node.network.endpoint ())); + node.aggregator.add (channel, request); + ASSERT_EQ (1, node.aggregator.size ()); + // In the ledger but no vote generated yet + // Generated votes are created after the pool is removed from the aggregator, so a simple check on empty () is not enough + system.deadline_set (3s); + while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) < 2) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_TRUE (node.aggregator.empty ()); + // Two votes were sent, the first one for 12 hashes and the second one for 1 hash + ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::all)); + ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored)); + ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated)); + ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached)); + ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); + auto transaction (node.store.tx_begin_read ()); + auto pre_last_hash (node.store.block_get (transaction, previous)->previous ()); + auto vote1 (node.votes_cache.find (pre_last_hash)); + ASSERT_EQ (1, vote1.size ()); + ASSERT_EQ (max_vbh, vote1[0]->blocks.size ()); + auto vote2 (node.votes_cache.find (previous)); + ASSERT_EQ (1, vote2.size ()); + ASSERT_EQ (1, vote2[0]->blocks.size ()); +} + +TEST (request_aggregator, channel_lifetime) +{ + nano::system system; + nano::node_config node_config (nano::get_available_port (), system.logging); + node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; + auto & node (*system.add_node (node_config)); + nano::genesis genesis; + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); + auto send1 (std::make_shared (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node.work_generate_blocking (genesis.hash ()))); + ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code); + std::vector> request; + request.emplace_back (send1->hash (), send1->root ()); + { + // The aggregator should extend the lifetime of the channel + auto channel (node.network.udp_channels.create (node.network.endpoint ())); + node.aggregator.add (channel, request); + } + ASSERT_EQ (1, node.aggregator.size ()); + system.deadline_set (3s); + while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } +} + +TEST (request_aggregator, channel_update) +{ + nano::system system; + nano::node_config node_config (nano::get_available_port (), system.logging); + node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; + auto & node (*system.add_node (node_config)); + nano::genesis genesis; + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); + auto send1 (std::make_shared (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node.work_generate_blocking (genesis.hash ()))); + ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code); + std::vector> request; + request.emplace_back (send1->hash (), send1->root ()); + std::weak_ptr channel1_w; + { + auto channel1 (node.network.udp_channels.create (node.network.endpoint ())); + channel1_w = channel1; + node.aggregator.add (channel1, request); + auto channel2 (node.network.udp_channels.create (node.network.endpoint ())); + // The aggregator then hold channel2 and drop channel1 + node.aggregator.add (channel2, request); + } + // Both requests were for the same endpoint, so only one pool should exist + ASSERT_EQ (1, node.aggregator.size ()); + // channel1 is not being held anymore + ASSERT_EQ (nullptr, channel1_w.lock ()); + system.deadline_set (3s); + while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated) == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } +} diff --git a/nano/lib/stats.cpp b/nano/lib/stats.cpp index f2adc105..32ec1ade 100644 --- a/nano/lib/stats.cpp +++ b/nano/lib/stats.cpp @@ -437,6 +437,10 @@ std::string nano::stat::type_to_string (uint32_t key) break; case nano::stat::type::drop: res = "drop"; + break; + case nano::stat::type::requests: + res = "requests"; + break; } return res; } @@ -638,6 +642,16 @@ std::string nano::stat::detail_to_string (uint32_t key) break; case nano::stat::detail::blocks_confirmed: res = "blocks_confirmed"; + break; + case nano::stat::detail::requests_cached: + res = "requests_votes_cached"; + break; + case nano::stat::detail::requests_generated: + res = "requests_votes_generated"; + break; + case nano::stat::detail::requests_ignored: + res = "requests_votes_ignored"; + break; } return res; } diff --git a/nano/lib/stats.hpp b/nano/lib/stats.hpp index 99568b09..d5f7c5ff 100644 --- a/nano/lib/stats.hpp +++ b/nano/lib/stats.hpp @@ -197,7 +197,8 @@ public: udp, observer, confirmation_height, - drop + drop, + requests }; /** Optional detail type */ @@ -292,7 +293,12 @@ public: // confirmation height blocks_confirmed, - invalid_block + invalid_block, + + // requests + requests_cached, + requests_generated, + requests_ignored }; /** Direction of the stat. If the direction is irrelevant, use in */ diff --git a/nano/lib/threading.cpp b/nano/lib/threading.cpp index 264d0c84..ea04f40a 100644 --- a/nano/lib/threading.cpp +++ b/nano/lib/threading.cpp @@ -69,6 +69,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::worker: thread_role_name_string = "Worker"; break; + case nano::thread_role::name::request_aggregator: + thread_role_name_string = "Req aggregator"; + break; } /* diff --git a/nano/lib/threading.hpp b/nano/lib/threading.hpp index 9f56c6d5..56f7f531 100644 --- a/nano/lib/threading.hpp +++ b/nano/lib/threading.hpp @@ -31,7 +31,8 @@ namespace thread_role rpc_process_container, work_watcher, confirmation_height_processing, - worker + worker, + request_aggregator }; /* * Get/Set the identifier for the current thread diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index dee51457..776adc80 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -98,6 +98,8 @@ add_library (node node_pow_server_config.cpp repcrawler.hpp repcrawler.cpp + request_aggregator.hpp + request_aggregator.cpp testing.hpp testing.cpp transport/tcp.hpp diff --git a/nano/node/network.cpp b/nano/node/network.cpp index ab19c389..b9ee24b7 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -194,24 +194,6 @@ bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a return result; } -void nano::network::confirm_hashes (nano::transaction const & transaction_a, std::shared_ptr channel_a, std::vector blocks_bundle_a) -{ - if (node.config.enable_voting) - { - node.wallets.foreach_representative ([this, &blocks_bundle_a, &channel_a, &transaction_a](nano::public_key const & pub_a, nano::raw_key const & prv_a) { - auto vote (this->node.store.vote_generate (transaction_a, pub_a, prv_a, blocks_bundle_a)); - nano::confirm_ack confirm (vote); - std::shared_ptr> bytes (new std::vector); - { - nano::vectorstream stream (*bytes); - confirm.serialize (stream); - } - channel_a->send (confirm); - this->node.votes_cache.add (vote); - }); - } -} - bool nano::network::send_votes_cache (std::shared_ptr channel_a, nano::block_hash const & hash_a) { // Search in cache @@ -477,81 +459,7 @@ public: } else if (!message_a.roots_hashes.empty ()) { - auto transaction (node.store.tx_begin_read ()); - std::vector blocks_bundle; - std::vector> cached_votes; - size_t cached_count (0); - for (auto & root_hash : message_a.roots_hashes) - { - auto find_votes (node.votes_cache.find (root_hash.first)); - if (!find_votes.empty ()) - { - ++cached_count; - for (auto const & vote : find_votes) - { - if (std::find (cached_votes.begin (), cached_votes.end (), vote) == cached_votes.end ()) - { - cached_votes.push_back (vote); - } - } - } - if (!find_votes.empty () || (!root_hash.first.is_zero () && node.store.block_exists (transaction, root_hash.first))) - { - blocks_bundle.push_back (root_hash.first); - } - else if (!root_hash.second.is_zero ()) - { - nano::block_hash successor (0); - // Search for block root - successor = node.store.block_successor (transaction, root_hash.second); - // Search for account root - if (successor.is_zero ()) - { - nano::account_info info; - auto error (node.store.account_get (transaction, root_hash.second, info)); - if (!error) - { - successor = info.open_block; - } - } - if (!successor.is_zero ()) - { - auto find_successor_votes (node.votes_cache.find (successor)); - if (!find_successor_votes.empty ()) - { - ++cached_count; - for (auto const & vote : find_successor_votes) - { - if (std::find (cached_votes.begin (), cached_votes.end (), vote) == cached_votes.end ()) - { - cached_votes.push_back (vote); - } - } - } - blocks_bundle.push_back (successor); - auto successor_block (node.store.block_get (transaction, successor)); - assert (successor_block != nullptr); - nano::publish publish (successor_block); - channel->send (publish); - } - } - } - /* Decide to send cached votes or to create new vote - If there is at least one new hash to confirm, then create new batch vote - Otherwise use more bandwidth & save local resources required to sign vote */ - if (!blocks_bundle.empty () && cached_count < blocks_bundle.size ()) - { - node.network.confirm_hashes (transaction, channel, blocks_bundle); - } - else - { - // Send from cache - for (auto & vote : cached_votes) - { - nano::confirm_ack confirm (vote); - channel->send (confirm); - } - } + node.aggregator.add (channel, message_a.roots_hashes); } } } diff --git a/nano/node/network.hpp b/nano/node/network.hpp index dae189bf..29135447 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -166,6 +166,7 @@ public: static unsigned const broadcast_interval_ms = 10; static size_t const buffer_size = 512; static size_t const confirm_req_hashes_max = 7; + static size_t const confirm_ack_hashes_max = 12; }; std::unique_ptr collect_container_info (network & network, const std::string & name); } diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 1519e803..cc4ea7e9 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -134,13 +134,16 @@ rep_crawler (*this), warmed_up (0), votes_cache (wallets), block_processor (*this, write_database_queue), +// clang-format off block_processor_thread ([this]() { nano::thread_role::set (nano::thread_role::name::block_processing); this->block_processor.process_blocks (); }), +// clang-format on online_reps (ledger, network_params, config.online_weight_minimum.number ()), vote_uniquer (block_uniquer), active (*this), +aggregator (stats, network_params.network, votes_cache, store, wallets), confirmation_height_processor (pending_confirmation_height, ledger, active, write_database_queue, config.conf_height_processor_batch_min_time, logger), payment_observer_processor (observers.blocks), wallets (wallets_store.init_error (), *this), @@ -608,6 +611,7 @@ std::unique_ptr nano::collect_container_info (no composite->add_component (collect_container_info (node.pending_confirmation_height, "pending_confirmation_height")); composite->add_component (collect_container_info (node.worker, "worker")); composite->add_component (collect_container_info (node.distributed_work, "distributed_work")); + composite->add_component (collect_container_info (node.aggregator, "request_aggregator")); return composite; } @@ -697,6 +701,7 @@ void nano::node::stop () { block_processor_thread.join (); } + aggregator.stop (); vote_processor.stop (); confirmation_height_processor.stop (); active.stop (); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 0155d1f8..b8985a5d 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -183,6 +184,7 @@ public: nano::vote_uniquer vote_uniquer; nano::pending_confirmation_height pending_confirmation_height; // Used by both active and confirmation height processor nano::active_transactions active; + nano::request_aggregator aggregator; nano::confirmation_height_processor confirmation_height_processor; nano::payment_observer_processor payment_observer_processor; nano::wallets wallets; diff --git a/nano/node/request_aggregator.cpp b/nano/node/request_aggregator.cpp new file mode 100644 index 00000000..007fe48a --- /dev/null +++ b/nano/node/request_aggregator.cpp @@ -0,0 +1,221 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +nano::request_aggregator::request_aggregator (nano::stat & stats_a, nano::network_constants const & network_constants_a, nano::votes_cache & cache_a, nano::block_store & store_a, nano::wallets & wallets_a) : +max_delay (network_constants_a.is_test_network () ? 50 : 300), +small_delay (network_constants_a.is_test_network () ? 10 : 50), +stats (stats_a), +votes_cache (cache_a), +store (store_a), +wallets (wallets_a), +// clang-format off +thread ([this]() { run (); }) +{ + nano::unique_lock lock (mutex); + condition.wait (lock, [& started = started] { return started; }); +} +// clang-format on + +void nano::request_aggregator::add (std::shared_ptr & channel_a, std::vector> const & hashes_roots_a) +{ + assert (wallets.rep_counts ().voting > 0); + auto const endpoint (nano::transport::map_endpoint_to_v6 (channel_a->get_endpoint ())); + nano::unique_lock lock (mutex); + auto & requests_by_endpoint (requests.get ()); + auto existing (requests_by_endpoint.find (endpoint)); + if (existing == requests_by_endpoint.end ()) + { + existing = requests_by_endpoint.emplace (channel_a).first; + } + // clang-format off + requests_by_endpoint.modify (existing, [&hashes_roots_a, &channel_a, this](channel_pool & pool_a) { + // This extends the lifetime of the channel, which is acceptable up to max_delay + pool_a.channel = channel_a; + auto new_deadline (std::min (pool_a.start + this->max_delay, std::chrono::steady_clock::now () + this->small_delay)); + pool_a.deadline = new_deadline; + pool_a.hashes_roots.insert (pool_a.hashes_roots.begin (), hashes_roots_a.begin (), hashes_roots_a.end ()); + }); + // clang-format on + if (requests.size () == 1) + { + lock.unlock (); + condition.notify_all (); + } +} + +void nano::request_aggregator::run () +{ + nano::thread_role::set (nano::thread_role::name::request_aggregator); + nano::unique_lock lock (mutex); + started = true; + lock.unlock (); + condition.notify_all (); + lock.lock (); + while (!stopped) + { + if (!requests.empty ()) + { + auto & requests_by_deadline (requests.get ()); + auto front (requests_by_deadline.begin ()); + if (front->deadline < std::chrono::steady_clock::now ()) + { + auto pool (*front); + auto transaction (store.tx_begin_read ()); + // Aggregate the current pool of requests, sending cached votes + // Store a local copy of the remaining hashes and the channel + auto remaining = aggregate (transaction, pool); + auto channel = pool.channel; + // Safely erase this pool + requests_by_deadline.erase (front); + if (!remaining.empty ()) + { + lock.unlock (); + // Generate votes for the remaining hashes + generate (transaction, std::move (remaining), channel); + lock.lock (); + } + } + else + { + auto deadline = front->deadline; + // clang-format off + condition.wait_until (lock, deadline, [this, &deadline]() { return this->stopped || deadline < std::chrono::steady_clock::now (); }); + // clang-format on + } + } + else + { + // clang-format off + condition.wait_for (lock, small_delay, [this]() { return this->stopped || !this->requests.empty (); }); + // clang-format on + } + } +} + +void nano::request_aggregator::stop () +{ + { + nano::lock_guard guard (mutex); + stopped = true; + } + condition.notify_all (); + if (thread.joinable ()) + { + thread.join (); + } +} + +std::size_t nano::request_aggregator::size () +{ + nano::unique_lock lock (mutex); + return requests.size (); +} + +bool nano::request_aggregator::empty () +{ + return size () == 0; +} + +std::vector nano::request_aggregator::aggregate (nano::transaction const & transaction_a, channel_pool & pool_a) const +{ + std::vector to_generate; + std::vector> cached_votes; + for (auto const & hash_root : pool_a.hashes_roots) + { + auto find_votes (votes_cache.find (hash_root.first)); + if (!find_votes.empty ()) + { + cached_votes.insert (cached_votes.end (), find_votes.begin (), find_votes.end ()); + } + else if (!hash_root.first.is_zero () && store.block_exists (transaction_a, hash_root.first)) + { + to_generate.push_back (hash_root.first); + } + else if (!hash_root.second.is_zero ()) + { + // Search for block root + auto successor (store.block_successor (transaction_a, hash_root.second)); + // Search for account root + if (successor.is_zero ()) + { + nano::account_info info; + auto error (store.account_get (transaction_a, hash_root.second, info)); + if (!error) + { + successor = info.open_block; + } + } + if (!successor.is_zero ()) + { + auto find_successor_votes (votes_cache.find (successor)); + if (!find_successor_votes.empty ()) + { + cached_votes.insert (cached_votes.end (), find_successor_votes.begin (), find_successor_votes.end ()); + } + else + { + to_generate.push_back (successor); + } + auto successor_block (store.block_get (transaction_a, successor)); + assert (successor_block != nullptr); + nano::publish publish (successor_block); + pool_a.channel->send (publish); + } + else + { + stats.inc (nano::stat::type::requests, nano::stat::detail::requests_ignored); + } + } + } + // Unique votes + std::sort (cached_votes.begin (), cached_votes.end ()); + cached_votes.erase (std::unique (cached_votes.begin (), cached_votes.end ()), cached_votes.end ()); + for (auto const & vote : cached_votes) + { + nano::confirm_ack confirm (vote); + pool_a.channel->send (confirm); + } + stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached, stat::dir::in, cached_votes.size ()); + return to_generate; +} + +void nano::request_aggregator::generate (nano::transaction const & transaction_a, std::vector const hashes_a, std::shared_ptr & channel_a) const +{ + size_t generated_l = 0; + auto i (hashes_a.begin ()); + auto n (hashes_a.end ()); + while (i != n) + { + std::vector hashes_l; + for (; i != n && hashes_l.size () < nano::network::confirm_ack_hashes_max; ++i) + { + hashes_l.push_back (*i); + } + // clang-format off + wallets.foreach_representative ([this, &generated_l, &hashes_l, &channel_a, &transaction_a](nano::public_key const & pub_a, nano::raw_key const & prv_a) { + auto vote (this->store.vote_generate (transaction_a, pub_a, prv_a, hashes_l)); + ++generated_l; + nano::confirm_ack confirm (vote); + channel_a->send (confirm); + this->votes_cache.add (vote); + }); + // clang-format on + } + stats.add (nano::stat::type::requests, nano::stat::detail::requests_generated, stat::dir::in, generated_l); +} + +std::unique_ptr nano::collect_container_info (nano::request_aggregator & aggregator, const std::string & name) +{ + auto pools_count = aggregator.size (); + auto sizeof_element = sizeof (decltype (aggregator.requests)::value_type); + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "pools", pools_count, sizeof_element })); + return composite; +} diff --git a/nano/node/request_aggregator.hpp b/nano/node/request_aggregator.hpp new file mode 100644 index 00000000..ceee0f78 --- /dev/null +++ b/nano/node/request_aggregator.hpp @@ -0,0 +1,103 @@ +#pragma once + +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include + +namespace mi = boost::multi_index; + +namespace nano +{ +class votes_cache; +class block_store; +class wallets; +class stat; +/** + * Pools together confirmation requests, separately for each endpoint. + * Requests are added from network messages, and aggregated to minimize bandwidth and vote generation. Example: + * * Two votes are cached, one for hashes {1,2,3} and another for hashes {4,5,6} + * * A request arrives for hashes {1,4,5}. Another request arrives soon afterwards for hashes {2,3,6} + * * The aggregator will reply with the two cached votes + * Votes are generated for uncached hashes. + */ +class request_aggregator final +{ + /** + * Holds a buffer of incoming requests from an endpoint. + * Extends the lifetime of the corresponding channel. The channel is updated on a new request arriving from the same endpoint, such that only the newest channel is held + */ + struct channel_pool final + { + channel_pool () = delete; + explicit channel_pool (std::shared_ptr & channel_a) : + channel (channel_a), + endpoint (nano::transport::map_endpoint_to_v6 (channel_a->get_endpoint ())) + { + } + std::vector> hashes_roots; + std::shared_ptr channel; + nano::endpoint endpoint; + std::chrono::steady_clock::time_point const start{ std::chrono::steady_clock::now () }; + std::chrono::steady_clock::time_point deadline; + }; + + // clang-format off + class tag_endpoint {}; + class tag_deadline {}; + // clang-format on + +public: + request_aggregator () = delete; + request_aggregator (nano::stat &, nano::network_constants const &, nano::votes_cache &, nano::block_store &, nano::wallets &); + + /** Add a new request by \p channel_a for hashes \p hashes_roots_a */ + void add (std::shared_ptr & channel_a, std::vector> const & hashes_roots_a); + void stop (); + /** Returns the number of currently queued request pools */ + size_t size (); + bool empty (); + + const std::chrono::milliseconds max_delay; + const std::chrono::milliseconds small_delay; + +private: + void run (); + /** Aggregate and send cached votes for \p pool_a, returning the leftovers that were not found in cached votes **/ + std::vector aggregate (nano::transaction const &, channel_pool & pool_a) const; + /** Generate and send votes from \p hashes_a to \p channel_a, does not need a lock on the mutex **/ + void generate (nano::transaction const &, std::vector const hashes_a, std::shared_ptr & channel_a) const; + + nano::stat & stats; + nano::votes_cache & votes_cache; + nano::block_store & store; + nano::wallets & wallets; + + // clang-format off + boost::multi_index_container, + mi::member>, + mi::ordered_non_unique, + mi::member>>> + requests; + // clang-format on + + bool stopped{ false }; + bool started{ false }; + nano::condition_variable condition; + std::mutex mutex; + std::thread thread; + + friend std::unique_ptr collect_container_info (request_aggregator &, const std::string &); +}; +std::unique_ptr collect_container_info (request_aggregator &, const std::string &); +} diff --git a/nano/node/voting.cpp b/nano/node/voting.cpp index e9666415..0314559e 100644 --- a/nano/node/voting.cpp +++ b/nano/node/voting.cpp @@ -29,7 +29,7 @@ void nano::vote_generator::add (nano::block_hash const & hash_a) { nano::unique_lock lock (mutex); hashes.push_back (hash_a); - if (hashes.size () >= 12) + if (hashes.size () >= nano::network::confirm_ack_hashes_max) { lock.unlock (); condition.notify_all (); @@ -53,8 +53,8 @@ void nano::vote_generator::stop () void nano::vote_generator::send (nano::unique_lock & lock_a) { std::vector hashes_l; - hashes_l.reserve (12); - while (!hashes.empty () && hashes_l.size () < 12) + hashes_l.reserve (nano::network::confirm_ack_hashes_max); + while (!hashes.empty () && hashes_l.size () < nano::network::confirm_ack_hashes_max) { hashes_l.push_back (hashes.front ()); hashes.pop_front (); @@ -81,16 +81,20 @@ void nano::vote_generator::run () lock.lock (); while (!stopped) { - if (hashes.size () >= 12) + if (hashes.size () >= nano::network::confirm_ack_hashes_max) { send (lock); } else { - condition.wait_for (lock, config.vote_generator_delay, [this]() { return this->hashes.size () >= 12; }); - if (hashes.size () >= config.vote_generator_threshold && hashes.size () < 12) + // clang-format off + condition.wait_for (lock, config.vote_generator_delay, [this]() { return this->hashes.size () >= nano::network::confirm_ack_hashes_max; }); + // clang-format on + if (hashes.size () >= config.vote_generator_threshold && hashes.size () < nano::network::confirm_ack_hashes_max) { - condition.wait_for (lock, config.vote_generator_delay, [this]() { return this->hashes.size () >= 12; }); + // clang-format off + condition.wait_for (lock, config.vote_generator_delay, [this]() { return this->hashes.size () >= nano::network::confirm_ack_hashes_max; }); + // clang-format on } if (!hashes.empty ()) {