Basic confirmation request loop enhancements (#2327)

* Throttle confirmation request loop

* Add an assert on frontier confirmation

* Revert "Add an assert on frontier confirmation"

This reverts commit a37a98f2a34c7be2ad6291f96f2c50c33743aa77.

The assert would fail some tests.

* Setting ongoing_broadcasts before unlocking (a sketch of this pattern follows the file-change summary below)

* Fix test active_transactions.bounded_active_elections

* Review

* Use an unordered_set of rep channels to prevent sending confirm_req repeatedly to the same channel
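
Several principal representatives can sit behind a single transport channel, so outgoing confirm_req traffic is keyed by channel rather than by representative account. Below is a minimal, self-contained sketch of that deduplication idea; the channel and representative structs are simplified stand-ins for illustration, not the node's real nano::transport::channel or rep_crawler types.

#include <iostream>
#include <memory>
#include <string>
#include <unordered_set>
#include <vector>

// Simplified stand-ins for the node's channel/representative types (illustration only)
struct channel
{
	std::string endpoint;
};
struct representative
{
	std::string account;
	std::shared_ptr<channel> rep_channel;
};

int main ()
{
	// Two representative accounts hosted behind the same endpoint share one channel
	auto shared (std::make_shared<channel> (channel{ "192.168.1.1:7075" }));
	std::vector<representative> reps{ { "rep_A", shared }, { "rep_B", shared }, { "rep_C", std::make_shared<channel> (channel{ "192.168.1.2:7075" }) } };
	// Inserting into an unordered_set keyed by the channel pointer deduplicates the targets,
	// so each endpoint receives at most one confirm_req per round
	std::unordered_set<std::shared_ptr<channel>> channels_missing_vote;
	for (auto & rep : reps)
	{
		channels_missing_vote.insert (rep.rep_channel);
	}
	std::cout << reps.size () << " representatives, " << channels_missing_vote.size () << " unique channels" << std::endl; // prints: 3 representatives, 2 unique channels
}
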
Guilherme Lawless 2019-10-09 14:39:51 +01:00 committed by GitHub
commit 903391ef05
6 changed files with 150 additions and 115 deletions
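
The throttling itself, as seen in request_confirm and request_loop in the diff below, counts how many asynchronous broadcast batches are still in flight: ongoing_broadcasts is set while the lock is still held, each batch's completion callback decrements it under the mutex and notifies the condition variable, and the request loop waits until the counter reaches zero before timing its next round. The following is a compressed, self-contained sketch of that hand-off, using simplified stand-in names (broadcast_throttle, run_round) and std::thread in place of the node's alarm and network machinery.

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// Simplified stand-in for the request loop's broadcast throttling (illustration only)
class broadcast_throttle
{
public:
	void run_round (unsigned broadcasts_to_send)
	{
		std::unique_lock<std::mutex> lock (mutex);
		// Set the counter before unlocking, so a fast completion callback cannot be lost
		ongoing_broadcasts = broadcasts_to_send;
		lock.unlock ();
		// Each asynchronous broadcast finishes by decrementing the counter and notifying
		std::vector<std::thread> senders;
		for (unsigned i (0); i < broadcasts_to_send; ++i)
		{
			senders.emplace_back ([this]() {
				std::this_thread::sleep_for (std::chrono::milliseconds (50)); // stand-in for network work
				{
					std::lock_guard<std::mutex> guard (this->mutex);
					--this->ongoing_broadcasts;
				}
				this->condition.notify_all ();
			});
		}
		lock.lock ();
		// Sleep until all broadcasts are done before the next request round (stopped flag omitted)
		while (ongoing_broadcasts > 0)
		{
			condition.wait (lock);
		}
		lock.unlock ();
		for (auto & sender : senders)
		{
			sender.join ();
		}
		std::cout << "round complete" << std::endl;
	}

private:
	std::mutex mutex;
	std::condition_variable condition;
	unsigned ongoing_broadcasts{ 0 };
};

int main ()
{
	broadcast_throttle throttle;
	throttle.run_round (3);
}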


@@ -12,25 +12,25 @@ TEST (active_transactions, bounded_active_elections)
 nano::node_config node_config (24000, system.logging);
 node_config.enable_voting = false;
 node_config.active_elections_size = 5;
-auto & node1 = *system.add_node (node_config);
+auto & node = *system.add_node (node_config);
 nano::genesis genesis;
-size_t count (1);
+size_t count (0);
 auto send (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - count * nano::xrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.hash ())));
-auto previous_size = node1.active.size ();
 bool done (false);
 system.deadline_set (5s);
 while (!done)
 {
+node.process_active (send);
+node.active.start (send);
+ASSERT_FALSE (node.active.empty ());
+ASSERT_LE (node.active.size (), node.config.active_elections_size);
+done = count > node.active.size ();
 count++;
-node1.process_active (send);
-done = previous_size > node1.active.size ();
-ASSERT_LT (node1.active.size (), node1.config.active_elections_size); //triggers after reverting #2116
 ASSERT_NO_ERROR (system.poll ());
 auto previous_hash = send->hash ();
 send = std::make_shared<nano::state_block> (nano::test_genesis_key.pub, previous_hash, nano::test_genesis_key.pub, nano::genesis_amount - count * nano::xrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (previous_hash));
-previous_size = node1.active.size ();
 //sleep this thread for the max delay between request loop rounds possible for such a small active_elections_size
-std::this_thread::sleep_for (std::chrono::milliseconds (node1.network_params.network.request_interval_ms + (node_config.active_elections_size * 20)));
+std::this_thread::sleep_for (std::chrono::milliseconds (node.network_params.network.request_interval_ms + (node_config.active_elections_size * 20)));
 }
 }


@@ -150,41 +150,46 @@ void nano::active_transactions::post_confirmation_height_set (nano::transaction
 void nano::active_transactions::request_confirm (nano::unique_lock<std::mutex> & lock_a)
 {
-std::unordered_set<nano::qualified_root> inactive;
-auto transaction (node.store.tx_begin_read ());
-unsigned unconfirmed_count (0);
-unsigned unconfirmed_request_count (0);
-unsigned could_fit_delay = node.network_params.network.is_test_network () ? high_confirmation_request_count - 1 : 1;
-std::unordered_map<std::shared_ptr<nano::transport::channel>, std::deque<std::pair<nano::block_hash, nano::root>>> requests_bundle;
-std::deque<std::shared_ptr<nano::block>> rebroadcast_bundle;
-std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<std::shared_ptr<nano::transport::channel>>>>> confirm_req_bundle;
+std::unordered_set<nano::qualified_root> inactive_l;
+auto transaction_l (node.store.tx_begin_read ());
+unsigned unconfirmed_count_l (0);
+unsigned unconfirmed_request_count_l (0);
+unsigned could_fit_delay_l = node.network_params.network.is_test_network () ? high_confirmation_request_count - 1 : 1;
+std::deque<std::shared_ptr<nano::block>> blocks_bundle_l;
+std::unordered_map<std::shared_ptr<nano::transport::channel>, std::deque<std::pair<nano::block_hash, nano::root>>> batched_confirm_req_bundle_l;
+std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<std::shared_ptr<nano::transport::channel>>>>> single_confirm_req_bundle_l;
 /* Confirm frontiers when there aren't many confirmations already pending and node finished initial bootstrap
-In auto mode start confirm only if node contains almost principal representative (half of required for principal weight) */
+In auto mode start confirm only if node contains almost principal representative (half of required for principal weight)
+The confirmation height processor works asynchronously, compressing several roots into one frontier, so probably_unconfirmed_frontiers is not always correct*/
 lock_a.unlock ();
-if (node.config.frontiers_confirmation != nano::frontiers_confirmation_mode::disabled && node.ledger.block_count_cache > node.ledger.cemented_count + roots.size () && node.pending_confirmation_height.size () < confirmed_frontiers_max_pending_cut_off && node.ledger.block_count_cache >= node.ledger.bootstrap_weight_max_blocks)
+auto pending_confirmation_height_size (node.pending_confirmation_height.size ());
+bool probably_unconfirmed_frontiers (node.ledger.block_count_cache > node.ledger.cemented_count + roots.size () + pending_confirmation_height_size);
+bool bootstrap_weight_reached (node.ledger.block_count_cache >= node.ledger.bootstrap_weight_max_blocks);
+if (node.config.frontiers_confirmation != nano::frontiers_confirmation_mode::disabled && bootstrap_weight_reached && probably_unconfirmed_frontiers && pending_confirmation_height_size < confirmed_frontiers_max_pending_cut_off)
 {
-confirm_frontiers (transaction);
+confirm_frontiers (transaction_l);
 }
 lock_a.lock ();
-auto roots_size (roots.size ());
+auto representatives_l (node.rep_crawler.representatives (std::numeric_limits<size_t>::max ()));
+auto roots_size_l (roots.size ());
 for (auto i (roots.get<1> ().begin ()), n (roots.get<1> ().end ()); i != n; ++i)
 {
-auto root (i->root);
+auto root_l (i->root);
 auto election_l (i->election);
 if ((election_l->confirmed || election_l->stopped) && election_l->confirmation_request_count >= minimum_confirmation_request_count - 1)
 {
 if (election_l->stopped)
 {
-inactive.insert (root);
+inactive_l.insert (root_l);
 }
 }
 else
 {
 if (election_l->confirmation_request_count > high_confirmation_request_count)
 {
-++unconfirmed_count;
-unconfirmed_request_count += election_l->confirmation_request_count;
+++unconfirmed_count_l;
+unconfirmed_request_count_l += election_l->confirmation_request_count;
 // Log votes for very long unconfirmed elections
 if (election_l->confirmation_request_count % 50 == 1)
 {
@@ -194,49 +199,49 @@ void nano::active_transactions::request_confirm (nano::unique_lock<std::mutex> &
 /* Escalation for long unconfirmed elections
 Start new elections for previous block & source
 if there are less than 100 active elections */
-if (election_l->confirmation_request_count % high_confirmation_request_count == 1 && roots_size < 100 && !node.network_params.network.is_test_network ())
+if (election_l->confirmation_request_count % high_confirmation_request_count == 1 && roots_size_l < 100 && !node.network_params.network.is_test_network ())
 {
-bool escalated (false);
-std::shared_ptr<nano::block> previous;
-auto previous_hash (election_l->status.winner->previous ());
-if (!previous_hash.is_zero ())
+bool escalated_l (false);
+std::shared_ptr<nano::block> previous_l;
+auto previous_hash_l (election_l->status.winner->previous ());
+if (!previous_hash_l.is_zero ())
 {
-previous = node.store.block_get (transaction, previous_hash);
-if (previous != nullptr && blocks.find (previous_hash) == blocks.end () && !node.block_confirmed_or_being_confirmed (transaction, previous_hash))
+previous_l = node.store.block_get (transaction_l, previous_hash_l);
+if (previous_l != nullptr && blocks.find (previous_hash_l) == blocks.end () && !node.block_confirmed_or_being_confirmed (transaction_l, previous_hash_l))
 {
-add (std::move (previous));
-escalated = true;
+add (std::move (previous_l));
+escalated_l = true;
 }
 }
 /* If previous block not existing/not commited yet, block_source can cause segfault for state blocks
 So source check can be done only if previous != nullptr or previous is 0 (open account) */
-if (previous_hash.is_zero () || previous != nullptr)
+if (previous_hash_l.is_zero () || previous_l != nullptr)
 {
-auto source_hash (node.ledger.block_source (transaction, *election_l->status.winner));
-if (!source_hash.is_zero () && source_hash != previous_hash && blocks.find (source_hash) == blocks.end ())
+auto source_hash (node.ledger.block_source (transaction_l, *election_l->status.winner));
+if (!source_hash.is_zero () && source_hash != previous_hash_l && blocks.find (source_hash) == blocks.end ())
 {
-auto source (node.store.block_get (transaction, source_hash));
-if (source != nullptr && !node.block_confirmed_or_being_confirmed (transaction, source_hash))
+auto source (node.store.block_get (transaction_l, source_hash));
+if (source != nullptr && !node.block_confirmed_or_being_confirmed (transaction_l, source_hash))
 {
 add (std::move (source));
-escalated = true;
+escalated_l = true;
 }
 }
 }
-if (escalated)
+if (escalated_l)
 {
 election_l->update_dependent ();
 }
 }
 }
-if (election_l->confirmation_request_count < high_confirmation_request_count || election_l->confirmation_request_count % high_confirmation_request_count == could_fit_delay)
+if (election_l->confirmation_request_count < high_confirmation_request_count || election_l->confirmation_request_count % high_confirmation_request_count == could_fit_delay_l)
 {
-if (node.ledger.could_fit (transaction, *election_l->status.winner))
+if (node.ledger.could_fit (transaction_l, *election_l->status.winner))
 {
 // Broadcast winner
-if (rebroadcast_bundle.size () < max_broadcast_queue)
+if (blocks_bundle_l.size () < max_broadcast_queue)
 {
-rebroadcast_bundle.push_back (election_l->status.winner);
+blocks_bundle_l.push_back (election_l->status.winner);
 }
 }
 else
@@ -244,21 +249,17 @@ void nano::active_transactions::request_confirm (nano::unique_lock<std::mutex> &
 if (election_l->confirmation_request_count != 0)
 {
 election_l->stop ();
-inactive.insert (root);
+inactive_l.insert (root_l);
 }
 }
 }
-auto rep_channels (std::make_shared<std::vector<std::shared_ptr<nano::transport::channel>>> ());
-auto reps (node.rep_crawler.representatives (std::numeric_limits<size_t>::max ()));
-// Add all rep endpoints that haven't already voted. We use a set since multiple
-// reps may exist on an endpoint.
-std::unordered_set<std::shared_ptr<nano::transport::channel>> channels;
-for (auto & rep : reps)
+std::unordered_set<std::shared_ptr<nano::transport::channel>> rep_channels_missing_vote_l;
+// Add all rep endpoints that haven't already voted
+for (auto & rep : representatives_l)
 {
 if (election_l->last_votes.find (rep.account) == election_l->last_votes.end ())
 {
-channels.insert (rep.channel);
+rep_channels_missing_vote_l.insert (rep.channel);
 if (node.config.logging.vote_logging ())
 {
@@ -266,11 +267,8 @@ void nano::active_transactions::request_confirm (nano::unique_lock<std::mutex> &
 }
 }
 }
-rep_channels->insert (rep_channels->end (), channels.begin (), channels.end ());
-bool low_reps_weight (rep_channels->empty () || node.rep_crawler.total_weight () < node.config.online_weight_minimum.number ());
-if (low_reps_weight && roots_size <= 5 && !node.network_params.network.is_test_network ())
+bool low_reps_weight (rep_channels_missing_vote_l.empty () || node.rep_crawler.total_weight () < node.config.online_weight_minimum.number ());
+if (low_reps_weight && roots_size_l <= 5 && !node.network_params.network.is_test_network ())
 {
 // Spam mode
 auto deque_l (node.network.udp_channels.random_set (100));
@@ -279,25 +277,25 @@ void nano::active_transactions::request_confirm (nano::unique_lock<std::mutex> &
 {
 vec->push_back (i);
 }
-confirm_req_bundle.push_back (std::make_pair (election_l->status.winner, vec));
+single_confirm_req_bundle_l.push_back (std::make_pair (election_l->status.winner, vec));
 }
 else
 {
 auto single_confirm_req_channels (std::make_shared<std::vector<std::shared_ptr<nano::transport::channel>>> ());
-for (auto & rep : *rep_channels)
+for (auto & rep : rep_channels_missing_vote_l)
 {
 if (rep->get_network_version () >= node.network_params.protocol.tcp_realtime_protocol_version_min)
 {
 // Send batch request to peers supporting confirm_req by hash + root
-auto rep_request (requests_bundle.find (rep));
+auto rep_request (batched_confirm_req_bundle_l.find (rep));
 auto block (election_l->status.winner);
 auto root_hash (std::make_pair (block->hash (), block->root ()));
-if (rep_request == requests_bundle.end ())
+if (rep_request == batched_confirm_req_bundle_l.end ())
 {
-if (requests_bundle.size () < max_broadcast_queue)
+if (batched_confirm_req_bundle_l.size () < max_broadcast_queue)
 {
 std::deque<std::pair<nano::block_hash, nano::root>> insert_root_hash = { root_hash };
-requests_bundle.insert (std::make_pair (rep, insert_root_hash));
+batched_confirm_req_bundle_l.insert (std::make_pair (rep, insert_root_hash));
 }
 }
 else if (rep_request->second.size () < max_broadcast_queue * nano::network::confirm_req_hashes_max)
@@ -311,33 +309,53 @@ void nano::active_transactions::request_confirm (nano::unique_lock<std::mutex> &
 }
 }
 // broadcast_confirm_req_base modifies reps, so we clone it once to avoid aliasing
-if (confirm_req_bundle.size () < max_broadcast_queue && !single_confirm_req_channels->empty ())
+if (single_confirm_req_bundle_l.size () < max_broadcast_queue && !single_confirm_req_channels->empty ())
 {
-confirm_req_bundle.push_back (std::make_pair (election_l->status.winner, single_confirm_req_channels));
+single_confirm_req_bundle_l.push_back (std::make_pair (election_l->status.winner, single_confirm_req_channels));
 }
 }
 }
 ++election_l->confirmation_request_count;
 }
+ongoing_broadcasts = !blocks_bundle_l.empty () + !batched_confirm_req_bundle_l.empty () + !single_confirm_req_bundle_l.empty ();
 lock_a.unlock ();
 // Rebroadcast unconfirmed blocks
-if (!rebroadcast_bundle.empty ())
+if (!blocks_bundle_l.empty ())
 {
-node.network.flood_block_batch (std::move (rebroadcast_bundle));
+node.network.flood_block_many (std::move (blocks_bundle_l), [this]() {
+{
+nano::lock_guard<std::mutex> guard_l (this->mutex);
+--this->ongoing_broadcasts;
 }
-// Batch confirmation request
-if (!requests_bundle.empty ())
-{
-node.network.broadcast_confirm_req_batch (requests_bundle, 50);
+this->condition.notify_all ();
+});
 }
-//confirm_req broadcast
-if (!confirm_req_bundle.empty ())
+// Batched confirmation requests
+if (!batched_confirm_req_bundle_l.empty ())
 {
-node.network.broadcast_confirm_req_batch (confirm_req_bundle);
+node.network.broadcast_confirm_req_batched_many (batched_confirm_req_bundle_l, [this]() {
+{
+nano::lock_guard<std::mutex> guard_l (this->mutex);
+--this->ongoing_broadcasts;
+}
+this->condition.notify_all ();
+},
+50);
+}
+// Single confirmation requests
+if (!single_confirm_req_bundle_l.empty ())
+{
+node.network.broadcast_confirm_req_many (single_confirm_req_bundle_l, [this]() {
+{
+nano::lock_guard<std::mutex> guard_l (this->mutex);
+--this->ongoing_broadcasts;
+}
+this->condition.notify_all ();
+});
 }
 lock_a.lock ();
 // Erase inactive elections
-for (auto i (inactive.begin ()), n (inactive.end ()); i != n; ++i)
+for (auto i (inactive_l.begin ()), n (inactive_l.end ()); i != n; ++i)
 {
 auto root_it (roots.find (*i));
 if (root_it != roots.end ())
@@ -347,10 +365,10 @@ void nano::active_transactions::request_confirm (nano::unique_lock<std::mutex> &
 roots.erase (root_it);
 }
 }
-long_unconfirmed_size = unconfirmed_count;
-if (unconfirmed_count > 0)
+long_unconfirmed_size = unconfirmed_count_l;
+if (unconfirmed_count_l > 0)
 {
-node.logger.try_log (boost::str (boost::format ("%1% blocks have been unconfirmed averaging %2% confirmation requests") % unconfirmed_count % (unconfirmed_request_count / unconfirmed_count)));
+node.logger.try_log (boost::str (boost::format ("%1% blocks have been unconfirmed averaging %2% confirmation requests") % unconfirmed_count_l % (unconfirmed_request_count_l / unconfirmed_count_l)));
 }
 }
@@ -372,17 +390,21 @@ void nano::active_transactions::request_loop ()
 request_confirm (lock);
 update_active_difficulty (lock);
-// This prevents unnecessary waiting if stopped is set in-between the above check and now
-if (stopped)
+const auto extra_delay_l (std::min (roots.size (), max_broadcast_queue) * node.network.broadcast_interval_ms * 2);
+const auto wakeup_l (std::chrono::steady_clock::now () + std::chrono::milliseconds (node.network_params.network.request_interval_ms + extra_delay_l));
+// Sleep until all broadcasts are done, plus the remaining loop time
+while (!stopped && ongoing_broadcasts)
 {
-break;
+condition.wait (lock);
 }
-const auto extra_delay (std::min (roots.size (), max_broadcast_queue) * node.network.broadcast_interval_ms * 2);
-const auto wakeup (std::chrono::steady_clock::now () + std::chrono::milliseconds (node.network_params.network.request_interval_ms + extra_delay));
+if (!stopped)
+{
 // clang-format off
-condition.wait_until (lock, wakeup, [&wakeup, &stopped = stopped] { return stopped || std::chrono::steady_clock::now () >= wakeup; });
+condition.wait_until (lock, wakeup_l, [&wakeup_l, &stopped = stopped] { return stopped || std::chrono::steady_clock::now () >= wakeup_l; });
 // clang-format on
 }
+}
 }
 void nano::active_transactions::prioritize_account_for_confirmation (nano::active_transactions::prioritize_num_uncemented & cementable_frontiers_a, size_t & cementable_frontiers_size_a, nano::account const & account_a, nano::account_info const & info_a, uint64_t confirmation_height)


@@ -154,6 +154,7 @@ private:
 nano::condition_variable condition;
 bool started{ false };
 std::atomic<bool> stopped{ false };
+unsigned ongoing_broadcasts{ 0 };
 boost::multi_index_container<
 nano::confirmed_set_info,
 boost::multi_index::indexed_by<


@@ -3388,7 +3388,7 @@ void nano::json_handler::republish ()
 }
 hash = node.store.block_successor (transaction, hash);
 }
-node.network.flood_block_batch (std::move (republish_bundle), 25);
+node.network.flood_block_many (std::move (republish_bundle), nullptr, 25);
 response_l.put ("success", ""); // obsolete
 response_l.add_child ("blocks", blocks);
 }
@@ -4465,7 +4465,7 @@ void nano::json_handler::wallet_republish ()
 blocks.push_back (std::make_pair ("", entry));
 }
 }
-node.network.flood_block_batch (std::move (republish_bundle), 25);
+node.network.flood_block_many (std::move (republish_bundle), nullptr, 25);
 response_l.add_child ("blocks", blocks);
 }
 response_errors ();


@@ -224,23 +224,27 @@ void nano::network::flood_message (nano::message const & message_a, bool const i
 }
 }
-void nano::network::flood_block_batch (std::deque<std::shared_ptr<nano::block>> blocks_a, unsigned delay_a)
+void nano::network::flood_block_many (std::deque<std::shared_ptr<nano::block>> blocks_a, std::function<void()> callback_a, unsigned delay_a)
 {
-auto block (blocks_a.front ());
+auto block_l (blocks_a.front ());
 blocks_a.pop_front ();
-flood_block (block);
+flood_block (block_l);
 if (!blocks_a.empty ())
 {
 std::weak_ptr<nano::node> node_w (node.shared ());
 // clang-format off
-node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, blocks (std::move (blocks_a)), delay_a]() {
+node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, blocks (std::move (blocks_a)), callback_a, delay_a]() {
 if (auto node_l = node_w.lock ())
 {
-node_l->network.flood_block_batch (std::move (blocks), delay_a);
+node_l->network.flood_block_many (std::move (blocks), callback_a, delay_a);
 }
 });
 // clang-format on
 }
+else if (callback_a)
+{
+callback_a ();
+}
 }
 void nano::network::send_confirm_req (std::shared_ptr<nano::transport::channel> channel_a, std::shared_ptr<nano::block> block_a)
@@ -319,29 +323,29 @@ void nano::network::broadcast_confirm_req_base (std::shared_ptr<nano::block> blo
 }
 }
-void nano::network::broadcast_confirm_req_batch (std::unordered_map<std::shared_ptr<nano::transport::channel>, std::deque<std::pair<nano::block_hash, nano::root>>> request_bundle_a, unsigned delay_a, bool resumption)
+void nano::network::broadcast_confirm_req_batched_many (std::unordered_map<std::shared_ptr<nano::transport::channel>, std::deque<std::pair<nano::block_hash, nano::root>>> request_bundle_a, std::function<void()> callback_a, unsigned delay_a, bool resumption)
 {
 const size_t max_reps = 50;
 if (!resumption && node.config.logging.network_logging ())
 {
 node.logger.try_log (boost::str (boost::format ("Broadcasting batch confirm req to %1% representatives") % request_bundle_a.size ()));
 }
-auto count (0);
-while (!request_bundle_a.empty () && count < max_reps)
+auto count_l (0);
+while (!request_bundle_a.empty () && count_l < max_reps)
 {
 auto j (request_bundle_a.begin ());
 while (j != request_bundle_a.end ())
 {
-count++;
-std::vector<std::pair<nano::block_hash, nano::root>> roots_hashes;
+++count_l;
+std::vector<std::pair<nano::block_hash, nano::root>> roots_hashes_l;
 // Limit max request size hash + root to 7 pairs
-while (roots_hashes.size () < confirm_req_hashes_max && !j->second.empty ())
+while (roots_hashes_l.size () < confirm_req_hashes_max && !j->second.empty ())
 {
 // expects ordering by priority, descending
-roots_hashes.push_back (j->second.front ());
+roots_hashes_l.push_back (j->second.front ());
 j->second.pop_front ();
 }
-nano::confirm_req req (roots_hashes);
+nano::confirm_req req (roots_hashes_l);
 j->first->send (req);
 if (j->second.empty ())
 {
@@ -356,38 +360,46 @@ void nano::network::broadcast_confirm_req_batch (std::unordered_map<std::shared_
 if (!request_bundle_a.empty ())
 {
 std::weak_ptr<nano::node> node_w (node.shared ());
-node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a), [node_w, request_bundle_a, delay_a]() {
+node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a), [node_w, request_bundle_a, callback_a, delay_a]() {
 if (auto node_l = node_w.lock ())
 {
-node_l->network.broadcast_confirm_req_batch (request_bundle_a, delay_a, true);
+node_l->network.broadcast_confirm_req_batched_many (request_bundle_a, callback_a, delay_a, true);
 }
 });
 }
+else if (callback_a)
+{
+callback_a ();
+}
 }
-void nano::network::broadcast_confirm_req_batch (std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<std::shared_ptr<nano::transport::channel>>>>> deque_a, unsigned delay_a)
+void nano::network::broadcast_confirm_req_many (std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<std::shared_ptr<nano::transport::channel>>>>> requests_a, std::function<void()> callback_a, unsigned delay_a)
 {
-auto pair (deque_a.front ());
-deque_a.pop_front ();
-auto block (pair.first);
+auto pair_l (requests_a.front ());
+requests_a.pop_front ();
+auto block_l (pair_l.first);
 // confirm_req to representatives
-auto endpoints (pair.second);
+auto endpoints (pair_l.second);
 if (!endpoints->empty ())
 {
-broadcast_confirm_req_base (block, endpoints, delay_a);
+broadcast_confirm_req_base (block_l, endpoints, delay_a);
 }
 /* Continue while blocks remain
 Broadcast with random delay between delay_a & 2*delay_a */
-if (!deque_a.empty ())
+if (!requests_a.empty ())
 {
 std::weak_ptr<nano::node> node_w (node.shared ());
-node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, deque_a, delay_a]() {
+node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, requests_a, callback_a, delay_a]() {
 if (auto node_l = node_w.lock ())
 {
-node_l->network.broadcast_confirm_req_batch (deque_a, delay_a);
+node_l->network.broadcast_confirm_req_many (requests_a, callback_a, delay_a);
 }
 });
 }
+else if (callback_a)
+{
+callback_a ();
+}
 }
 namespace


@@ -116,7 +116,7 @@ public:
 flood_message (publish, is_droppable_a);
 }
-void flood_block_batch (std::deque<std::shared_ptr<nano::block>>, unsigned = broadcast_interval_ms);
+void flood_block_many (std::deque<std::shared_ptr<nano::block>>, std::function<void()> = nullptr, unsigned = broadcast_interval_ms);
 void merge_peers (std::array<nano::endpoint, 8> const &);
 void merge_peer (nano::endpoint const &);
 void send_keepalive (std::shared_ptr<nano::transport::channel>);
@@ -125,8 +125,8 @@ public:
 void send_confirm_req (std::shared_ptr<nano::transport::channel>, std::shared_ptr<nano::block>);
 void broadcast_confirm_req (std::shared_ptr<nano::block>);
 void broadcast_confirm_req_base (std::shared_ptr<nano::block>, std::shared_ptr<std::vector<std::shared_ptr<nano::transport::channel>>>, unsigned, bool = false);
-void broadcast_confirm_req_batch (std::unordered_map<std::shared_ptr<nano::transport::channel>, std::deque<std::pair<nano::block_hash, nano::root>>>, unsigned = broadcast_interval_ms, bool = false);
-void broadcast_confirm_req_batch (std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<std::shared_ptr<nano::transport::channel>>>>>, unsigned = broadcast_interval_ms);
+void broadcast_confirm_req_batched_many (std::unordered_map<std::shared_ptr<nano::transport::channel>, std::deque<std::pair<nano::block_hash, nano::root>>>, std::function<void()> = nullptr, unsigned = broadcast_interval_ms, bool = false);
+void broadcast_confirm_req_many (std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<std::shared_ptr<nano::transport::channel>>>>>, std::function<void()> = nullptr, unsigned = broadcast_interval_ms);
 void confirm_hashes (nano::transaction const &, std::shared_ptr<nano::transport::channel>, std::vector<nano::block_hash>);
 bool send_votes_cache (std::shared_ptr<nano::transport::channel>, nano::block_hash const &);
 std::shared_ptr<nano::transport::channel> find_node_id (nano::account const &);