From 0bc041e051df953b8366940ea2ee222f0b97384e Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Wed, 15 Jan 2020 15:05:23 +0000 Subject: [PATCH] Confirmation solicitor revamp (#2472) * Optional minimum version when querying representatives from crawler * Revamping confirmation_solicitor to mimick previous active_transactions behavior * Use a time-based approach to throttle confirmation requests and block flooding * Addressing Wesley review * Remove unusued node.hpp include (thanks wes) * Simplify logic by using unordered_map::operator[] which calls the default constructor if not found * Split solicitor add into broadcast+add and bring back the logic to active_transactions This brings back rate-limitting logic and modifying election variables to active_transactions only. Timings are also slightly adjusted: - Only 2 requests required before starting to flood blocks - Timings for test network * Rename flag * Only broadcast OR request confirmation in the same loop for the same election * Enclose lambda in clang-format off --- nano/core_test/CMakeLists.txt | 1 + nano/core_test/confirmation_solicitor.cpp | 60 +++++++++++++++++++++ nano/lib/config.hpp | 2 +- nano/node/active_transactions.cpp | 55 +++++++++++++------ nano/node/active_transactions.hpp | 22 +++++--- nano/node/confirmation_solicitor.cpp | 66 ++++++++++++++--------- nano/node/confirmation_solicitor.hpp | 46 ++++++---------- nano/node/election.hpp | 2 + nano/node/nodeconfig.hpp | 1 + nano/node/repcrawler.cpp | 5 +- nano/node/repcrawler.hpp | 5 +- 11 files changed, 181 insertions(+), 84 deletions(-) create mode 100644 nano/core_test/confirmation_solicitor.cpp diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index 2482b24c..77ea58ee 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -6,6 +6,7 @@ add_executable (core_test block_store.cpp bootstrap.cpp confirmation_height.cpp + confirmation_solicitor.cpp conflicts.cpp difficulty.cpp distributed_work.cpp diff --git a/nano/core_test/confirmation_solicitor.cpp b/nano/core_test/confirmation_solicitor.cpp new file mode 100644 index 00000000..4d83b7b2 --- /dev/null +++ b/nano/core_test/confirmation_solicitor.cpp @@ -0,0 +1,60 @@ +#include +#include +#include + +#include + +using namespace std::chrono_literals; + +TEST (confirmation_solicitor, batches) +{ + nano::system system; + nano::node_config node_config (nano::get_available_port (), system.logging); + node_config.enable_voting = false; + node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; + auto & node1 = *system.add_node (node_config); + auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ())); + // Solicitor will only solicit from this representative + nano::representative representative (nano::test_genesis_key.pub, nano::genesis_amount, channel1); + node_config.peering_port = nano::get_available_port (); + nano::node_flags node_flags; + // To prevent races on the solicitor + node_flags.disable_request_loop = true; + auto & node2 = *system.add_node (node_config, node_flags); + // Lock active_transactions which uses the solicitor + { + nano::lock_guard active_guard (node2.active.mutex); + std::vector representatives{ representative }; + node2.active.solicitor.prepare (representatives); + // Ensure the representatives are correct + ASSERT_EQ (1, representatives.size ()); + ASSERT_EQ (channel1, representatives.front ().channel); + ASSERT_EQ (nano::test_genesis_key.pub, representatives.front ().account); + nano::genesis genesis; + auto send (std::make_shared (genesis.open->hash (), nano::keypair ().pub, nano::genesis_amount - 100, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.open->hash ()))); + for (size_t i (0); i < nano::network::confirm_req_hashes_max; ++i) + { + auto election (std::make_shared (node2, send, false, nullptr)); + ASSERT_FALSE (node2.active.solicitor.add (*election)); + } + ASSERT_EQ (1, node2.active.solicitor.max_confirm_req_batches); + // Reached the maximum amount of requests for the channel + auto election (std::make_shared (node2, send, false, nullptr)); + ASSERT_TRUE (node2.active.solicitor.add (*election)); + // Broadcasting should be immediate + ASSERT_EQ (0, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::out)); + ASSERT_FALSE (node2.active.solicitor.broadcast (*election)); + while (node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::out) < 1) + { + ASSERT_NO_ERROR (system.poll ()); + } + } + // From rep crawler + ASSERT_EQ (1, node2.stats.count (nano::stat::type::message, nano::stat::detail::confirm_req, nano::stat::dir::out)); + system.deadline_set (5s); + node2.active.solicitor.flush (); + while (node2.stats.count (nano::stat::type::message, nano::stat::detail::confirm_req, nano::stat::dir::out) < 2) + { + ASSERT_NO_ERROR (system.poll ()); + } +} diff --git a/nano/lib/config.hpp b/nano/lib/config.hpp index 34ea11ef..59c7dedd 100644 --- a/nano/lib/config.hpp +++ b/nano/lib/config.hpp @@ -73,7 +73,7 @@ public: default_rpc_port = is_live_network () ? 7076 : is_beta_network () ? 55000 : 45000; default_ipc_port = is_live_network () ? 7077 : is_beta_network () ? 56000 : 46000; default_websocket_port = is_live_network () ? 7078 : is_beta_network () ? 57000 : 47000; - request_interval_ms = is_test_network () ? (is_sanitizer_build ? 100 : 20) : 500; + request_interval_ms = is_test_network () ? 20 : 500; } /** Network work thresholds. ~5 seconds of work for the live network */ diff --git a/nano/node/active_transactions.cpp b/nano/node/active_transactions.cpp index ffcded85..dc0752a5 100644 --- a/nano/node/active_transactions.cpp +++ b/nano/node/active_transactions.cpp @@ -12,18 +12,25 @@ using namespace std::chrono; nano::active_transactions::active_transactions (nano::node & node_a) : node (node_a), -long_election_threshold (node.network_params.network.is_test_network () ? 2s : 24s), -election_request_delay (node.network_params.network.is_test_network () ? 0s : 1s), -election_time_to_live (node.network_params.network.is_test_network () ? 0s : 10s), multipliers_cb (20, 1.), -trended_active_difficulty (node.network_params.network.publish_threshold), +trended_active_difficulty (node_a.network_params.network.publish_threshold), next_frontier_check (steady_clock::now ()), -solicitor (node_a), +solicitor (node_a.network, node_a.network_params.network), +long_election_threshold (node_a.network_params.network.is_test_network () ? 2s : 24s), +election_request_delay (node_a.network_params.network.is_test_network () ? 0s : 1s), +election_time_to_live (node_a.network_params.network.is_test_network () ? 0s : 10s), +min_time_between_requests (node_a.network_params.network.is_test_network () ? 25ms : 3s), +min_time_between_floods (node_a.network_params.network.is_test_network () ? 50ms : 6s), +min_request_count_flood (node_a.network_params.network.is_test_network () ? 0 : 2), +// clang-format off thread ([this]() { nano::thread_role::set (nano::thread_role::name::request_loop); request_loop (); }) +// clang-format on { + assert (min_time_between_requests > std::chrono::milliseconds (node.network_params.network.request_interval_ms)); + assert (min_time_between_floods > std::chrono::milliseconds (node.network_params.network.request_interval_ms)); nano::unique_lock lock (mutex); condition.wait (lock, [& started = started] { return started; }); } @@ -156,7 +163,7 @@ void nano::active_transactions::post_confirmation_height_set (nano::transaction void nano::active_transactions::election_escalate (std::shared_ptr & election_l, nano::transaction const & transaction_l, size_t const & roots_size_l) { - static unsigned constexpr high_confirmation_request_count{ 128 }; + constexpr unsigned high_confirmation_request_count{ 128 }; // Log votes for very long unconfirmed elections if (election_l->confirmation_request_count % (4 * high_confirmation_request_count) == 1) { @@ -225,28 +232,32 @@ void nano::active_transactions::request_confirm (nano::unique_lock & lock_a.lock (); } } - + auto const now (std::chrono::steady_clock::now ()); // Any new election started from process_live only gets requests after at least 1 second - auto cutoff_l (std::chrono::steady_clock::now () - election_request_delay); + auto cutoff_l (now - election_request_delay); // Elections taking too long get escalated - auto long_election_cutoff_l (std::chrono::steady_clock::now () - long_election_threshold); + auto long_election_cutoff_l (now - long_election_threshold); // The lowest PoW difficulty elections have a maximum time to live if they are beyond the soft threshold size for the container - auto election_ttl_cutoff_l (std::chrono::steady_clock::now () - election_time_to_live); + auto election_ttl_cutoff_l (now - election_time_to_live); + // Rate-limitting floods + auto const flood_cutoff (now - min_time_between_floods); + // Rate-limitting confirmation requests + auto const request_cutoff (now - min_time_between_requests); auto roots_size_l (roots.size ()); auto & sorted_roots_l = roots.get (); size_t count_l{ 0 }; + // Only representatives ready to receive batched confirm_req + solicitor.prepare (node.rep_crawler.representatives (node.network_params.protocol.tcp_realtime_protocol_version_min)); + /* * Loop through active elections in descending order of proof-of-work difficulty, requesting confirmation * * Only up to a certain amount of elections are queued for confirmation request and block rebroadcasting. The remaining elections can still be confirmed if votes arrive - * We avoid selecting the same elections repeatedly in the next loops, through a modulo on confirmation_request_count - * An election only gets confirmation_request_count increased after the first confirm_req; after that it is increased every loop unless they don't fit in the queues * Elections extending the soft config.active_elections_size limit are flushed after a certain time-to-live cutoff * Flushed elections are later re-activated via frontier confirmation */ - solicitor.prepare (); for (auto i = sorted_roots_l.begin (), n = sorted_roots_l.end (); i != n; ++i, ++count_l) { auto election_l (i->election); @@ -267,10 +278,20 @@ void nano::active_transactions::request_confirm (nano::unique_lock & inactive_l.insert (root_l); add_dropped_elections_cache (root_l); } - // Broadcast and request confirmation + // Attempt obtaining votes else if (election_l->skip_delay || election_l->election_start < cutoff_l) { - solicitor.add (election_l); + // Broadcast the winner when elections are taking longer to confirm + if (election_l->confirmation_request_count >= min_request_count_flood && election_l->last_broadcast < flood_cutoff && !solicitor.broadcast (*election_l)) + { + election_l->last_broadcast = now; + } + // Rate-limited requests for confirmation + else if (election_l->last_request < request_cutoff && !solicitor.add (*election_l)) + { + ++election_l->confirmation_request_count; + election_l->last_request = now; + } // Escalate long election after a certain time and number of requests performed if (election_l->confirmation_request_count > 4 && election_l->election_start < long_election_cutoff_l) { @@ -307,7 +328,7 @@ void nano::active_transactions::request_loop () lock.lock (); - while (!stopped) + while (!stopped && !node.flags.disable_request_loop) { // Account for the time spent in request_confirm by defining the wakeup point beforehand const auto wakeup_l (std::chrono::steady_clock::now () + std::chrono::milliseconds (node.network_params.network.request_interval_ms)); @@ -624,7 +645,7 @@ void nano::active_transactions::update_difficulty (std::shared_ptr else if (opt_transaction_a.is_initialized ()) { // Only guaranteed to immediately restart the election if the new block is received within 60s of dropping it - static constexpr std::chrono::seconds recently_dropped_cutoff{ 60s }; + constexpr std::chrono::seconds recently_dropped_cutoff{ 60s }; if (find_dropped_elections_cache (block_a->qualified_root ()) > std::chrono::steady_clock::now () - recently_dropped_cutoff) { lock.unlock (); diff --git a/nano/node/active_transactions.hpp b/nano/node/active_transactions.hpp index 2b82cb09..1b390d77 100644 --- a/nano/node/active_transactions.hpp +++ b/nano/node/active_transactions.hpp @@ -116,12 +116,6 @@ public: void erase_inactive_votes_cache (nano::block_hash const &); nano::node & node; std::mutex mutex; - std::chrono::seconds const long_election_threshold; - // Delay until requesting confirmation for an election - std::chrono::milliseconds const election_request_delay; - // Maximum time an election can be kept active if it is extending the container - std::chrono::seconds const election_time_to_live; - static size_t constexpr max_confirm_representatives = 30; boost::circular_buffer multipliers_cb; uint64_t trended_active_difficulty; size_t priority_cementable_frontiers_size (); @@ -133,6 +127,7 @@ public: void add_dropped_elections_cache (nano::qualified_root const &); std::chrono::steady_clock::time_point find_dropped_elections_cache (nano::qualified_root const &); size_t dropped_elections_cache_size (); + nano::confirmation_solicitor solicitor; private: // Call action with confirmed block, may be different than what we started with @@ -148,6 +143,20 @@ private: nano::condition_variable condition; bool started{ false }; std::atomic stopped{ false }; + + // Minimum time an election must be active before escalation + std::chrono::seconds const long_election_threshold; + // Delay until requesting confirmation for an election + std::chrono::milliseconds const election_request_delay; + // Maximum time an election can be kept active if it is extending the container + std::chrono::seconds const election_time_to_live; + // Minimum time between confirmation requests for an election + std::chrono::milliseconds const min_time_between_requests; + // Minimum time between broadcasts of the current winner of an election, as a backup to requesting confirmations + std::chrono::milliseconds const min_time_between_floods; + // Minimum election request count to start broadcasting blocks, as a backup to requesting confirmations + size_t const min_request_count_flood; + // clang-format off boost::multi_index_container, mi::member>>> dropped_elections_cache; - nano::confirmation_solicitor solicitor; // clang-format on static size_t constexpr dropped_elections_cache_max{ 32 * 1024 }; boost::thread thread; diff --git a/nano/node/confirmation_solicitor.cpp b/nano/node/confirmation_solicitor.cpp index df52fb1b..ae1baf5b 100644 --- a/nano/node/confirmation_solicitor.cpp +++ b/nano/node/confirmation_solicitor.cpp @@ -1,62 +1,78 @@ #include #include -#include -nano::confirmation_solicitor::confirmation_solicitor (nano::node & node_a) : -node (node_a) +using namespace std::chrono_literals; + +nano::confirmation_solicitor::confirmation_solicitor (nano::network & network_a, nano::network_constants const & params_a) : +max_confirm_req_batches (params_a.is_test_network () ? 1 : 20), +max_block_broadcasts (params_a.is_test_network () ? 4 : 30), +network (network_a) { } -void nano::confirmation_solicitor::prepare () +void nano::confirmation_solicitor::prepare (std::vector const & representatives_a) { assert (!prepared); requests.clear (); rebroadcasted = 0; - representatives = node.rep_crawler.representatives (); + representatives = representatives_a; prepared = true; } -void nano::confirmation_solicitor::add (std::shared_ptr election_a) +bool nano::confirmation_solicitor::broadcast (nano::election const & election_a) { assert (prepared); - if (election_a->confirmation_request_count % 8 == 1 && rebroadcasted++ < max_block_broadcasts) + bool result (true); + if (rebroadcasted++ < max_block_broadcasts) { - node.network.flood_block (election_a->status.winner); + network.flood_block (election_a.status.winner); + result = false; } + return result; +} + +bool nano::confirmation_solicitor::add (nano::election const & election_a) +{ + assert (prepared); + auto const max_channel_requests (max_confirm_req_batches * nano::network::confirm_req_hashes_max); + bool result = true; for (auto const & rep : representatives) { - if (election_a->last_votes.find (rep.account) == election_a->last_votes.end ()) + if (election_a.last_votes.find (rep.account) == election_a.last_votes.end ()) { - requests.insert ({ rep.channel, election_a }); + auto & request_queue (requests[rep.channel]); + if (request_queue.size () < max_channel_requests) + { + request_queue.emplace_back (election_a.status.winner->hash (), election_a.status.winner->root ()); + result = false; + } } } - ++election_a->confirmation_request_count; + return result; } void nano::confirmation_solicitor::flush () { assert (prepared); - size_t batch_count = 0; - size_t single_count = 0; - for (auto i = requests.begin (), n (requests.end ()); i != n;) + for (auto const & request_queue : requests) { - if (batch_count++ < max_confirm_req_batches && i->channel->get_network_version () >= node.network_params.protocol.tcp_realtime_protocol_version_min) + auto const & channel (request_queue.first); + std::vector> roots_hashes_l; + for (auto const & root_hash : request_queue.second) { - auto channel = i->channel; - std::vector> roots_hashes_l; - while (i != n && i->channel == channel && roots_hashes_l.size () < nano::network::confirm_req_hashes_max) + roots_hashes_l.push_back (root_hash); + if (roots_hashes_l.size () == nano::network::confirm_req_hashes_max) { - roots_hashes_l.push_back (std::make_pair (i->election->status.winner->hash (), i->election->status.winner->root ())); - ++i; + nano::confirm_req req (roots_hashes_l); + channel->send (req); + roots_hashes_l.clear (); } + } + if (!roots_hashes_l.empty ()) + { nano::confirm_req req (roots_hashes_l); channel->send (req); } - else if (single_count++ < max_confirm_req) - { - node.network.broadcast_confirm_req (i->election->status.winner); - ++i; - } } prepared = false; } diff --git a/nano/node/confirmation_solicitor.hpp b/nano/node/confirmation_solicitor.hpp index 6bf606ad..ae5c9a27 100644 --- a/nano/node/confirmation_solicitor.hpp +++ b/nano/node/confirmation_solicitor.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -11,43 +12,28 @@ class node; /** This class accepts elections that need further votes before they can be confirmed and bundles them in to single confirm_req packets */ class confirmation_solicitor final { - class request - { - public: - std::shared_ptr channel; - std::shared_ptr election; - bool operator== (nano::confirmation_solicitor::request const & other_a) const - { - return *channel == *other_a.channel && election == other_a.election; - } - }; - class request_hash - { - public: - size_t operator() (nano::confirmation_solicitor::request const & item_a) const - { - return std::hash> () (item_a.election) ^ std::hash () (*item_a.channel); - } - }; - public: - confirmation_solicitor (nano::node &); + confirmation_solicitor (nano::network &, nano::network_constants const &); /** Prepare object for batching election confirmation requests*/ - void prepare (); - /** Add an election that needs to be confirmed */ - void add (std::shared_ptr); - /** Bundle hashes together for identical channels in to a single confirm_req by hash packet */ + void prepare (std::vector const &); + /** Broadcast the winner of an election if the broadcast limit has not been reached. Returns false if the broadcast was performed */ + bool broadcast (nano::election const &); + /** Add an election that needs to be confirmed. Returns false if successfully added */ + bool add (nano::election const &); + /** Dispatch bundled requests to each channel*/ void flush (); + /** The maximum amount of confirmation requests (batches) to be sent to each channel */ + size_t const max_confirm_req_batches; + /** The global maximum amount of block broadcasts */ + size_t const max_block_broadcasts; private: - static size_t constexpr max_confirm_req_batches = 20; - static size_t constexpr max_confirm_req = 5; - static size_t constexpr max_block_broadcasts = 30; + nano::network & network; + int rebroadcasted{ 0 }; - nano::node & node; std::vector representatives; - /** Unique channel/hash to be requested */ - std::unordered_set requests; + using vector_root_hashes = std::vector>; + std::unordered_map, vector_root_hashes> requests; bool prepared{ false }; }; } diff --git a/nano/node/election.hpp b/nano/node/election.hpp index 8aec3179..3c98550e 100644 --- a/nano/node/election.hpp +++ b/nano/node/election.hpp @@ -79,6 +79,8 @@ public: bool stopped; std::unordered_map last_tally; unsigned confirmation_request_count{ 0 }; + std::chrono::steady_clock::time_point last_broadcast; + std::chrono::steady_clock::time_point last_request; std::unordered_set dependent_blocks; std::chrono::seconds late_blocks_delay{ 5 }; }; diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 7b918a6d..03e4d441 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -118,6 +118,7 @@ public: bool disable_bootstrap_bulk_pull_server{ false }; bool disable_bootstrap_bulk_push_client{ false }; bool disable_rep_crawler{ false }; + bool disable_request_loop{ false }; bool disable_tcp_realtime{ false }; bool disable_udp{ false }; bool disable_unchecked_cleanup{ false }; diff --git a/nano/node/repcrawler.cpp b/nano/node/repcrawler.cpp index da1a2285..7ad42757 100644 --- a/nano/node/repcrawler.cpp +++ b/nano/node/repcrawler.cpp @@ -245,13 +245,14 @@ void nano::rep_crawler::update_weights () } } -std::vector nano::rep_crawler::representatives (size_t count_a) +std::vector nano::rep_crawler::representatives (size_t count_a, boost::optional const & opt_version_min_a) { + auto version_min (opt_version_min_a.value_or (node.network_params.protocol.protocol_version_min)); std::vector result; nano::lock_guard lock (probable_reps_mutex); for (auto i (probable_reps.get ().begin ()), n (probable_reps.get ().end ()); i != n && result.size () < count_a; ++i) { - if (!i->weight.is_zero ()) + if (!i->weight.is_zero () && i->channel->get_network_version () >= version_min) { result.push_back (*i); } diff --git a/nano/node/repcrawler.hpp b/nano/node/repcrawler.hpp index ab90633f..f1c2fd1c 100644 --- a/nano/node/repcrawler.hpp +++ b/nano/node/repcrawler.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -103,8 +104,8 @@ public: /** Get total available weight from representatives */ nano::uint128_t total_weight () const; - /** Request a list of the top \p count_a known representatives in descending order of weight. */ - std::vector representatives (size_t count_a = std::numeric_limits::max ()); + /** Request a list of the top \p count_a known representatives in descending order of weight, optionally with a minimum version \p opt_version_min_a */ + std::vector representatives (size_t count_a = std::numeric_limits::max (), boost::optional const & opt_version_min_a = boost::none); /** Request a list of the top \p count_a known representative endpoints. */ std::vector> representative_endpoints (size_t count_a);