Improve bootstrap frontiers confirmation (#2394)

* start new bootstrap attempt after failed confirmation
* prevent duplicates in frontiers vector
* increase chances of starting first bootstrap connection to requested peer
* disable rep crawler for specific tests (it can start elections)
* return confirmation status in confirm_frontiers () function
* add `force` option to `bootstrap_any` RPC
This commit is contained in:
Sergey Kroshnin 2019-11-08 01:22:28 +03:00 committed by GitHub
commit 5ce0831627
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 101 additions and 53 deletions

View file

@ -184,17 +184,26 @@ TEST (bootstrap_processor, DISABLED_process_none)
// Bootstrap can pull one basic block
TEST (bootstrap_processor, process_one)
{
nano::system system (24000, 1);
nano::system system;
nano::node_config node_config (24000, system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
node_config.enable_voting = false;
auto node0 = system.add_node (node_config);
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
ASSERT_NE (nullptr, system.wallet (0)->send_action (nano::test_genesis_key.pub, nano::test_genesis_key.pub, 100));
auto node1 (std::make_shared<nano::node> (system.io_ctx, 24001, nano::unique_path (), system.alarm, system.logging, system.work));
nano::block_hash hash1 (system.nodes[0]->latest (nano::test_genesis_key.pub));
node_config.peering_port = 24001;
nano::node_flags node_flags;
node_flags.disable_bootstrap_bulk_push_client = true;
node_flags.disable_rep_crawler = true;
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, node_config, system.work, node_flags));
nano::block_hash hash1 (node0->latest (nano::test_genesis_key.pub));
nano::block_hash hash2 (node1->latest (nano::test_genesis_key.pub));
ASSERT_NE (hash1, hash2);
node1->bootstrap_initiator.bootstrap (system.nodes[0]->network.endpoint ());
ASSERT_NE (node1->latest (nano::test_genesis_key.pub), system.nodes[0]->latest (nano::test_genesis_key.pub));
node1->bootstrap_initiator.bootstrap (node0->network.endpoint ());
ASSERT_NE (node1->latest (nano::test_genesis_key.pub), node0->latest (nano::test_genesis_key.pub));
system.deadline_set (10s);
while (node1->latest (nano::test_genesis_key.pub) != system.nodes[0]->latest (nano::test_genesis_key.pub))
while (node1->latest (nano::test_genesis_key.pub) != node0->latest (nano::test_genesis_key.pub))
{
ASSERT_NO_ERROR (system.poll ());
}
@ -348,6 +357,7 @@ TEST (bootstrap_processor, frontiers_unconfirmed)
node_flags.disable_legacy_bootstrap = true;
node_flags.disable_lazy_bootstrap = true;
node_flags.disable_wallet_bootstrap = true;
node_flags.disable_rep_crawler = true;
auto node1 = system.add_node (node_config, node_flags);
nano::genesis genesis;
nano::keypair key1, key2;
@ -363,6 +373,7 @@ TEST (bootstrap_processor, frontiers_unconfirmed)
node_config.peering_port = 24001;
node_flags.disable_bootstrap_bulk_pull_server = false;
node_flags.disable_rep_crawler = false;
auto node2 = system.add_node (node_config, node_flags);
// Generating valid chain
auto send3 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::xrb_ratio, key1.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.hash ())));
@ -407,6 +418,7 @@ TEST (bootstrap_processor, frontiers_confirmed)
node_flags.disable_legacy_bootstrap = true;
node_flags.disable_lazy_bootstrap = true;
node_flags.disable_wallet_bootstrap = true;
node_flags.disable_rep_crawler = true;
auto node1 = system.add_node (node_config, node_flags);
nano::genesis genesis;
nano::keypair key1, key2;
@ -424,6 +436,7 @@ TEST (bootstrap_processor, frontiers_confirmed)
// Test node to bootstrap
node_config.peering_port = 24001;
node_flags.disable_legacy_bootstrap = false;
node_flags.disable_rep_crawler = false;
auto node2 = system.add_node (node_config, node_flags);
system.deadline_set (5s);
while (node2->rep_crawler.representative_count () == 0)

View file

@ -233,7 +233,8 @@ TEST (node, auto_bootstrap)
nano::keypair key2;
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
system.wallet (0)->insert_adhoc (key2.prv);
ASSERT_NE (nullptr, system.wallet (0)->send_action (nano::test_genesis_key.pub, key2.pub, system.nodes[0]->config.receive_minimum.number ()));
auto send1 (system.wallet (0)->send_action (nano::test_genesis_key.pub, key2.pub, system.nodes[0]->config.receive_minimum.number ()));
ASSERT_NE (nullptr, send1);
system.deadline_set (10s);
while (system.nodes[0]->balance (key2.pub) != system.nodes[0]->config.receive_minimum.number ())
{
@ -260,8 +261,16 @@ TEST (node, auto_bootstrap)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TRUE (node1->ledger.block_exists (send1->hash ()));
// Wait for the receive block (ledger block count reaches 3)
system.deadline_set (5s);
while (node1->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out) + node1->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_conf_height, nano::stat::dir::out) < 2)
while (node1->ledger.block_count_cache < 3)
{
ASSERT_NO_ERROR (system.poll ());
}
// Confirmation for all blocks
system.deadline_set (5s);
while (node1->ledger.cemented_count < 3)
{
ASSERT_NO_ERROR (system.poll ());
}

View file

@ -102,10 +102,10 @@ bool nano::bootstrap_attempt::should_log ()
return result;
}
bool nano::bootstrap_attempt::request_frontier (nano::unique_lock<std::mutex> & lock_a)
bool nano::bootstrap_attempt::request_frontier (nano::unique_lock<std::mutex> & lock_a, bool first_attempt)
{
auto result (true);
auto connection_l (connection (lock_a));
auto connection_l (connection (lock_a, first_attempt));
connection_frontier_request = connection_l;
if (connection_l)
{
@ -212,10 +212,13 @@ void nano::bootstrap_attempt::run_start (nano::unique_lock<std::mutex> & lock_a)
total_blocks = 0;
requeued_pulls = 0;
pulls.clear ();
recent_pulls_head.clear ();
auto frontier_failure (true);
uint64_t frontier_attempts (0);
while (!stopped && frontier_failure)
{
frontier_failure = request_frontier (lock_a);
++frontier_attempts;
frontier_failure = request_frontier (lock_a, frontier_attempts == 1);
}
frontiers_received = true;
// Shuffle pulls.
@ -293,7 +296,7 @@ void nano::bootstrap_attempt::run ()
idle.clear ();
}
std::shared_ptr<nano::bootstrap_client> nano::bootstrap_attempt::connection (nano::unique_lock<std::mutex> & lock_a)
std::shared_ptr<nano::bootstrap_client> nano::bootstrap_attempt::connection (nano::unique_lock<std::mutex> & lock_a, bool use_front_connection)
{
// clang-format off
condition.wait (lock_a, [& stopped = stopped, &idle = idle] { return stopped || !idle.empty (); });
@ -301,8 +304,16 @@ std::shared_ptr<nano::bootstrap_client> nano::bootstrap_attempt::connection (nan
std::shared_ptr<nano::bootstrap_client> result;
if (!idle.empty ())
{
result = idle.back ();
idle.pop_back ();
if (!use_front_connection)
{
result = idle.back ();
idle.pop_back ();
}
else
{
result = idle.front ();
idle.pop_front ();
}
}
return result;
}
@ -625,9 +636,9 @@ void nano::bootstrap_attempt::attempt_restart_check (nano::unique_lock<std::mute
- or 128k processed blocks indicating large bootstrap */
if (!frontiers_confirmed && (requeued_pulls > (!node->network_params.network.is_test_network () ? nano::bootstrap_limits::requeued_pulls_limit : nano::bootstrap_limits::requeued_pulls_limit_test) || total_blocks > nano::bootstrap_limits::frontier_confirmation_blocks_limit))
{
confirm_frontiers (lock_a);
auto confirmed (confirm_frontiers (lock_a));
assert (lock_a.owns_lock ());
if (!frontiers_confirmed)
if (!confirmed)
{
node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::frontier_confirmation_failed, nano::stat::dir::in);
auto score (node->bootstrap_initiator.excluded_peers.add (endpoint_frontier_request, node->network.size ()));
@ -635,42 +646,41 @@ void nano::bootstrap_attempt::attempt_restart_check (nano::unique_lock<std::mute
{
node->logger.always_log (boost::str (boost::format ("Adding peer %1% to excluded peers list with score %2% after %3% seconds bootstrap attempt") % endpoint_frontier_request % score % std::chrono::duration_cast<std::chrono::seconds> (std::chrono::steady_clock::now () - attempt_start).count ()));
}
for (auto i : clients)
{
if (auto client = i.lock ())
{
if (auto socket_l = client->channel->socket.lock ())
{
socket_l->close ();
}
}
}
idle.clear ();
run_start (lock_a);
lock_a.unlock ();
stop ();
lock_a.lock ();
// Start a new bootstrap attempt (forced, so any still-running attempt is stopped first)
auto node_l (node->shared ());
node->background ([node_l]() {
node_l->bootstrap_initiator.bootstrap (true);
});
}
else
{
node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::frontier_confirmation_successful, nano::stat::dir::in);
}
frontiers_confirmed = confirmed;
}
}
void nano::bootstrap_attempt::confirm_frontiers (nano::unique_lock<std::mutex> & lock_a)
bool nano::bootstrap_attempt::confirm_frontiers (nano::unique_lock<std::mutex> & lock_a)
{
bool confirmed (false);
assert (!frontiers_confirmed);
// clang-format off
condition.wait (lock_a, [& stopped = stopped] { return !stopped; });
// clang-format on
std::vector<nano::block_hash> frontiers;
for (auto i (pulls.begin ()), end (pulls.end ()); i != end && frontiers.size () != nano::bootstrap_limits::bootstrap_max_confirm_frontiers; ++i)
{
if (!i->head.is_zero ())
if (!i->head.is_zero () && std::find (frontiers.begin (), frontiers.end (), i->head) == frontiers.end ())
{
frontiers.push_back (i->head);
}
}
for (auto i (recent_pulls_head.begin ()), end (recent_pulls_head.end ()); i != end && frontiers.size () != nano::bootstrap_limits::bootstrap_max_confirm_frontiers; ++i)
{
if (!i->is_zero ())
if (!i->is_zero () && std::find (frontiers.begin (), frontiers.end (), *i) == frontiers.end ())
{
frontiers.push_back (*i);
}
@ -716,7 +726,7 @@ void nano::bootstrap_attempt::confirm_frontiers (nano::unique_lock<std::mutex> &
}
}
// Start requests
for (auto i (0), max_requests (20); i <= max_requests && !frontiers_confirmed && !stopped; ++i)
for (auto i (0), max_requests (20); i <= max_requests && !confirmed && !stopped; ++i)
{
std::unordered_map<std::shared_ptr<nano::transport::channel>, std::deque<std::pair<nano::block_hash, nano::root>>> batched_confirm_req_bundle;
std::deque<std::pair<nano::block_hash, nano::root>> request;
@ -766,7 +776,7 @@ void nano::bootstrap_attempt::confirm_frontiers (nano::unique_lock<std::mutex> &
auto confirmed_count (frontiers_count - frontiers.size ());
if (confirmed_count >= frontiers_count * nano::bootstrap_limits::required_frontier_confirmation_ratio) // 80% of frontiers confirmed
{
frontiers_confirmed = true;
confirmed = true;
}
else if (i < max_requests)
{
@ -774,12 +784,13 @@ void nano::bootstrap_attempt::confirm_frontiers (nano::unique_lock<std::mutex> &
std::this_thread::sleep_for (std::chrono::milliseconds (!node->network_params.network.is_test_network () ? 500 : 5));
}
}
if (!frontiers_confirmed)
if (!confirmed)
{
node->logger.always_log (boost::str (boost::format ("Failed to confirm frontiers for bootstrap attempt. %1% of %2% frontiers were not confirmed") % frontiers.size () % frontiers_count));
}
}
lock_a.lock ();
return confirmed;
}
void nano::bootstrap_attempt::lazy_start (nano::hash_or_account const & hash_or_account_a, bool confirmed)
@ -1327,9 +1338,16 @@ nano::bootstrap_initiator::~bootstrap_initiator ()
stop ();
}
void nano::bootstrap_initiator::bootstrap ()
void nano::bootstrap_initiator::bootstrap (bool force)
{
nano::unique_lock<std::mutex> lock (mutex);
if (force && attempt != nullptr)
{
attempt->stop ();
// clang-format off
condition.wait (lock, [&attempt = attempt, &stopped = stopped] { return stopped || attempt == nullptr; });
// clang-format on
}
if (!stopped && attempt == nullptr)
{
node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::initiate, nano::stat::dir::out);
@ -1373,15 +1391,12 @@ void nano::bootstrap_initiator::bootstrap_lazy (nano::hash_or_account const & ha
{
{
nano::unique_lock<std::mutex> lock (mutex);
if (force)
if (force && attempt != nullptr)
{
if (attempt != nullptr)
{
attempt->stop ();
// clang-format off
condition.wait (lock, [&attempt = attempt, &stopped = stopped] { return stopped || attempt == nullptr; });
// clang-format on
}
attempt->stop ();
// clang-format off
condition.wait (lock, [&attempt = attempt, &stopped = stopped] { return stopped || attempt == nullptr; });
// clang-format on
}
node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::initiate_lazy, nano::stat::dir::out);
if (attempt == nullptr)

View file

@ -62,11 +62,11 @@ public:
explicit bootstrap_attempt (std::shared_ptr<nano::node> node_a, nano::bootstrap_mode mode_a = nano::bootstrap_mode::legacy);
~bootstrap_attempt ();
void run ();
std::shared_ptr<nano::bootstrap_client> connection (nano::unique_lock<std::mutex> &);
std::shared_ptr<nano::bootstrap_client> connection (nano::unique_lock<std::mutex> &, bool = false);
bool consume_future (std::future<bool> &);
void populate_connections ();
void start_populate_connections ();
bool request_frontier (nano::unique_lock<std::mutex> &);
bool request_frontier (nano::unique_lock<std::mutex> &, bool = false);
void request_pull (nano::unique_lock<std::mutex> &);
void request_push (nano::unique_lock<std::mutex> &);
void add_connection (nano::endpoint const &);
@ -81,7 +81,7 @@ public:
bool should_log ();
void add_bulk_push_target (nano::block_hash const &, nano::block_hash const &);
void attempt_restart_check (nano::unique_lock<std::mutex> &);
void confirm_frontiers (nano::unique_lock<std::mutex> &);
bool confirm_frontiers (nano::unique_lock<std::mutex> &);
bool process_block (std::shared_ptr<nano::block>, nano::account const &, uint64_t, nano::bulk_pull::count_t, bool, unsigned);
/** Lazy bootstrap */
void lazy_run ();
@ -240,7 +240,7 @@ public:
explicit bootstrap_initiator (nano::node &);
~bootstrap_initiator ();
void bootstrap (nano::endpoint const &, bool add_to_peers = true, bool frontiers_confirmed = false);
void bootstrap ();
void bootstrap (bool force = false);
void bootstrap_lazy (nano::hash_or_account const &, bool force = false, bool confirmed = true);
void bootstrap_wallet (std::deque<nano::account> &);
void run_bootstrap ();

View file

@ -1630,9 +1630,10 @@ void nano::json_handler::bootstrap ()
void nano::json_handler::bootstrap_any ()
{
const bool force = request.get<bool> ("force", false);
if (!node.flags.disable_legacy_bootstrap)
{
node.bootstrap_initiator.bootstrap ();
node.bootstrap_initiator.bootstrap (force);
response_l.put ("success", "");
}
else

View file

@ -659,7 +659,10 @@ void nano::node::start ()
});
}
ongoing_store_flush ();
rep_crawler.start ();
if (!flags.disable_rep_crawler)
{
rep_crawler.start ();
}
ongoing_rep_calculation ();
ongoing_peer_store ();
ongoing_online_weight_calculation_queue ();
@ -1041,7 +1044,10 @@ void nano::node::add_initial_peers ()
if (auto node_l = node_w.lock ())
{
node_l->network.send_keepalive (channel_a);
node_l->rep_crawler.query (channel_a);
if (!node_l->flags.disable_rep_crawler)
{
node_l->rep_crawler.query (channel_a);
}
}
});
}

View file

@ -117,6 +117,7 @@ public:
bool disable_bootstrap_listener{ false };
bool disable_bootstrap_bulk_pull_server{ false };
bool disable_bootstrap_bulk_push_client{ false };
bool disable_rep_crawler{ false };
bool disable_tcp_realtime{ false };
bool disable_udp{ false };
bool disable_unchecked_cleanup{ false };

View file

@ -4,9 +4,12 @@
// Constructs the representative crawler bound to the given node.
// NOTE(review): this span comes from a rendered diff and therefore shows both versions of the
// constructor body: the unconditional observer registration is the pre-change code, and the
// flag-guarded registration that follows it is its replacement.
nano::rep_crawler::rep_crawler (nano::node & node_a) :
node (node_a)
{
// Pre-change version: always registers a callback on node.observers.endpoint that forwards the channel to query ()
node.observers.endpoint.add ([this](std::shared_ptr<nano::transport::channel> channel_a) {
this->query (channel_a);
});
// Post-change version: the same callback is only registered when node.flags.disable_rep_crawler is not set
if (!node.flags.disable_rep_crawler)
{
node.observers.endpoint.add ([this](std::shared_ptr<nano::transport::channel> channel_a) {
this->query (channel_a);
});
}
}
void nano::rep_crawler::add (nano::block_hash const & hash_a)