diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 66afee12..dee51457 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -43,6 +43,8 @@ add_library (node common.cpp confirmation_height_processor.hpp confirmation_height_processor.cpp + confirmation_solicitor.cpp + confirmation_solicitor.hpp daemonconfig.hpp daemonconfig.cpp distributed_work.hpp diff --git a/nano/node/active_transactions.cpp b/nano/node/active_transactions.cpp index f448bfba..61a4863a 100644 --- a/nano/node/active_transactions.cpp +++ b/nano/node/active_transactions.cpp @@ -18,6 +18,7 @@ election_time_to_live (node.network_params.network.is_test_network () ? 0s : 10s multipliers_cb (20, 1.), trended_active_difficulty (node.network_params.network.publish_threshold), next_frontier_check (steady_clock::now ()), +solicitor (node_a), thread ([this]() { nano::thread_role::set (nano::thread_role::name::request_loop); request_loop (); @@ -202,109 +203,11 @@ void nano::active_transactions::election_escalate (std::shared_ptr & election_l, nano::transaction const & transaction_l, std::deque> & blocks_bundle_l, std::unordered_set & inactive_l, nano::qualified_root & root_l) -{ - if (node.ledger.could_fit (transaction_l, *election_l->status.winner)) - { - // Broadcast current winner - if (blocks_bundle_l.size () < max_block_broadcasts) - { - blocks_bundle_l.push_back (election_l->status.winner); - } - } - else if (election_l->confirmation_request_count != 0) - { - election_l->stop (); - inactive_l.insert (root_l); - } -} - -bool nano::active_transactions::election_request_confirm (std::shared_ptr & election_l, std::vector const & representatives_l, size_t const & roots_size_l, -std::deque, std::shared_ptr>>>> & single_confirm_req_bundle_l, -std::unordered_map, std::deque>> & batched_confirm_req_bundle_l) -{ - bool inserted_into_any_bundle{ false }; - std::vector> rep_channels_missing_vote_l; - // Add all rep endpoints that haven't already voted - for (const auto & rep : representatives_l) - { - if (election_l->last_votes.find (rep.account) == election_l->last_votes.end ()) - { - rep_channels_missing_vote_l.push_back (rep.channel); - - if (node.config.logging.vote_logging () && election_l->confirmation_request_count > 0) - { - node.logger.try_log ("Representative did not respond to confirm_req, retrying: ", rep.account.to_account ()); - } - } - } - // Unique channels as there can be multiple reps per channel - rep_channels_missing_vote_l.erase (std::unique (rep_channels_missing_vote_l.begin (), rep_channels_missing_vote_l.end ()), rep_channels_missing_vote_l.end ()); - bool low_reps_weight (rep_channels_missing_vote_l.empty () || node.rep_crawler.total_weight () < node.config.online_weight_minimum.number ()); - if (low_reps_weight && roots_size_l <= 5 && !node.network_params.network.is_test_network ()) - { - // Spam mode - auto deque_l (node.network.udp_channels.random_set (100)); - auto vec (std::make_shared>> ()); - for (auto i : deque_l) - { - vec->push_back (i); - } - single_confirm_req_bundle_l.push_back (std::make_pair (election_l->status.winner, vec)); - inserted_into_any_bundle = true; - } - else - { - auto single_confirm_req_channels_l (std::make_shared>> ()); - for (auto & rep : rep_channels_missing_vote_l) - { - if (rep->get_network_version () >= node.network_params.protocol.tcp_realtime_protocol_version_min) - { - // Send batch request to peers supporting confirm_req by hash + root - auto rep_request_l (batched_confirm_req_bundle_l.find (rep)); - auto block_l (election_l->status.winner); - auto root_hash_l (std::make_pair (block_l->hash (), block_l->root ())); - if (rep_request_l == batched_confirm_req_bundle_l.end ()) - { - // Maximum number of representatives - if (batched_confirm_req_bundle_l.size () < max_confirm_representatives) - { - std::deque> insert_root_hash = { root_hash_l }; - batched_confirm_req_bundle_l.insert (std::make_pair (rep, insert_root_hash)); - inserted_into_any_bundle = true; - } - } - // Maximum number of hashes - else if (rep_request_l->second.size () < max_confirm_req_batches * nano::network::confirm_req_hashes_max) - { - rep_request_l->second.push_back (root_hash_l); - inserted_into_any_bundle = true; - } - } - else - { - single_confirm_req_channels_l->push_back (rep); - } - } - // broadcast_confirm_req_base modifies reps, so we clone it once to avoid aliasing - if (single_confirm_req_bundle_l.size () < max_confirm_req && !single_confirm_req_channels_l->empty ()) - { - single_confirm_req_bundle_l.push_back (std::make_pair (election_l->status.winner, single_confirm_req_channels_l)); - inserted_into_any_bundle = true; - } - } - return inserted_into_any_bundle; -} - void nano::active_transactions::request_confirm (nano::unique_lock & lock_a) { assert (!mutex.try_lock ()); auto transaction_l (node.store.tx_begin_read ()); std::unordered_set inactive_l; - std::deque> blocks_bundle_l; - std::unordered_map, std::deque>> batched_confirm_req_bundle_l; - std::deque, std::shared_ptr>>>> single_confirm_req_bundle_l; - /* * Confirm frontiers when there aren't many confirmations already pending and node finished initial bootstrap * In auto mode start confirm only if node contains almost principal representative (half of required for principal weight) @@ -330,7 +233,6 @@ void nano::active_transactions::request_confirm (nano::unique_lock & // The lowest PoW difficulty elections have a maximum time to live if they are beyond the soft threshold size for the container auto election_ttl_cutoff_l (std::chrono::steady_clock::now () - election_time_to_live); - auto const representatives_l (node.rep_crawler.representatives (std::numeric_limits::max ())); auto roots_size_l (roots.size ()); auto & sorted_roots_l = roots.get (); size_t count_l{ 0 }; @@ -344,12 +246,17 @@ void nano::active_transactions::request_confirm (nano::unique_lock & * Elections extending the soft config.active_elections_size limit are flushed after a certain time-to-live cutoff * Flushed elections are later re-activated via frontier confirmation */ + solicitor.prepare (); for (auto i = sorted_roots_l.begin (), n = sorted_roots_l.end (); i != n; ++i, ++count_l) { auto election_l (i->election); auto root_l (i->root); + if (election_l->confirmed || (election_l->confirmation_request_count != 0 && !node.ledger.could_fit (transaction_l, *election_l->status.winner))) + { + election_l->stop (); + } // Erase finished elections - if ((election_l->confirmed || election_l->stopped)) + if ((election_l->stopped)) { inactive_l.insert (root_l); } @@ -363,74 +270,16 @@ void nano::active_transactions::request_confirm (nano::unique_lock & // Broadcast and request confirmation else if (election_l->skip_delay || election_l->election_start < cutoff_l) { - bool increment_counter_l{ true }; + solicitor.add (election_l); // Escalate long election after a certain time and number of requests performed if (election_l->confirmation_request_count > 4 && election_l->election_start < long_election_cutoff_l) { election_escalate (election_l, transaction_l, roots_size_l); } - // Block broadcasting - if (election_l->confirmation_request_count % 8 == 1) - { - election_broadcast (election_l, transaction_l, blocks_bundle_l, inactive_l, root_l); - } - // Confirmation requesting - if (election_l->confirmation_request_count % 4 == 0) - { - // If failed to insert into any of the bundles (capped), don't increment the counter so that the same root is sent for confirmation in the next loop - if (!election_request_confirm (election_l, representatives_l, roots_size_l, single_confirm_req_bundle_l, batched_confirm_req_bundle_l)) - { - increment_counter_l = false; - } - } - if (increment_counter_l || node.network_params.network.is_test_network ()) - { - ++election_l->confirmation_request_count; - } } } - ongoing_broadcasts = !blocks_bundle_l.empty () + !batched_confirm_req_bundle_l.empty () + !single_confirm_req_bundle_l.empty (); lock_a.unlock (); - - // Rebroadcast unconfirmed blocks - if (!blocks_bundle_l.empty ()) - { - node.network.flood_block_many ( - std::move (blocks_bundle_l), [this]() { - { - nano::lock_guard guard_l (this->mutex); - --this->ongoing_broadcasts; - } - this->condition.notify_all (); - }, - 10); // 10ms/block * 30blocks = 300ms < 500ms - } - // Batch confirmation request - if (!batched_confirm_req_bundle_l.empty ()) - { - node.network.broadcast_confirm_req_batched_many ( - batched_confirm_req_bundle_l, [this]() { - { - nano::lock_guard guard_l (this->mutex); - --this->ongoing_broadcasts; - } - this->condition.notify_all (); - }, - 15); // 15ms/batch * 20batches = 300ms < 500ms - } - // Single confirmation requests - if (!single_confirm_req_bundle_l.empty ()) - { - node.network.broadcast_confirm_req_many ( - single_confirm_req_bundle_l, [this]() { - { - nano::lock_guard guard_l (this->mutex); - --this->ongoing_broadcasts; - } - this->condition.notify_all (); - }, - 30); // 30~60ms/req * 5 reqs = 150~300ms < 500ms - } + solicitor.flush (); lock_a.lock (); // Erase inactive elections for (auto i (inactive_l.begin ()), n (inactive_l.end ()); i != n; ++i) @@ -467,10 +316,6 @@ void nano::active_transactions::request_loop () request_confirm (lock); // Sleep until all broadcasts are done, plus the remaining loop time - while (!stopped && ongoing_broadcasts) - { - condition.wait (lock); - } if (!stopped) { // clang-format off diff --git a/nano/node/active_transactions.hpp b/nano/node/active_transactions.hpp index 826b8b03..b0f7d9b9 100644 --- a/nano/node/active_transactions.hpp +++ b/nano/node/active_transactions.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -121,10 +122,7 @@ public: std::chrono::milliseconds const election_request_delay; // Maximum time an election can be kept active if it is extending the container std::chrono::seconds const election_time_to_live; - static size_t constexpr max_block_broadcasts = 30; static size_t constexpr max_confirm_representatives = 30; - static size_t constexpr max_confirm_req_batches = 20; - static size_t constexpr max_confirm_req = 5; boost::circular_buffer multipliers_cb; uint64_t trended_active_difficulty; size_t priority_cementable_frontiers_size (); @@ -145,17 +143,12 @@ private: void request_loop (); void search_frontiers (nano::transaction const &); void election_escalate (std::shared_ptr &, nano::transaction const &, size_t const &); - void election_broadcast (std::shared_ptr &, nano::transaction const &, std::deque> &, std::unordered_set &, nano::qualified_root &); - bool election_request_confirm (std::shared_ptr &, std::vector const &, size_t const &, - std::deque, std::shared_ptr>>>> & single_confirm_req_bundle_l, - std::unordered_map, std::deque>> & batched_confirm_req_bundle_l); void request_confirm (nano::unique_lock &); nano::account next_frontier_account{ 0 }; std::chrono::steady_clock::time_point next_frontier_check{ std::chrono::steady_clock::now () }; nano::condition_variable condition; bool started{ false }; std::atomic stopped{ false }; - unsigned ongoing_broadcasts{ 0 }; // clang-format off boost::multi_index_container, mi::member>>> dropped_elections_cache; + nano::confirmation_solicitor solicitor; // clang-format on static size_t constexpr dropped_elections_cache_max{ 32 * 1024 }; boost::thread thread; diff --git a/nano/node/confirmation_solicitor.cpp b/nano/node/confirmation_solicitor.cpp new file mode 100644 index 00000000..ba20625d --- /dev/null +++ b/nano/node/confirmation_solicitor.cpp @@ -0,0 +1,63 @@ +#include + +#include +#include + +nano::confirmation_solicitor::confirmation_solicitor (nano::node & node_a) : +node (node_a) +{ +} + +void nano::confirmation_solicitor::prepare () +{ + assert (!prepared); + requests.clear (); + rebroadcasted = 0; + representatives = node.rep_crawler.representatives (); + prepared = true; +} + +void nano::confirmation_solicitor::add (std::shared_ptr election_a) +{ + assert (prepared); + if (election_a->confirmation_request_count % 8 == 1 && rebroadcasted++ < max_block_broadcasts) + { + node.network.flood_block (election_a->status.winner); + } + for (auto const & rep : representatives) + { + if (election_a->last_votes.find (rep.account) == election_a->last_votes.end ()) + { + requests.insert ({ rep.channel, election_a }); + } + } + ++election_a->confirmation_request_count; +} + +void nano::confirmation_solicitor::flush () +{ + assert (prepared); + size_t batch_count = 0; + size_t single_count = 0; + for (auto i = requests.begin (), n (requests.end ()); i != n;) + { + if (batch_count++ < max_confirm_req_batches && i->channel->get_network_version () >= node.network_params.protocol.tcp_realtime_protocol_version_min) + { + auto channel = i->channel; + std::vector> roots_hashes_l; + while (i != n && i->channel == channel && roots_hashes_l.size () < nano::network::confirm_req_hashes_max) + { + roots_hashes_l.push_back (std::make_pair (i->election->status.winner->hash (), i->election->status.winner->root ())); + ++i; + } + nano::confirm_req req (roots_hashes_l); + channel->send (req); + } + else if (single_count++ < max_confirm_req) + { + node.network.broadcast_confirm_req (i->election->status.winner); + ++i; + } + } + prepared = false; +} diff --git a/nano/node/confirmation_solicitor.hpp b/nano/node/confirmation_solicitor.hpp new file mode 100644 index 00000000..213a69ff --- /dev/null +++ b/nano/node/confirmation_solicitor.hpp @@ -0,0 +1,51 @@ +#pragma once + +#include + +#include + +namespace nano +{ +class election; +class node; +/** This class accepts elections that need further votes before they can be confirmed and bundles them in to single confirm_req packets */ +class confirmation_solicitor final +{ + class request + { + public: + std::shared_ptr channel; + std::shared_ptr election; + bool operator== (nano::confirmation_solicitor::request const & other_a) const + { + return *channel == *other_a.channel && election == other_a.election; + } + }; + class request_hash + { + public: + size_t operator () (nano::confirmation_solicitor::request const & item_a) const + { + return std::hash> ()(item_a.election) ^ std::hash ()(*item_a.channel); + } + }; +public: + confirmation_solicitor (nano::node &); + /** Prepare object for batching election confirmation requests*/ + void prepare (); + /** Add an election that needs to be confirmed */ + void add (std::shared_ptr); + /** Bundle hashes together for identical channels in to a single confirm_req by hash packet */ + void flush (); +private: + static size_t constexpr max_confirm_req_batches = 20; + static size_t constexpr max_confirm_req = 5; + static size_t constexpr max_block_broadcasts = 30; + int rebroadcasted { 0 }; + nano::node & node; + std::vector representatives; + /** Unique channel/hash to be requested */ + std::unordered_set requests; + bool prepared { false }; +}; +}