From 903391ef05a7668fc91324a208ecb1f621a57633 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Wed, 9 Oct 2019 14:39:51 +0100 Subject: [PATCH] Basic confirmation request loop enhancements (#2327) * Throttle confirmation request loop * Add an assert on frontier confirmation * Revert "Add an assert on frontier confirmation" This reverts commit a37a98f2a34c7be2ad6291f96f2c50c33743aa77. Would fail some tests. * Setting ongoing_broadcasts before unlocking * Fix test active_transactions.bounded_active_elections * Review * Use an unordered_set of rep channels to prevent sending confirm_req repeatedly to the same channel --- nano/core_test/active_transactions.cpp | 16 +-- nano/node/active_transactions.cpp | 178 ++++++++++++++----------- nano/node/active_transactions.hpp | 1 + nano/node/json_handler.cpp | 4 +- nano/node/network.cpp | 60 +++++---- nano/node/network.hpp | 6 +- 6 files changed, 150 insertions(+), 115 deletions(-) diff --git a/nano/core_test/active_transactions.cpp b/nano/core_test/active_transactions.cpp index 13bb14a84..007585312 100644 --- a/nano/core_test/active_transactions.cpp +++ b/nano/core_test/active_transactions.cpp @@ -12,25 +12,25 @@ TEST (active_transactions, bounded_active_elections) nano::node_config node_config (24000, system.logging); node_config.enable_voting = false; node_config.active_elections_size = 5; - auto & node1 = *system.add_node (node_config); + auto & node = *system.add_node (node_config); nano::genesis genesis; - size_t count (1); + size_t count (0); auto send (std::make_shared (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - count * nano::xrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.hash ()))); - auto previous_size = node1.active.size (); bool done (false); system.deadline_set (5s); while (!done) { + node.process_active (send); + node.active.start (send); + ASSERT_FALSE (node.active.empty ()); + ASSERT_LE (node.active.size (), node.config.active_elections_size); + done = count > node.active.size (); count++; - node1.process_active (send); - done = previous_size > node1.active.size (); - ASSERT_LT (node1.active.size (), node1.config.active_elections_size); //triggers after reverting #2116 ASSERT_NO_ERROR (system.poll ()); auto previous_hash = send->hash (); send = std::make_shared (nano::test_genesis_key.pub, previous_hash, nano::test_genesis_key.pub, nano::genesis_amount - count * nano::xrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (previous_hash)); - previous_size = node1.active.size (); //sleep this thread for the max delay between request loop rounds possible for such a small active_elections_size - std::this_thread::sleep_for (std::chrono::milliseconds (node1.network_params.network.request_interval_ms + (node_config.active_elections_size * 20))); + std::this_thread::sleep_for (std::chrono::milliseconds (node.network_params.network.request_interval_ms + (node_config.active_elections_size * 20))); } } diff --git a/nano/node/active_transactions.cpp b/nano/node/active_transactions.cpp index d11f2817b..e77f31f66 100644 --- a/nano/node/active_transactions.cpp +++ b/nano/node/active_transactions.cpp @@ -150,41 +150,46 @@ void nano::active_transactions::post_confirmation_height_set (nano::transaction void nano::active_transactions::request_confirm (nano::unique_lock & lock_a) { - std::unordered_set inactive; - auto transaction (node.store.tx_begin_read ()); - unsigned unconfirmed_count (0); - unsigned unconfirmed_request_count (0); - unsigned could_fit_delay = node.network_params.network.is_test_network () ? high_confirmation_request_count - 1 : 1; - std::unordered_map, std::deque>> requests_bundle; - std::deque> rebroadcast_bundle; - std::deque, std::shared_ptr>>>> confirm_req_bundle; + std::unordered_set inactive_l; + auto transaction_l (node.store.tx_begin_read ()); + unsigned unconfirmed_count_l (0); + unsigned unconfirmed_request_count_l (0); + unsigned could_fit_delay_l = node.network_params.network.is_test_network () ? high_confirmation_request_count - 1 : 1; + std::deque> blocks_bundle_l; + std::unordered_map, std::deque>> batched_confirm_req_bundle_l; + std::deque, std::shared_ptr>>>> single_confirm_req_bundle_l; /* Confirm frontiers when there aren't many confirmations already pending and node finished initial bootstrap - In auto mode start confirm only if node contains almost principal representative (half of required for principal weight) */ + In auto mode start confirm only if node contains almost principal representative (half of required for principal weight) + The confirmation height processor works asynchronously, compressing several roots into one frontier, so probably_unconfirmed_frontiers is not always correct*/ lock_a.unlock (); - if (node.config.frontiers_confirmation != nano::frontiers_confirmation_mode::disabled && node.ledger.block_count_cache > node.ledger.cemented_count + roots.size () && node.pending_confirmation_height.size () < confirmed_frontiers_max_pending_cut_off && node.ledger.block_count_cache >= node.ledger.bootstrap_weight_max_blocks) + auto pending_confirmation_height_size (node.pending_confirmation_height.size ()); + bool probably_unconfirmed_frontiers (node.ledger.block_count_cache > node.ledger.cemented_count + roots.size () + pending_confirmation_height_size); + bool bootstrap_weight_reached (node.ledger.block_count_cache >= node.ledger.bootstrap_weight_max_blocks); + if (node.config.frontiers_confirmation != nano::frontiers_confirmation_mode::disabled && bootstrap_weight_reached && probably_unconfirmed_frontiers && pending_confirmation_height_size < confirmed_frontiers_max_pending_cut_off) { - confirm_frontiers (transaction); + confirm_frontiers (transaction_l); } lock_a.lock (); - auto roots_size (roots.size ()); + auto representatives_l (node.rep_crawler.representatives (std::numeric_limits::max ())); + auto roots_size_l (roots.size ()); for (auto i (roots.get<1> ().begin ()), n (roots.get<1> ().end ()); i != n; ++i) { - auto root (i->root); + auto root_l (i->root); auto election_l (i->election); if ((election_l->confirmed || election_l->stopped) && election_l->confirmation_request_count >= minimum_confirmation_request_count - 1) { if (election_l->stopped) { - inactive.insert (root); + inactive_l.insert (root_l); } } else { if (election_l->confirmation_request_count > high_confirmation_request_count) { - ++unconfirmed_count; - unconfirmed_request_count += election_l->confirmation_request_count; + ++unconfirmed_count_l; + unconfirmed_request_count_l += election_l->confirmation_request_count; // Log votes for very long unconfirmed elections if (election_l->confirmation_request_count % 50 == 1) { @@ -194,49 +199,49 @@ void nano::active_transactions::request_confirm (nano::unique_lock & /* Escalation for long unconfirmed elections Start new elections for previous block & source if there are less than 100 active elections */ - if (election_l->confirmation_request_count % high_confirmation_request_count == 1 && roots_size < 100 && !node.network_params.network.is_test_network ()) + if (election_l->confirmation_request_count % high_confirmation_request_count == 1 && roots_size_l < 100 && !node.network_params.network.is_test_network ()) { - bool escalated (false); - std::shared_ptr previous; - auto previous_hash (election_l->status.winner->previous ()); - if (!previous_hash.is_zero ()) + bool escalated_l (false); + std::shared_ptr previous_l; + auto previous_hash_l (election_l->status.winner->previous ()); + if (!previous_hash_l.is_zero ()) { - previous = node.store.block_get (transaction, previous_hash); - if (previous != nullptr && blocks.find (previous_hash) == blocks.end () && !node.block_confirmed_or_being_confirmed (transaction, previous_hash)) + previous_l = node.store.block_get (transaction_l, previous_hash_l); + if (previous_l != nullptr && blocks.find (previous_hash_l) == blocks.end () && !node.block_confirmed_or_being_confirmed (transaction_l, previous_hash_l)) { - add (std::move (previous)); - escalated = true; + add (std::move (previous_l)); + escalated_l = true; } } /* If previous block not existing/not commited yet, block_source can cause segfault for state blocks So source check can be done only if previous != nullptr or previous is 0 (open account) */ - if (previous_hash.is_zero () || previous != nullptr) + if (previous_hash_l.is_zero () || previous_l != nullptr) { - auto source_hash (node.ledger.block_source (transaction, *election_l->status.winner)); - if (!source_hash.is_zero () && source_hash != previous_hash && blocks.find (source_hash) == blocks.end ()) + auto source_hash (node.ledger.block_source (transaction_l, *election_l->status.winner)); + if (!source_hash.is_zero () && source_hash != previous_hash_l && blocks.find (source_hash) == blocks.end ()) { - auto source (node.store.block_get (transaction, source_hash)); - if (source != nullptr && !node.block_confirmed_or_being_confirmed (transaction, source_hash)) + auto source (node.store.block_get (transaction_l, source_hash)); + if (source != nullptr && !node.block_confirmed_or_being_confirmed (transaction_l, source_hash)) { add (std::move (source)); - escalated = true; + escalated_l = true; } } } - if (escalated) + if (escalated_l) { election_l->update_dependent (); } } } - if (election_l->confirmation_request_count < high_confirmation_request_count || election_l->confirmation_request_count % high_confirmation_request_count == could_fit_delay) + if (election_l->confirmation_request_count < high_confirmation_request_count || election_l->confirmation_request_count % high_confirmation_request_count == could_fit_delay_l) { - if (node.ledger.could_fit (transaction, *election_l->status.winner)) + if (node.ledger.could_fit (transaction_l, *election_l->status.winner)) { // Broadcast winner - if (rebroadcast_bundle.size () < max_broadcast_queue) + if (blocks_bundle_l.size () < max_broadcast_queue) { - rebroadcast_bundle.push_back (election_l->status.winner); + blocks_bundle_l.push_back (election_l->status.winner); } } else @@ -244,21 +249,17 @@ void nano::active_transactions::request_confirm (nano::unique_lock & if (election_l->confirmation_request_count != 0) { election_l->stop (); - inactive.insert (root); + inactive_l.insert (root_l); } } } - auto rep_channels (std::make_shared>> ()); - auto reps (node.rep_crawler.representatives (std::numeric_limits::max ())); - - // Add all rep endpoints that haven't already voted. We use a set since multiple - // reps may exist on an endpoint. - std::unordered_set> channels; - for (auto & rep : reps) + std::unordered_set> rep_channels_missing_vote_l; + // Add all rep endpoints that haven't already voted + for (auto & rep : representatives_l) { if (election_l->last_votes.find (rep.account) == election_l->last_votes.end ()) { - channels.insert (rep.channel); + rep_channels_missing_vote_l.insert (rep.channel); if (node.config.logging.vote_logging ()) { @@ -266,11 +267,8 @@ void nano::active_transactions::request_confirm (nano::unique_lock & } } } - - rep_channels->insert (rep_channels->end (), channels.begin (), channels.end ()); - - bool low_reps_weight (rep_channels->empty () || node.rep_crawler.total_weight () < node.config.online_weight_minimum.number ()); - if (low_reps_weight && roots_size <= 5 && !node.network_params.network.is_test_network ()) + bool low_reps_weight (rep_channels_missing_vote_l.empty () || node.rep_crawler.total_weight () < node.config.online_weight_minimum.number ()); + if (low_reps_weight && roots_size_l <= 5 && !node.network_params.network.is_test_network ()) { // Spam mode auto deque_l (node.network.udp_channels.random_set (100)); @@ -279,25 +277,25 @@ void nano::active_transactions::request_confirm (nano::unique_lock & { vec->push_back (i); } - confirm_req_bundle.push_back (std::make_pair (election_l->status.winner, vec)); + single_confirm_req_bundle_l.push_back (std::make_pair (election_l->status.winner, vec)); } else { auto single_confirm_req_channels (std::make_shared>> ()); - for (auto & rep : *rep_channels) + for (auto & rep : rep_channels_missing_vote_l) { if (rep->get_network_version () >= node.network_params.protocol.tcp_realtime_protocol_version_min) { // Send batch request to peers supporting confirm_req by hash + root - auto rep_request (requests_bundle.find (rep)); + auto rep_request (batched_confirm_req_bundle_l.find (rep)); auto block (election_l->status.winner); auto root_hash (std::make_pair (block->hash (), block->root ())); - if (rep_request == requests_bundle.end ()) + if (rep_request == batched_confirm_req_bundle_l.end ()) { - if (requests_bundle.size () < max_broadcast_queue) + if (batched_confirm_req_bundle_l.size () < max_broadcast_queue) { std::deque> insert_root_hash = { root_hash }; - requests_bundle.insert (std::make_pair (rep, insert_root_hash)); + batched_confirm_req_bundle_l.insert (std::make_pair (rep, insert_root_hash)); } } else if (rep_request->second.size () < max_broadcast_queue * nano::network::confirm_req_hashes_max) @@ -311,33 +309,53 @@ void nano::active_transactions::request_confirm (nano::unique_lock & } } // broadcast_confirm_req_base modifies reps, so we clone it once to avoid aliasing - if (confirm_req_bundle.size () < max_broadcast_queue && !single_confirm_req_channels->empty ()) + if (single_confirm_req_bundle_l.size () < max_broadcast_queue && !single_confirm_req_channels->empty ()) { - confirm_req_bundle.push_back (std::make_pair (election_l->status.winner, single_confirm_req_channels)); + single_confirm_req_bundle_l.push_back (std::make_pair (election_l->status.winner, single_confirm_req_channels)); } } } ++election_l->confirmation_request_count; } + ongoing_broadcasts = !blocks_bundle_l.empty () + !batched_confirm_req_bundle_l.empty () + !single_confirm_req_bundle_l.empty (); lock_a.unlock (); // Rebroadcast unconfirmed blocks - if (!rebroadcast_bundle.empty ()) + if (!blocks_bundle_l.empty ()) { - node.network.flood_block_batch (std::move (rebroadcast_bundle)); + node.network.flood_block_many (std::move (blocks_bundle_l), [this]() { + { + nano::lock_guard guard_l (this->mutex); + --this->ongoing_broadcasts; + } + this->condition.notify_all (); + }); } - // Batch confirmation request - if (!requests_bundle.empty ()) + // Batched confirmation requests + if (!batched_confirm_req_bundle_l.empty ()) { - node.network.broadcast_confirm_req_batch (requests_bundle, 50); + node.network.broadcast_confirm_req_batched_many (batched_confirm_req_bundle_l, [this]() { + { + nano::lock_guard guard_l (this->mutex); + --this->ongoing_broadcasts; + } + this->condition.notify_all (); + }, + 50); } - //confirm_req broadcast - if (!confirm_req_bundle.empty ()) + // Single confirmation requests + if (!single_confirm_req_bundle_l.empty ()) { - node.network.broadcast_confirm_req_batch (confirm_req_bundle); + node.network.broadcast_confirm_req_many (single_confirm_req_bundle_l, [this]() { + { + nano::lock_guard guard_l (this->mutex); + --this->ongoing_broadcasts; + } + this->condition.notify_all (); + }); } lock_a.lock (); // Erase inactive elections - for (auto i (inactive.begin ()), n (inactive.end ()); i != n; ++i) + for (auto i (inactive_l.begin ()), n (inactive_l.end ()); i != n; ++i) { auto root_it (roots.find (*i)); if (root_it != roots.end ()) @@ -347,10 +365,10 @@ void nano::active_transactions::request_confirm (nano::unique_lock & roots.erase (root_it); } } - long_unconfirmed_size = unconfirmed_count; - if (unconfirmed_count > 0) + long_unconfirmed_size = unconfirmed_count_l; + if (unconfirmed_count_l > 0) { - node.logger.try_log (boost::str (boost::format ("%1% blocks have been unconfirmed averaging %2% confirmation requests") % unconfirmed_count % (unconfirmed_request_count / unconfirmed_count))); + node.logger.try_log (boost::str (boost::format ("%1% blocks have been unconfirmed averaging %2% confirmation requests") % unconfirmed_count_l % (unconfirmed_request_count_l / unconfirmed_count_l))); } } @@ -372,16 +390,20 @@ void nano::active_transactions::request_loop () request_confirm (lock); update_active_difficulty (lock); - // This prevents unnecessary waiting if stopped is set in-between the above check and now - if (stopped) + const auto extra_delay_l (std::min (roots.size (), max_broadcast_queue) * node.network.broadcast_interval_ms * 2); + const auto wakeup_l (std::chrono::steady_clock::now () + std::chrono::milliseconds (node.network_params.network.request_interval_ms + extra_delay_l)); + + // Sleep until all broadcasts are done, plus the remaining loop time + while (!stopped && ongoing_broadcasts) { - break; + condition.wait (lock); + } + if (!stopped) + { + // clang-format off + condition.wait_until (lock, wakeup_l, [&wakeup_l, &stopped = stopped] { return stopped || std::chrono::steady_clock::now () >= wakeup_l; }); + // clang-format on } - const auto extra_delay (std::min (roots.size (), max_broadcast_queue) * node.network.broadcast_interval_ms * 2); - const auto wakeup (std::chrono::steady_clock::now () + std::chrono::milliseconds (node.network_params.network.request_interval_ms + extra_delay)); - // clang-format off - condition.wait_until (lock, wakeup, [&wakeup, &stopped = stopped] { return stopped || std::chrono::steady_clock::now () >= wakeup; }); - // clang-format on } } diff --git a/nano/node/active_transactions.hpp b/nano/node/active_transactions.hpp index 23869a66b..826db7a79 100644 --- a/nano/node/active_transactions.hpp +++ b/nano/node/active_transactions.hpp @@ -154,6 +154,7 @@ private: nano::condition_variable condition; bool started{ false }; std::atomic stopped{ false }; + unsigned ongoing_broadcasts{ 0 }; boost::multi_index_container< nano::confirmed_set_info, boost::multi_index::indexed_by< diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 49e99f346..5f9bb22d5 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -3388,7 +3388,7 @@ void nano::json_handler::republish () } hash = node.store.block_successor (transaction, hash); } - node.network.flood_block_batch (std::move (republish_bundle), 25); + node.network.flood_block_many (std::move (republish_bundle), nullptr, 25); response_l.put ("success", ""); // obsolete response_l.add_child ("blocks", blocks); } @@ -4465,7 +4465,7 @@ void nano::json_handler::wallet_republish () blocks.push_back (std::make_pair ("", entry)); } } - node.network.flood_block_batch (std::move (republish_bundle), 25); + node.network.flood_block_many (std::move (republish_bundle), nullptr, 25); response_l.add_child ("blocks", blocks); } response_errors (); diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 9ee8a3c37..9bd08e88a 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -224,23 +224,27 @@ void nano::network::flood_message (nano::message const & message_a, bool const i } } -void nano::network::flood_block_batch (std::deque> blocks_a, unsigned delay_a) +void nano::network::flood_block_many (std::deque> blocks_a, std::function callback_a, unsigned delay_a) { - auto block (blocks_a.front ()); + auto block_l (blocks_a.front ()); blocks_a.pop_front (); - flood_block (block); + flood_block (block_l); if (!blocks_a.empty ()) { std::weak_ptr node_w (node.shared ()); // clang-format off - node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, blocks (std::move (blocks_a)), delay_a]() { + node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, blocks (std::move (blocks_a)), callback_a, delay_a]() { if (auto node_l = node_w.lock ()) { - node_l->network.flood_block_batch (std::move (blocks), delay_a); + node_l->network.flood_block_many (std::move (blocks), callback_a, delay_a); } }); // clang-format on } + else if (callback_a) + { + callback_a (); + } } void nano::network::send_confirm_req (std::shared_ptr channel_a, std::shared_ptr block_a) @@ -319,29 +323,29 @@ void nano::network::broadcast_confirm_req_base (std::shared_ptr blo } } -void nano::network::broadcast_confirm_req_batch (std::unordered_map, std::deque>> request_bundle_a, unsigned delay_a, bool resumption) +void nano::network::broadcast_confirm_req_batched_many (std::unordered_map, std::deque>> request_bundle_a, std::function callback_a, unsigned delay_a, bool resumption) { const size_t max_reps = 50; if (!resumption && node.config.logging.network_logging ()) { node.logger.try_log (boost::str (boost::format ("Broadcasting batch confirm req to %1% representatives") % request_bundle_a.size ())); } - auto count (0); - while (!request_bundle_a.empty () && count < max_reps) + auto count_l (0); + while (!request_bundle_a.empty () && count_l < max_reps) { auto j (request_bundle_a.begin ()); while (j != request_bundle_a.end ()) { - count++; - std::vector> roots_hashes; + ++count_l; + std::vector> roots_hashes_l; // Limit max request size hash + root to 7 pairs - while (roots_hashes.size () < confirm_req_hashes_max && !j->second.empty ()) + while (roots_hashes_l.size () < confirm_req_hashes_max && !j->second.empty ()) { // expects ordering by priority, descending - roots_hashes.push_back (j->second.front ()); + roots_hashes_l.push_back (j->second.front ()); j->second.pop_front (); } - nano::confirm_req req (roots_hashes); + nano::confirm_req req (roots_hashes_l); j->first->send (req); if (j->second.empty ()) { @@ -356,38 +360,46 @@ void nano::network::broadcast_confirm_req_batch (std::unordered_map node_w (node.shared ()); - node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a), [node_w, request_bundle_a, delay_a]() { + node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a), [node_w, request_bundle_a, callback_a, delay_a]() { if (auto node_l = node_w.lock ()) { - node_l->network.broadcast_confirm_req_batch (request_bundle_a, delay_a, true); + node_l->network.broadcast_confirm_req_batched_many (request_bundle_a, callback_a, delay_a, true); } }); } + else if (callback_a) + { + callback_a (); + } } -void nano::network::broadcast_confirm_req_batch (std::deque, std::shared_ptr>>>> deque_a, unsigned delay_a) +void nano::network::broadcast_confirm_req_many (std::deque, std::shared_ptr>>>> requests_a, std::function callback_a, unsigned delay_a) { - auto pair (deque_a.front ()); - deque_a.pop_front (); - auto block (pair.first); + auto pair_l (requests_a.front ()); + requests_a.pop_front (); + auto block_l (pair_l.first); // confirm_req to representatives - auto endpoints (pair.second); + auto endpoints (pair_l.second); if (!endpoints->empty ()) { - broadcast_confirm_req_base (block, endpoints, delay_a); + broadcast_confirm_req_base (block_l, endpoints, delay_a); } /* Continue while blocks remain Broadcast with random delay between delay_a & 2*delay_a */ - if (!deque_a.empty ()) + if (!requests_a.empty ()) { std::weak_ptr node_w (node.shared ()); - node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, deque_a, delay_a]() { + node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, requests_a, callback_a, delay_a]() { if (auto node_l = node_w.lock ()) { - node_l->network.broadcast_confirm_req_batch (deque_a, delay_a); + node_l->network.broadcast_confirm_req_many (requests_a, callback_a, delay_a); } }); } + else if (callback_a) + { + callback_a (); + } } namespace diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 857332785..6f4d08adc 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -116,7 +116,7 @@ public: flood_message (publish, is_droppable_a); } - void flood_block_batch (std::deque>, unsigned = broadcast_interval_ms); + void flood_block_many (std::deque>, std::function = nullptr, unsigned = broadcast_interval_ms); void merge_peers (std::array const &); void merge_peer (nano::endpoint const &); void send_keepalive (std::shared_ptr); @@ -125,8 +125,8 @@ public: void send_confirm_req (std::shared_ptr, std::shared_ptr); void broadcast_confirm_req (std::shared_ptr); void broadcast_confirm_req_base (std::shared_ptr, std::shared_ptr>>, unsigned, bool = false); - void broadcast_confirm_req_batch (std::unordered_map, std::deque>>, unsigned = broadcast_interval_ms, bool = false); - void broadcast_confirm_req_batch (std::deque, std::shared_ptr>>>>, unsigned = broadcast_interval_ms); + void broadcast_confirm_req_batched_many (std::unordered_map, std::deque>>, std::function = nullptr, unsigned = broadcast_interval_ms, bool = false); + void broadcast_confirm_req_many (std::deque, std::shared_ptr>>>>, std::function = nullptr, unsigned = broadcast_interval_ms); void confirm_hashes (nano::transaction const &, std::shared_ptr, std::vector); bool send_votes_cache (std::shared_ptr, nano::block_hash const &); std::shared_ptr find_node_id (nano::account const &);