Bootstrap attempts and connections/pulls separation (#2499)
- New class `nano::bootstrap_connections` to manage client connections & bulk pulls - Separate source file for connections - Restored bootstrap clients list (to close connections with `stop` command) - Parent class `nano::bootstrap_attempt` & child classes for legacy, lazy & wallet bootstraps - Separate source files for bootstrap attempts & lazy attempts - Allowing several concurrent bootstrap attempts (currently for different bootstrap modes, in the future can be easily modified to allow same concurrent modes) - Separate `nano::bootstrap_attempts` class for fast attempt search with incremental ID - Bulk pull info is modified to include bootstrap attempt incremental ID - `force` option in bootstrap RPCs & RPC "bootstrap" are currenlty designed to close all concurrent attempts - Config field `bootstrap_initiator_threads` to manage bootstrap concurrency. Default 2 for multithreaded systems, 1 for singlethreaded & test network - RPC "bootstrap_status" is modified to show all bootstrap attempts & new connections class - Reduced lazy bootstrap attempts memory consumption: processed blocks unordered map is modified to store only 64 bit hash of block instead of full 256 bit union - Fixed `websocket.bootstrap_exited` test TSAN warnings
This commit is contained in:
parent
d3ed8e1125
commit
e10701220f
27 changed files with 2395 additions and 1723 deletions
|
@ -1,5 +1,6 @@
|
|||
#include <nano/core_test/testutil.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_frontier.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_lazy.hpp>
|
||||
#include <nano/node/testing.hpp>
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
@ -337,6 +338,7 @@ TEST (bootstrap_processor, pull_diamond)
|
|||
|
||||
TEST (bootstrap_processor, DISABLED_pull_requeue_network_error)
|
||||
{
|
||||
// Bootstrap attempt stopped before requeue & then cannot be found in attempts list
|
||||
nano::system system;
|
||||
nano::node_config config (nano::get_available_port (), system.logging);
|
||||
config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
|
||||
|
@ -359,10 +361,11 @@ TEST (bootstrap_processor, DISABLED_pull_requeue_network_error)
|
|||
}
|
||||
// Add non-existing pull & stop remote peer
|
||||
{
|
||||
nano::unique_lock<std::mutex> lock (attempt->mutex);
|
||||
nano::unique_lock<std::mutex> lock (node1->bootstrap_initiator.connections->mutex);
|
||||
ASSERT_FALSE (attempt->stopped);
|
||||
attempt->pulls.push_back (nano::pull_info (nano::test_genesis_key.pub, send1->hash (), genesis.hash ()));
|
||||
attempt->request_pull (lock);
|
||||
++attempt->pulling;
|
||||
node1->bootstrap_initiator.connections->pulls.push_back (nano::pull_info (nano::test_genesis_key.pub, send1->hash (), genesis.hash (), attempt->incremental_id));
|
||||
node1->bootstrap_initiator.connections->request_pull (lock);
|
||||
node2->stop ();
|
||||
}
|
||||
system.deadline_set (5s);
|
||||
|
@ -620,9 +623,9 @@ TEST (bootstrap_processor, lazy_hash)
|
|||
node1->network.udp_channels.insert (node0->network.endpoint (), node1->network_params.protocol.protocol_version);
|
||||
node1->bootstrap_initiator.bootstrap_lazy (receive2->hash (), true);
|
||||
{
|
||||
auto attempt (node1->bootstrap_initiator.current_attempt ());
|
||||
ASSERT_NE (nullptr, attempt);
|
||||
ASSERT_EQ (receive2->hash ().to_string (), attempt->id);
|
||||
auto lazy_attempt (node1->bootstrap_initiator.current_lazy_attempt ());
|
||||
ASSERT_NE (nullptr, lazy_attempt);
|
||||
ASSERT_EQ (receive2->hash ().to_string (), lazy_attempt->id);
|
||||
}
|
||||
// Check processed blocks
|
||||
system.deadline_set (10s);
|
||||
|
@ -660,9 +663,9 @@ TEST (bootstrap_processor, lazy_hash_bootstrap_id)
|
|||
node1->network.udp_channels.insert (node0->network.endpoint (), node1->network_params.protocol.protocol_version);
|
||||
node1->bootstrap_initiator.bootstrap_lazy (receive2->hash (), true, true, "123456");
|
||||
{
|
||||
auto attempt (node1->bootstrap_initiator.current_attempt ());
|
||||
ASSERT_NE (nullptr, attempt);
|
||||
ASSERT_EQ ("123456", attempt->id);
|
||||
auto lazy_attempt (node1->bootstrap_initiator.current_lazy_attempt ());
|
||||
ASSERT_NE (nullptr, lazy_attempt);
|
||||
ASSERT_EQ ("123456", lazy_attempt->id);
|
||||
}
|
||||
// Check processed blocks
|
||||
system.deadline_set (10s);
|
||||
|
@ -858,9 +861,9 @@ TEST (bootstrap_processor, wallet_lazy_frontier)
|
|||
wallet->insert_adhoc (key2.prv);
|
||||
node1->bootstrap_wallet ();
|
||||
{
|
||||
auto attempt (node1->bootstrap_initiator.current_attempt ());
|
||||
ASSERT_NE (nullptr, attempt);
|
||||
ASSERT_EQ (key2.pub.to_account (), attempt->id);
|
||||
auto wallet_attempt (node1->bootstrap_initiator.current_wallet_attempt ());
|
||||
ASSERT_NE (nullptr, wallet_attempt);
|
||||
ASSERT_EQ (key2.pub.to_account (), wallet_attempt->id);
|
||||
}
|
||||
// Check processed blocks
|
||||
system.deadline_set (10s);
|
||||
|
@ -908,6 +911,61 @@ TEST (bootstrap_processor, wallet_lazy_pending)
|
|||
node1->stop ();
|
||||
}
|
||||
|
||||
TEST (bootstrap_processor, multiple_attempts)
|
||||
{
|
||||
nano::system system;
|
||||
nano::node_config config (nano::get_available_port (), system.logging);
|
||||
config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
|
||||
nano::node_flags node_flags;
|
||||
node_flags.disable_bootstrap_bulk_push_client = true;
|
||||
auto node1 = system.add_node (config, node_flags);
|
||||
nano::genesis genesis;
|
||||
nano::keypair key1;
|
||||
nano::keypair key2;
|
||||
// Generating test chain
|
||||
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, key1.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node1->work_generate_blocking (genesis.hash ())));
|
||||
auto receive1 (std::make_shared<nano::state_block> (key1.pub, 0, key1.pub, nano::Gxrb_ratio, send1->hash (), key1.prv, key1.pub, *node1->work_generate_blocking (key1.pub)));
|
||||
auto send2 (std::make_shared<nano::state_block> (key1.pub, receive1->hash (), key1.pub, 0, key2.pub, key1.prv, key1.pub, *node1->work_generate_blocking (receive1->hash ())));
|
||||
auto receive2 (std::make_shared<nano::state_block> (key2.pub, 0, key2.pub, nano::Gxrb_ratio, send2->hash (), key2.prv, key2.pub, *node1->work_generate_blocking (key2.pub)));
|
||||
// Processing test chain
|
||||
node1->block_processor.add (send1);
|
||||
node1->block_processor.add (receive1);
|
||||
node1->block_processor.add (send2);
|
||||
node1->block_processor.add (receive2);
|
||||
node1->block_processor.flush ();
|
||||
// Start 2 concurrent bootstrap attempts
|
||||
nano::node_config node_config (nano::get_available_port (), system.logging);
|
||||
node_config.bootstrap_initiator_threads = 3;
|
||||
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, node_config, system.work));
|
||||
node2->network.udp_channels.insert (node1->network.endpoint (), node2->network_params.protocol.protocol_version);
|
||||
node2->bootstrap_initiator.bootstrap_lazy (receive2->hash (), true);
|
||||
node2->bootstrap_initiator.bootstrap ();
|
||||
auto lazy_attempt (node2->bootstrap_initiator.current_lazy_attempt ());
|
||||
auto legacy_attempt (node2->bootstrap_initiator.current_attempt ());
|
||||
system.deadline_set (5s);
|
||||
while (!lazy_attempt->started || !legacy_attempt->started)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
// Check that both bootstrap attempts are running & not finished
|
||||
ASSERT_FALSE (lazy_attempt->stopped);
|
||||
ASSERT_FALSE (legacy_attempt->stopped);
|
||||
ASSERT_GE (node2->bootstrap_initiator.attempts.size (), 2);
|
||||
// Check processed blocks
|
||||
system.deadline_set (10s);
|
||||
while (node2->balance (key2.pub) == 0)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
// Check attempts finish
|
||||
system.deadline_set (5s);
|
||||
while (node2->bootstrap_initiator.attempts.size () != 0)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
node2->stop ();
|
||||
}
|
||||
|
||||
TEST (frontier_req_response, DISABLED_destruction)
|
||||
{
|
||||
{
|
||||
|
|
|
@ -2406,27 +2406,35 @@ TEST (node, balance_observer)
|
|||
}
|
||||
}
|
||||
|
||||
// ASSERT_NE (nullptr, attempt) sometimes fails
|
||||
TEST (node, DISABLED_bootstrap_connection_scaling)
|
||||
TEST (node, bootstrap_connection_scaling)
|
||||
{
|
||||
nano::system system (1);
|
||||
auto & node1 (*system.nodes[0]);
|
||||
node1.bootstrap_initiator.bootstrap ();
|
||||
auto attempt (node1.bootstrap_initiator.current_attempt ());
|
||||
ASSERT_NE (nullptr, attempt);
|
||||
ASSERT_EQ (34, attempt->target_connections (25000));
|
||||
ASSERT_EQ (4, attempt->target_connections (0));
|
||||
ASSERT_EQ (64, attempt->target_connections (50000));
|
||||
ASSERT_EQ (64, attempt->target_connections (10000000000));
|
||||
ASSERT_EQ (34, node1.bootstrap_initiator.connections->target_connections (5000, 1));
|
||||
ASSERT_EQ (4, node1.bootstrap_initiator.connections->target_connections (0, 1));
|
||||
ASSERT_EQ (64, node1.bootstrap_initiator.connections->target_connections (50000, 1));
|
||||
ASSERT_EQ (64, node1.bootstrap_initiator.connections->target_connections (10000000000, 1));
|
||||
ASSERT_EQ (32, node1.bootstrap_initiator.connections->target_connections (5000, 0));
|
||||
ASSERT_EQ (1, node1.bootstrap_initiator.connections->target_connections (0, 0));
|
||||
ASSERT_EQ (64, node1.bootstrap_initiator.connections->target_connections (50000, 0));
|
||||
ASSERT_EQ (64, node1.bootstrap_initiator.connections->target_connections (10000000000, 0));
|
||||
ASSERT_EQ (36, node1.bootstrap_initiator.connections->target_connections (5000, 2));
|
||||
ASSERT_EQ (8, node1.bootstrap_initiator.connections->target_connections (0, 2));
|
||||
ASSERT_EQ (64, node1.bootstrap_initiator.connections->target_connections (50000, 2));
|
||||
ASSERT_EQ (64, node1.bootstrap_initiator.connections->target_connections (10000000000, 2));
|
||||
node1.config.bootstrap_connections = 128;
|
||||
ASSERT_EQ (64, attempt->target_connections (0));
|
||||
ASSERT_EQ (64, attempt->target_connections (50000));
|
||||
ASSERT_EQ (64, node1.bootstrap_initiator.connections->target_connections (0, 1));
|
||||
ASSERT_EQ (64, node1.bootstrap_initiator.connections->target_connections (50000, 1));
|
||||
ASSERT_EQ (64, node1.bootstrap_initiator.connections->target_connections (0, 2));
|
||||
ASSERT_EQ (64, node1.bootstrap_initiator.connections->target_connections (50000, 2));
|
||||
node1.config.bootstrap_connections_max = 256;
|
||||
ASSERT_EQ (128, attempt->target_connections (0));
|
||||
ASSERT_EQ (256, attempt->target_connections (50000));
|
||||
ASSERT_EQ (128, node1.bootstrap_initiator.connections->target_connections (0, 1));
|
||||
ASSERT_EQ (256, node1.bootstrap_initiator.connections->target_connections (50000, 1));
|
||||
ASSERT_EQ (256, node1.bootstrap_initiator.connections->target_connections (0, 2));
|
||||
ASSERT_EQ (256, node1.bootstrap_initiator.connections->target_connections (50000, 2));
|
||||
node1.config.bootstrap_connections_max = 0;
|
||||
ASSERT_EQ (1, attempt->target_connections (0));
|
||||
ASSERT_EQ (1, attempt->target_connections (50000));
|
||||
ASSERT_EQ (1, node1.bootstrap_initiator.connections->target_connections (0, 1));
|
||||
ASSERT_EQ (1, node1.bootstrap_initiator.connections->target_connections (50000, 1));
|
||||
}
|
||||
|
||||
// Test stat counting at both type and detail levels
|
||||
|
|
|
@ -150,6 +150,7 @@ TEST (toml, daemon_config_deserialize_defaults)
|
|||
ASSERT_EQ (conf.node.block_processor_batch_max_time, defaults.node.block_processor_batch_max_time);
|
||||
ASSERT_EQ (conf.node.bootstrap_connections, defaults.node.bootstrap_connections);
|
||||
ASSERT_EQ (conf.node.bootstrap_connections_max, defaults.node.bootstrap_connections_max);
|
||||
ASSERT_EQ (conf.node.bootstrap_initiator_threads, defaults.node.bootstrap_initiator_threads);
|
||||
ASSERT_EQ (conf.node.bootstrap_fraction_numerator, defaults.node.bootstrap_fraction_numerator);
|
||||
ASSERT_EQ (conf.node.conf_height_processor_batch_min_time, defaults.node.conf_height_processor_batch_min_time);
|
||||
ASSERT_EQ (conf.node.confirmation_history_size, defaults.node.confirmation_history_size);
|
||||
|
@ -388,6 +389,7 @@ TEST (toml, daemon_config_deserialize_no_defaults)
|
|||
block_processor_batch_max_time = 999
|
||||
bootstrap_connections = 999
|
||||
bootstrap_connections_max = 999
|
||||
bootstrap_initiator_threads = 999
|
||||
bootstrap_fraction_numerator = 999
|
||||
conf_height_processor_batch_min_time = 999
|
||||
confirmation_history_size = 999
|
||||
|
@ -547,6 +549,7 @@ TEST (toml, daemon_config_deserialize_no_defaults)
|
|||
ASSERT_NE (conf.node.block_processor_batch_max_time, defaults.node.block_processor_batch_max_time);
|
||||
ASSERT_NE (conf.node.bootstrap_connections, defaults.node.bootstrap_connections);
|
||||
ASSERT_NE (conf.node.bootstrap_connections_max, defaults.node.bootstrap_connections_max);
|
||||
ASSERT_NE (conf.node.bootstrap_initiator_threads, defaults.node.bootstrap_initiator_threads);
|
||||
ASSERT_NE (conf.node.bootstrap_fraction_numerator, defaults.node.bootstrap_fraction_numerator);
|
||||
ASSERT_NE (conf.node.conf_height_processor_batch_min_time, defaults.node.conf_height_processor_batch_min_time);
|
||||
ASSERT_NE (conf.node.confirmation_history_size, defaults.node.confirmation_history_size);
|
||||
|
|
|
@ -51,6 +51,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
|
|||
case nano::thread_role::name::bootstrap_initiator:
|
||||
thread_role_name_string = "Bootstrap init";
|
||||
break;
|
||||
case nano::thread_role::name::bootstrap_connections:
|
||||
thread_role_name_string = "Bootstrap conn";
|
||||
break;
|
||||
case nano::thread_role::name::voting:
|
||||
thread_role_name_string = "Voting";
|
||||
break;
|
||||
|
|
|
@ -25,6 +25,7 @@ namespace thread_role
|
|||
request_loop,
|
||||
wallet_actions,
|
||||
bootstrap_initiator,
|
||||
bootstrap_connections,
|
||||
voting,
|
||||
signature_checking,
|
||||
rpc_request_processor,
|
||||
|
|
|
@ -26,12 +26,18 @@ add_library (node
|
|||
active_transactions.cpp
|
||||
blockprocessor.hpp
|
||||
blockprocessor.cpp
|
||||
bootstrap/bootstrap_attempt.hpp
|
||||
bootstrap/bootstrap_attempt.cpp
|
||||
bootstrap/bootstrap_bulk_pull.hpp
|
||||
bootstrap/bootstrap_bulk_pull.cpp
|
||||
bootstrap/bootstrap_bulk_push.hpp
|
||||
bootstrap/bootstrap_bulk_push.cpp
|
||||
bootstrap/bootstrap_connections.hpp
|
||||
bootstrap/bootstrap_connections.cpp
|
||||
bootstrap/bootstrap_frontier.hpp
|
||||
bootstrap/bootstrap_frontier.cpp
|
||||
bootstrap/bootstrap_lazy.hpp
|
||||
bootstrap/bootstrap_lazy.cpp
|
||||
bootstrap/bootstrap_server.hpp
|
||||
bootstrap/bootstrap_server.cpp
|
||||
bootstrap/bootstrap.hpp
|
||||
|
|
|
@ -35,12 +35,14 @@ void nano::block_processor::stop ()
|
|||
void nano::block_processor::flush ()
|
||||
{
|
||||
node.checker.flush ();
|
||||
flushing = true;
|
||||
nano::unique_lock<std::mutex> lock (mutex);
|
||||
while (!stopped && (have_blocks () || active))
|
||||
{
|
||||
condition.wait (lock);
|
||||
}
|
||||
blocks_filter.clear ();
|
||||
flushing = false;
|
||||
}
|
||||
|
||||
size_t nano::block_processor::size ()
|
||||
|
@ -579,9 +581,5 @@ nano::block_hash nano::block_processor::filter_item (nano::block_hash const & ha
|
|||
void nano::block_processor::requeue_invalid (nano::block_hash const & hash_a, nano::unchecked_info const & info_a)
|
||||
{
|
||||
debug_assert (hash_a == info_a.block->hash ());
|
||||
auto attempt (node.bootstrap_initiator.current_attempt ());
|
||||
if (attempt != nullptr && attempt->mode == nano::bootstrap_mode::lazy)
|
||||
{
|
||||
attempt->lazy_requeue (hash_a, info_a.block->previous (), info_a.confirmed);
|
||||
}
|
||||
node.bootstrap_initiator.lazy_requeue (hash_a, info_a.block->previous (), info_a.confirmed);
|
||||
}
|
||||
|
|
|
@ -45,6 +45,7 @@ public:
|
|||
nano::process_return process_one (nano::write_transaction const &, nano::unchecked_info, const bool = false, const bool = false);
|
||||
nano::process_return process_one (nano::write_transaction const &, std::shared_ptr<nano::block>, const bool = false);
|
||||
nano::vote_generator generator;
|
||||
std::atomic<bool> flushing{ false };
|
||||
// Delay required for average network propagartion before requesting confirmation
|
||||
static std::chrono::milliseconds constexpr confirmation_request_delay{ 1500 };
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -1,10 +1,8 @@
|
|||
#pragma once
|
||||
|
||||
#include <nano/node/bootstrap/bootstrap_bulk_pull.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_connections.hpp>
|
||||
#include <nano/node/common.hpp>
|
||||
#include <nano/node/socket.hpp>
|
||||
#include <nano/secure/blockstore.hpp>
|
||||
#include <nano/secure/ledger.hpp>
|
||||
|
||||
#include <boost/multi_index/hashed_index.hpp>
|
||||
#include <boost/multi_index/member.hpp>
|
||||
|
@ -13,178 +11,30 @@
|
|||
#include <boost/thread/thread.hpp>
|
||||
|
||||
#include <atomic>
|
||||
#include <future>
|
||||
#include <queue>
|
||||
#include <unordered_set>
|
||||
|
||||
namespace mi = boost::multi_index;
|
||||
|
||||
namespace nano
|
||||
{
|
||||
class bootstrap_attempt;
|
||||
class bootstrap_client;
|
||||
class node;
|
||||
|
||||
class bootstrap_connections;
|
||||
namespace transport
|
||||
{
|
||||
class channel_tcp;
|
||||
}
|
||||
enum class sync_result
|
||||
{
|
||||
success,
|
||||
error,
|
||||
fork
|
||||
};
|
||||
enum class bootstrap_mode
|
||||
{
|
||||
legacy,
|
||||
lazy,
|
||||
wallet_lazy
|
||||
};
|
||||
class lazy_state_backlog_item final
|
||||
enum class sync_result
|
||||
{
|
||||
public:
|
||||
nano::link link{ 0 };
|
||||
nano::uint128_t balance{ 0 };
|
||||
unsigned retry_limit{ 0 };
|
||||
};
|
||||
class lazy_destinations_item final
|
||||
{
|
||||
public:
|
||||
nano::account account{ 0 };
|
||||
uint64_t count{ 0 };
|
||||
};
|
||||
class frontier_req_client;
|
||||
class bulk_push_client;
|
||||
class bootstrap_attempt final : public std::enable_shared_from_this<bootstrap_attempt>
|
||||
{
|
||||
public:
|
||||
explicit bootstrap_attempt (std::shared_ptr<nano::node> node_a, nano::bootstrap_mode mode_a = nano::bootstrap_mode::legacy, std::string id_a = "");
|
||||
~bootstrap_attempt ();
|
||||
void run ();
|
||||
std::shared_ptr<nano::bootstrap_client> connection (nano::unique_lock<std::mutex> &, bool = false);
|
||||
bool consume_future (std::future<bool> &);
|
||||
void populate_connections ();
|
||||
void start_populate_connections ();
|
||||
bool request_frontier (nano::unique_lock<std::mutex> &, bool = false);
|
||||
void request_pull (nano::unique_lock<std::mutex> &);
|
||||
void request_push (nano::unique_lock<std::mutex> &);
|
||||
void add_connection (nano::endpoint const &);
|
||||
void connect_client (nano::tcp_endpoint const &);
|
||||
void pool_connection (std::shared_ptr<nano::bootstrap_client>);
|
||||
void stop ();
|
||||
void requeue_pull (nano::pull_info const &, bool = false);
|
||||
void add_pull (nano::pull_info const &);
|
||||
bool still_pulling ();
|
||||
void run_start (nano::unique_lock<std::mutex> &);
|
||||
unsigned target_connections (size_t pulls_remaining);
|
||||
bool should_log ();
|
||||
void add_bulk_push_target (nano::block_hash const &, nano::block_hash const &);
|
||||
void attempt_restart_check (nano::unique_lock<std::mutex> &);
|
||||
bool confirm_frontiers (nano::unique_lock<std::mutex> &);
|
||||
bool process_block (std::shared_ptr<nano::block>, nano::account const &, uint64_t, nano::bulk_pull::count_t, bool, unsigned);
|
||||
std::string mode_text ();
|
||||
/** Lazy bootstrap */
|
||||
void lazy_run ();
|
||||
void lazy_start (nano::hash_or_account const &, bool confirmed = true);
|
||||
void lazy_add (nano::hash_or_account const &, unsigned = std::numeric_limits<unsigned>::max ());
|
||||
void lazy_requeue (nano::block_hash const &, nano::block_hash const &, bool);
|
||||
bool lazy_finished ();
|
||||
bool lazy_has_expired () const;
|
||||
void lazy_pull_flush ();
|
||||
void lazy_clear ();
|
||||
bool process_block_lazy (std::shared_ptr<nano::block>, nano::account const &, uint64_t, nano::bulk_pull::count_t, unsigned);
|
||||
void lazy_block_state (std::shared_ptr<nano::block>, unsigned);
|
||||
void lazy_block_state_backlog_check (std::shared_ptr<nano::block>, nano::block_hash const &);
|
||||
void lazy_backlog_cleanup ();
|
||||
void lazy_destinations_increment (nano::account const &);
|
||||
void lazy_destinations_flush ();
|
||||
bool lazy_processed_or_exists (nano::block_hash const &);
|
||||
/** Lazy bootstrap */
|
||||
/** Wallet bootstrap */
|
||||
void request_pending (nano::unique_lock<std::mutex> &);
|
||||
void requeue_pending (nano::account const &);
|
||||
void wallet_run ();
|
||||
void wallet_start (std::deque<nano::account> &);
|
||||
bool wallet_finished ();
|
||||
/** Wallet bootstrap */
|
||||
std::mutex next_log_mutex;
|
||||
std::chrono::steady_clock::time_point next_log;
|
||||
std::deque<std::weak_ptr<nano::bootstrap_client>> clients;
|
||||
std::weak_ptr<nano::bootstrap_client> connection_frontier_request;
|
||||
nano::tcp_endpoint endpoint_frontier_request;
|
||||
std::weak_ptr<nano::frontier_req_client> frontiers;
|
||||
std::weak_ptr<nano::bulk_push_client> push;
|
||||
std::deque<nano::pull_info> pulls;
|
||||
std::deque<nano::block_hash> recent_pulls_head;
|
||||
std::deque<std::shared_ptr<nano::bootstrap_client>> idle;
|
||||
std::atomic<unsigned> connections{ 0 };
|
||||
std::atomic<unsigned> pulling{ 0 };
|
||||
std::shared_ptr<nano::node> node;
|
||||
std::atomic<unsigned> account_count{ 0 };
|
||||
std::atomic<uint64_t> total_blocks{ 0 };
|
||||
std::atomic<unsigned> runs_count{ 0 };
|
||||
std::atomic<unsigned> requeued_pulls{ 0 };
|
||||
std::vector<std::pair<nano::block_hash, nano::block_hash>> bulk_push_targets;
|
||||
std::atomic<bool> frontiers_received{ false };
|
||||
std::atomic<bool> frontiers_confirmed{ false };
|
||||
std::atomic<bool> populate_connections_started{ false };
|
||||
std::atomic<bool> stopped{ false };
|
||||
std::chrono::steady_clock::time_point attempt_start{ std::chrono::steady_clock::now () };
|
||||
nano::bootstrap_mode mode;
|
||||
std::string id;
|
||||
std::mutex mutex;
|
||||
nano::condition_variable condition;
|
||||
// Lazy bootstrap
|
||||
std::unordered_set<nano::block_hash> lazy_blocks;
|
||||
std::unordered_map<nano::block_hash, nano::lazy_state_backlog_item> lazy_state_backlog;
|
||||
std::unordered_set<nano::block_hash> lazy_undefined_links;
|
||||
std::unordered_map<nano::block_hash, nano::uint128_t> lazy_balances;
|
||||
std::unordered_set<nano::block_hash> lazy_keys;
|
||||
std::deque<std::pair<nano::hash_or_account, unsigned>> lazy_pulls;
|
||||
std::chrono::steady_clock::time_point lazy_start_time;
|
||||
std::chrono::steady_clock::time_point last_lazy_flush{ std::chrono::steady_clock::now () };
|
||||
class account_tag
|
||||
{
|
||||
};
|
||||
class count_tag
|
||||
{
|
||||
};
|
||||
// clang-format off
|
||||
boost::multi_index_container<lazy_destinations_item,
|
||||
mi::indexed_by<
|
||||
mi::ordered_non_unique<mi::tag<count_tag>,
|
||||
mi::member<lazy_destinations_item, uint64_t, &lazy_destinations_item::count>,
|
||||
std::greater<uint64_t>>,
|
||||
mi::hashed_unique<mi::tag<account_tag>,
|
||||
mi::member<lazy_destinations_item, nano::account, &lazy_destinations_item::account>>>>
|
||||
lazy_destinations;
|
||||
// clang-format on
|
||||
std::atomic<size_t> lazy_blocks_count{ 0 };
|
||||
std::atomic<bool> lazy_destinations_flushed{ false };
|
||||
std::mutex lazy_mutex;
|
||||
// Wallet lazy bootstrap
|
||||
std::deque<nano::account> wallet_accounts;
|
||||
/** The maximum number of records to be read in while iterating over long lazy containers */
|
||||
static uint64_t constexpr batch_read_size = 256;
|
||||
};
|
||||
class bootstrap_client final : public std::enable_shared_from_this<bootstrap_client>
|
||||
{
|
||||
public:
|
||||
bootstrap_client (std::shared_ptr<nano::node>, std::shared_ptr<nano::bootstrap_attempt>, std::shared_ptr<nano::transport::channel_tcp>, std::shared_ptr<nano::socket>);
|
||||
~bootstrap_client ();
|
||||
std::shared_ptr<nano::bootstrap_client> shared ();
|
||||
void stop (bool force);
|
||||
double block_rate () const;
|
||||
double elapsed_seconds () const;
|
||||
std::shared_ptr<nano::node> node;
|
||||
std::shared_ptr<nano::bootstrap_attempt> attempt;
|
||||
std::shared_ptr<nano::transport::channel_tcp> channel;
|
||||
std::shared_ptr<nano::socket> socket;
|
||||
std::shared_ptr<std::vector<uint8_t>> receive_buffer;
|
||||
std::chrono::steady_clock::time_point start_time;
|
||||
std::atomic<uint64_t> block_count;
|
||||
std::atomic<bool> pending_stop;
|
||||
std::atomic<bool> hard_stop;
|
||||
success,
|
||||
error,
|
||||
fork
|
||||
};
|
||||
class cached_pulls final
|
||||
{
|
||||
|
@ -246,6 +96,18 @@ public:
|
|||
constexpr static std::chrono::hours exclude_time_hours = std::chrono::hours (1);
|
||||
constexpr static std::chrono::hours exclude_remove_hours = std::chrono::hours (24);
|
||||
};
|
||||
class bootstrap_attempts final
|
||||
{
|
||||
public:
|
||||
void add (std::shared_ptr<nano::bootstrap_attempt>);
|
||||
void remove (uint64_t);
|
||||
void clear ();
|
||||
std::shared_ptr<nano::bootstrap_attempt> find (uint64_t);
|
||||
size_t size ();
|
||||
std::atomic<uint64_t> incremental{ 0 };
|
||||
std::mutex bootstrap_attempts_mutex;
|
||||
std::map<uint64_t, std::shared_ptr<nano::bootstrap_attempt>> attempts;
|
||||
};
|
||||
|
||||
class bootstrap_initiator final
|
||||
{
|
||||
|
@ -257,23 +119,33 @@ public:
|
|||
void bootstrap_lazy (nano::hash_or_account const &, bool force = false, bool confirmed = true, std::string id_a = "");
|
||||
void bootstrap_wallet (std::deque<nano::account> &);
|
||||
void run_bootstrap ();
|
||||
void lazy_requeue (nano::block_hash const &, nano::block_hash const &, bool);
|
||||
void notify_listeners (bool);
|
||||
void add_observer (std::function<void(bool)> const &);
|
||||
bool in_progress ();
|
||||
std::shared_ptr<nano::bootstrap_connections> connections;
|
||||
std::shared_ptr<nano::bootstrap_attempt> new_attempt ();
|
||||
bool has_new_attempts ();
|
||||
std::shared_ptr<nano::bootstrap_attempt> current_attempt ();
|
||||
std::shared_ptr<nano::bootstrap_attempt> current_lazy_attempt ();
|
||||
std::shared_ptr<nano::bootstrap_attempt> current_wallet_attempt ();
|
||||
nano::pulls_cache cache;
|
||||
nano::bootstrap_excluded_peers excluded_peers;
|
||||
nano::bootstrap_attempts attempts;
|
||||
void stop ();
|
||||
|
||||
private:
|
||||
nano::node & node;
|
||||
std::shared_ptr<nano::bootstrap_attempt> attempt;
|
||||
std::atomic<bool> stopped;
|
||||
std::shared_ptr<nano::bootstrap_attempt> find_attempt (nano::bootstrap_mode);
|
||||
void remove_attempt (std::shared_ptr<nano::bootstrap_attempt>);
|
||||
void stop_attempts ();
|
||||
std::vector<std::shared_ptr<nano::bootstrap_attempt>> attempts_list;
|
||||
std::atomic<bool> stopped{ false };
|
||||
std::mutex mutex;
|
||||
nano::condition_variable condition;
|
||||
std::mutex observers_mutex;
|
||||
std::vector<std::function<void(bool)>> observers;
|
||||
boost::thread thread;
|
||||
std::vector<boost::thread> bootstrap_initiator_threads;
|
||||
|
||||
friend std::unique_ptr<container_info_component> collect_container_info (bootstrap_initiator & bootstrap_initiator, const std::string & name);
|
||||
};
|
||||
|
@ -282,8 +154,7 @@ std::unique_ptr<container_info_component> collect_container_info (bootstrap_init
|
|||
class bootstrap_limits final
|
||||
{
|
||||
public:
|
||||
static constexpr double bootstrap_connection_scale_target_blocks = 50000.0;
|
||||
static constexpr double bootstrap_connection_scale_target_blocks_lazy = bootstrap_connection_scale_target_blocks / 5;
|
||||
static constexpr double bootstrap_connection_scale_target_blocks = 10000.0;
|
||||
static constexpr double bootstrap_connection_warmup_time_sec = 5.0;
|
||||
static constexpr double bootstrap_minimum_blocks_per_sec = 10.0;
|
||||
static constexpr double bootstrap_minimum_elapsed_seconds_blockrate = 0.02;
|
||||
|
|
602
nano/node/bootstrap/bootstrap_attempt.cpp
Normal file
602
nano/node/bootstrap/bootstrap_attempt.cpp
Normal file
|
@ -0,0 +1,602 @@
|
|||
#include <nano/crypto_lib/random_pool.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_attempt.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_bulk_push.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_frontier.hpp>
|
||||
#include <nano/node/common.hpp>
|
||||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/transport/tcp.hpp>
|
||||
#include <nano/node/websocket.hpp>
|
||||
|
||||
#include <boost/format.hpp>
|
||||
|
||||
#include <algorithm>
|
||||
|
||||
constexpr size_t nano::bootstrap_limits::bootstrap_max_confirm_frontiers;
|
||||
constexpr double nano::bootstrap_limits::required_frontier_confirmation_ratio;
|
||||
constexpr unsigned nano::bootstrap_limits::frontier_confirmation_blocks_limit;
|
||||
constexpr unsigned nano::bootstrap_limits::requeued_pulls_limit;
|
||||
constexpr unsigned nano::bootstrap_limits::requeued_pulls_limit_test;
|
||||
|
||||
nano::bootstrap_attempt::bootstrap_attempt (std::shared_ptr<nano::node> node_a, nano::bootstrap_mode mode_a, uint64_t incremental_id_a, std::string id_a) :
|
||||
node (node_a),
|
||||
mode (mode_a),
|
||||
incremental_id (incremental_id_a),
|
||||
id (id_a)
|
||||
{
|
||||
if (id.empty ())
|
||||
{
|
||||
nano::random_constants constants;
|
||||
id = constants.random_128.to_string ();
|
||||
}
|
||||
node->logger.always_log (boost::str (boost::format ("Starting %1% bootstrap attempt with ID %2%") % mode_text () % id));
|
||||
node->bootstrap_initiator.notify_listeners (true);
|
||||
if (node->websocket_server)
|
||||
{
|
||||
nano::websocket::message_builder builder;
|
||||
node->websocket_server->broadcast (builder.bootstrap_started (id, mode_text ()));
|
||||
}
|
||||
}
|
||||
|
||||
nano::bootstrap_attempt::~bootstrap_attempt ()
|
||||
{
|
||||
node->logger.always_log (boost::str (boost::format ("Exiting %1% bootstrap attempt with ID %2%") % mode_text () % id));
|
||||
node->bootstrap_initiator.notify_listeners (false);
|
||||
if (node->websocket_server)
|
||||
{
|
||||
nano::websocket::message_builder builder;
|
||||
node->websocket_server->broadcast (builder.bootstrap_exited (id, mode_text (), attempt_start, total_blocks));
|
||||
}
|
||||
}
|
||||
|
||||
bool nano::bootstrap_attempt::should_log ()
|
||||
{
|
||||
nano::lock_guard<std::mutex> guard (next_log_mutex);
|
||||
auto result (false);
|
||||
auto now (std::chrono::steady_clock::now ());
|
||||
if (next_log < now)
|
||||
{
|
||||
result = true;
|
||||
next_log = now + std::chrono::seconds (15);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
bool nano::bootstrap_attempt::still_pulling ()
|
||||
{
|
||||
debug_assert (!mutex.try_lock ());
|
||||
auto running (!stopped);
|
||||
auto still_pulling (pulling > 0);
|
||||
return running && still_pulling;
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt::stop ()
|
||||
{
|
||||
{
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
stopped = true;
|
||||
}
|
||||
condition.notify_all ();
|
||||
node->bootstrap_initiator.connections->clear_pulls (incremental_id);
|
||||
}
|
||||
|
||||
std::string nano::bootstrap_attempt::mode_text ()
|
||||
{
|
||||
std::string mode_text;
|
||||
if (mode == nano::bootstrap_mode::legacy)
|
||||
{
|
||||
mode_text = "legacy";
|
||||
}
|
||||
else if (mode == nano::bootstrap_mode::lazy)
|
||||
{
|
||||
mode_text = "lazy";
|
||||
}
|
||||
else if (mode == nano::bootstrap_mode::wallet_lazy)
|
||||
{
|
||||
mode_text = "wallet_lazy";
|
||||
}
|
||||
return mode_text;
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt::restart_condition ()
|
||||
{
|
||||
debug_assert (mode == nano::bootstrap_mode::legacy);
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt::add_frontier (nano::pull_info const &)
|
||||
{
|
||||
debug_assert (mode == nano::bootstrap_mode::legacy);
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt::add_bulk_push_target (nano::block_hash const &, nano::block_hash const &)
|
||||
{
|
||||
debug_assert (mode == nano::bootstrap_mode::legacy);
|
||||
}
|
||||
|
||||
bool nano::bootstrap_attempt::request_bulk_push_target (std::pair<nano::block_hash, nano::block_hash> &)
|
||||
{
|
||||
debug_assert (mode == nano::bootstrap_mode::legacy);
|
||||
return true;
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt::add_recent_pull (nano::block_hash const &)
|
||||
{
|
||||
debug_assert (mode == nano::bootstrap_mode::legacy);
|
||||
}
|
||||
|
||||
bool nano::bootstrap_attempt::process_block (std::shared_ptr<nano::block> block_a, nano::account const & known_account_a, uint64_t pull_blocks, nano::bulk_pull::count_t max_blocks, bool block_expected, unsigned retry_limit)
|
||||
{
|
||||
nano::unchecked_info info (block_a, known_account_a, 0, nano::signature_verification::unknown);
|
||||
node->block_processor.add (info);
|
||||
return false;
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt::lazy_start (nano::hash_or_account const &, bool)
|
||||
{
|
||||
debug_assert (mode == nano::bootstrap_mode::lazy);
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt::lazy_add (nano::pull_info const &)
|
||||
{
|
||||
debug_assert (mode == nano::bootstrap_mode::lazy);
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt::lazy_requeue (nano::block_hash const &, nano::block_hash const &, bool)
|
||||
{
|
||||
debug_assert (mode == nano::bootstrap_mode::lazy);
|
||||
}
|
||||
|
||||
uint32_t nano::bootstrap_attempt::lazy_batch_size ()
|
||||
{
|
||||
debug_assert (mode == nano::bootstrap_mode::lazy);
|
||||
return node->network_params.bootstrap.lazy_min_pull_blocks;
|
||||
}
|
||||
|
||||
bool nano::bootstrap_attempt::lazy_processed_or_exists (nano::block_hash const &)
|
||||
{
|
||||
debug_assert (mode == nano::bootstrap_mode::lazy);
|
||||
return false;
|
||||
}
|
||||
|
||||
bool nano::bootstrap_attempt::lazy_has_expired () const
|
||||
{
|
||||
debug_assert (mode == nano::bootstrap_mode::lazy);
|
||||
return true;
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt::requeue_pending (nano::account const &)
|
||||
{
|
||||
debug_assert (mode == nano::bootstrap_mode::wallet_lazy);
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt::wallet_start (std::deque<nano::account> &)
|
||||
{
|
||||
debug_assert (mode == nano::bootstrap_mode::wallet_lazy);
|
||||
}
|
||||
|
||||
size_t nano::bootstrap_attempt::wallet_size ()
|
||||
{
|
||||
debug_assert (mode == nano::bootstrap_mode::wallet_lazy);
|
||||
return 0;
|
||||
}
|
||||
|
||||
nano::bootstrap_attempt_legacy::bootstrap_attempt_legacy (std::shared_ptr<nano::node> node_a, uint64_t incremental_id_a, std::string id_a) :
|
||||
nano::bootstrap_attempt (node_a, nano::bootstrap_mode::legacy, incremental_id_a, id_a)
|
||||
{
|
||||
node->bootstrap_initiator.notify_listeners (true);
|
||||
}
|
||||
|
||||
bool nano::bootstrap_attempt_legacy::consume_future (std::future<bool> & future_a)
|
||||
{
|
||||
bool result;
|
||||
try
|
||||
{
|
||||
result = future_a.get ();
|
||||
}
|
||||
catch (std::future_error &)
|
||||
{
|
||||
result = true;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_legacy::stop ()
|
||||
{
|
||||
stopped = true;
|
||||
condition.notify_all ();
|
||||
nano::unique_lock<std::mutex> lock (mutex);
|
||||
if (auto i = frontiers.lock ())
|
||||
{
|
||||
try
|
||||
{
|
||||
i->promise.set_value (true);
|
||||
}
|
||||
catch (std::future_error &)
|
||||
{
|
||||
}
|
||||
}
|
||||
if (auto i = push.lock ())
|
||||
{
|
||||
try
|
||||
{
|
||||
i->promise.set_value (true);
|
||||
}
|
||||
catch (std::future_error &)
|
||||
{
|
||||
}
|
||||
}
|
||||
lock.unlock ();
|
||||
node->bootstrap_initiator.connections->clear_pulls (incremental_id);
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_legacy::request_push (nano::unique_lock<std::mutex> & lock_a)
|
||||
{
|
||||
bool error (false);
|
||||
lock_a.unlock ();
|
||||
auto connection_l (node->bootstrap_initiator.connections->find_connection (endpoint_frontier_request));
|
||||
lock_a.lock ();
|
||||
if (connection_l)
|
||||
{
|
||||
std::future<bool> future;
|
||||
{
|
||||
auto this_l (shared_from_this ());
|
||||
auto client (std::make_shared<nano::bulk_push_client> (connection_l, this_l));
|
||||
client->start ();
|
||||
push = client;
|
||||
future = client->promise.get_future ();
|
||||
}
|
||||
lock_a.unlock ();
|
||||
error = consume_future (future); // This is out of scope of `client' so when the last reference via boost::asio::io_context is lost and the client is destroyed, the future throws an exception.
|
||||
lock_a.lock ();
|
||||
}
|
||||
if (node->config.logging.network_logging ())
|
||||
{
|
||||
node->logger.try_log ("Exiting bulk push client");
|
||||
if (error)
|
||||
{
|
||||
node->logger.try_log ("Bulk push client failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_legacy::add_frontier (nano::pull_info const & pull_a)
|
||||
{
|
||||
nano::pull_info pull (pull_a);
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
frontier_pulls.push_back (pull);
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_legacy::add_bulk_push_target (nano::block_hash const & head, nano::block_hash const & end)
|
||||
{
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
bulk_push_targets.emplace_back (head, end);
|
||||
}
|
||||
|
||||
bool nano::bootstrap_attempt_legacy::request_bulk_push_target (std::pair<nano::block_hash, nano::block_hash> & current_target_a)
|
||||
{
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
auto empty (bulk_push_targets.empty ());
|
||||
if (!empty)
|
||||
{
|
||||
current_target_a = bulk_push_targets.back ();
|
||||
bulk_push_targets.pop_back ();
|
||||
}
|
||||
return empty;
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_legacy::add_recent_pull (nano::block_hash const & head_a)
|
||||
{
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
recent_pulls_head.push_back (head_a);
|
||||
if (recent_pulls_head.size () > nano::bootstrap_limits::bootstrap_max_confirm_frontiers)
|
||||
{
|
||||
recent_pulls_head.pop_front ();
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_legacy::restart_condition ()
|
||||
{
|
||||
/* Conditions to start frontiers confirmation:
|
||||
- not completed frontiers confirmation
|
||||
- more than 256 pull retries usually indicating issues with requested pulls
|
||||
- or 128k processed blocks indicating large bootstrap */
|
||||
if (!frontiers_confirmation_pending && !frontiers_confirmed && (requeued_pulls > (!node->network_params.network.is_test_network () ? nano::bootstrap_limits::requeued_pulls_limit : nano::bootstrap_limits::requeued_pulls_limit_test) || total_blocks > nano::bootstrap_limits::frontier_confirmation_blocks_limit))
|
||||
{
|
||||
frontiers_confirmation_pending = true;
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_legacy::attempt_restart_check (nano::unique_lock<std::mutex> & lock_a)
|
||||
{
|
||||
if (frontiers_confirmation_pending)
|
||||
{
|
||||
auto confirmed (confirm_frontiers (lock_a));
|
||||
debug_assert (lock_a.owns_lock ());
|
||||
if (!confirmed)
|
||||
{
|
||||
node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::frontier_confirmation_failed, nano::stat::dir::in);
|
||||
auto score (node->bootstrap_initiator.excluded_peers.add (endpoint_frontier_request, node->network.size ()));
|
||||
if (score >= nano::bootstrap_excluded_peers::score_limit)
|
||||
{
|
||||
node->logger.always_log (boost::str (boost::format ("Adding peer %1% to excluded peers list with score %2% after %3% seconds bootstrap attempt") % endpoint_frontier_request % score % std::chrono::duration_cast<std::chrono::seconds> (std::chrono::steady_clock::now () - attempt_start).count ()));
|
||||
}
|
||||
lock_a.unlock ();
|
||||
stop ();
|
||||
lock_a.lock ();
|
||||
// Start new bootstrap connection
|
||||
auto node_l (node->shared ());
|
||||
node->background ([node_l]() {
|
||||
node_l->bootstrap_initiator.bootstrap (true);
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::frontier_confirmation_successful, nano::stat::dir::in);
|
||||
}
|
||||
frontiers_confirmed = confirmed;
|
||||
frontiers_confirmation_pending = false;
|
||||
}
|
||||
}
|
||||
|
||||
bool nano::bootstrap_attempt_legacy::confirm_frontiers (nano::unique_lock<std::mutex> & lock_a)
|
||||
{
|
||||
bool confirmed (false);
|
||||
debug_assert (!frontiers_confirmed);
|
||||
condition.wait (lock_a, [& stopped = stopped] { return !stopped; });
|
||||
auto this_l (shared_from_this ());
|
||||
std::vector<nano::block_hash> frontiers;
|
||||
lock_a.unlock ();
|
||||
nano::unique_lock<std::mutex> pulls_lock (node->bootstrap_initiator.connections->mutex);
|
||||
for (auto i (node->bootstrap_initiator.connections->pulls.begin ()), end (node->bootstrap_initiator.connections->pulls.end ()); i != end && frontiers.size () != nano::bootstrap_limits::bootstrap_max_confirm_frontiers; ++i)
|
||||
{
|
||||
if (!i->head.is_zero () && i->bootstrap_id == incremental_id && std::find (frontiers.begin (), frontiers.end (), i->head) == frontiers.end ())
|
||||
{
|
||||
frontiers.push_back (i->head);
|
||||
}
|
||||
}
|
||||
pulls_lock.unlock ();
|
||||
lock_a.lock ();
|
||||
for (auto i (recent_pulls_head.begin ()), end (recent_pulls_head.end ()); i != end && frontiers.size () != nano::bootstrap_limits::bootstrap_max_confirm_frontiers; ++i)
|
||||
{
|
||||
if (!i->is_zero () && std::find (frontiers.begin (), frontiers.end (), *i) == frontiers.end ())
|
||||
{
|
||||
frontiers.push_back (*i);
|
||||
}
|
||||
}
|
||||
lock_a.unlock ();
|
||||
auto frontiers_count (frontiers.size ());
|
||||
if (frontiers_count > 0)
|
||||
{
|
||||
const size_t reps_limit = 20;
|
||||
auto representatives (node->rep_crawler.representatives ());
|
||||
auto reps_weight (node->rep_crawler.total_weight ());
|
||||
auto representatives_copy (representatives);
|
||||
nano::uint128_t total_weight (0);
|
||||
// Select random peers from bottom 50% of principal representatives
|
||||
if (representatives.size () > 1)
|
||||
{
|
||||
std::reverse (representatives.begin (), representatives.end ());
|
||||
representatives.resize (representatives.size () / 2);
|
||||
for (auto i = static_cast<CryptoPP::word32> (representatives.size () - 1); i > 0; --i)
|
||||
{
|
||||
auto k = nano::random_pool::generate_word32 (0, i);
|
||||
std::swap (representatives[i], representatives[k]);
|
||||
}
|
||||
if (representatives.size () > reps_limit)
|
||||
{
|
||||
representatives.resize (reps_limit);
|
||||
}
|
||||
}
|
||||
for (auto const & rep : representatives)
|
||||
{
|
||||
total_weight += rep.weight.number ();
|
||||
}
|
||||
// Select peers with total 25% of reps stake from top 50% of principal representatives
|
||||
representatives_copy.resize (representatives_copy.size () / 2);
|
||||
while (total_weight < reps_weight / 4) // 25%
|
||||
{
|
||||
auto k = nano::random_pool::generate_word32 (0, static_cast<CryptoPP::word32> (representatives_copy.size () - 1));
|
||||
auto rep (representatives_copy[k]);
|
||||
if (std::find (representatives.begin (), representatives.end (), rep) == representatives.end ())
|
||||
{
|
||||
representatives.push_back (rep);
|
||||
total_weight += rep.weight.number ();
|
||||
}
|
||||
}
|
||||
// Start requests
|
||||
for (auto i (0), max_requests (20); i <= max_requests && !confirmed && !stopped; ++i)
|
||||
{
|
||||
std::unordered_map<std::shared_ptr<nano::transport::channel>, std::deque<std::pair<nano::block_hash, nano::root>>> batched_confirm_req_bundle;
|
||||
std::deque<std::pair<nano::block_hash, nano::root>> request;
|
||||
// Find confirmed frontiers (tally > 12.5% of reps stake, 60% of requestsed reps responded
|
||||
for (auto ii (frontiers.begin ()); ii != frontiers.end ();)
|
||||
{
|
||||
if (node->ledger.block_exists (*ii))
|
||||
{
|
||||
ii = frontiers.erase (ii);
|
||||
}
|
||||
else
|
||||
{
|
||||
nano::unique_lock<std::mutex> active_lock (node->active.mutex);
|
||||
auto existing (node->active.find_inactive_votes_cache (*ii));
|
||||
active_lock.unlock ();
|
||||
nano::uint128_t tally;
|
||||
for (auto & voter : existing.voters)
|
||||
{
|
||||
tally += node->ledger.weight (voter);
|
||||
}
|
||||
if (existing.confirmed || (tally > reps_weight / 8 && existing.voters.size () >= representatives.size () * 0.6)) // 12.5% of weight, 60% of reps
|
||||
{
|
||||
ii = frontiers.erase (ii);
|
||||
}
|
||||
else
|
||||
{
|
||||
for (auto const & rep : representatives)
|
||||
{
|
||||
if (std::find (existing.voters.begin (), existing.voters.end (), rep.account) == existing.voters.end ())
|
||||
{
|
||||
release_assert (!ii->is_zero ());
|
||||
auto rep_request (batched_confirm_req_bundle.find (rep.channel));
|
||||
if (rep_request == batched_confirm_req_bundle.end ())
|
||||
{
|
||||
std::deque<std::pair<nano::block_hash, nano::root>> insert_root_hash = { std::make_pair (*ii, *ii) };
|
||||
batched_confirm_req_bundle.emplace (rep.channel, insert_root_hash);
|
||||
}
|
||||
else
|
||||
{
|
||||
rep_request->second.emplace_back (*ii, *ii);
|
||||
}
|
||||
}
|
||||
}
|
||||
++ii;
|
||||
}
|
||||
}
|
||||
}
|
||||
auto confirmed_count (frontiers_count - frontiers.size ());
|
||||
if (confirmed_count >= frontiers_count * nano::bootstrap_limits::required_frontier_confirmation_ratio) // 80% of frontiers confirmed
|
||||
{
|
||||
confirmed = true;
|
||||
}
|
||||
else if (i < max_requests)
|
||||
{
|
||||
node->network.broadcast_confirm_req_batched_many (batched_confirm_req_bundle);
|
||||
std::this_thread::sleep_for (std::chrono::milliseconds (!node->network_params.network.is_test_network () ? 500 : 5));
|
||||
}
|
||||
}
|
||||
if (!confirmed)
|
||||
{
|
||||
node->logger.always_log (boost::str (boost::format ("Failed to confirm frontiers for bootstrap attempt. %1% of %2% frontiers were not confirmed") % frontiers.size () % frontiers_count));
|
||||
}
|
||||
}
|
||||
lock_a.lock ();
|
||||
return confirmed;
|
||||
}
|
||||
|
||||
bool nano::bootstrap_attempt_legacy::request_frontier (nano::unique_lock<std::mutex> & lock_a, bool first_attempt)
|
||||
{
|
||||
auto result (true);
|
||||
lock_a.unlock ();
|
||||
auto connection_l (node->bootstrap_initiator.connections->connection (shared_from_this (), first_attempt));
|
||||
lock_a.lock ();
|
||||
if (connection_l)
|
||||
{
|
||||
endpoint_frontier_request = connection_l->channel->get_tcp_endpoint ();
|
||||
std::future<bool> future;
|
||||
{
|
||||
auto this_l (shared_from_this ());
|
||||
auto client (std::make_shared<nano::frontier_req_client> (connection_l, this_l));
|
||||
client->run ();
|
||||
frontiers = client;
|
||||
future = client->promise.get_future ();
|
||||
}
|
||||
lock_a.unlock ();
|
||||
result = consume_future (future); // This is out of scope of `client' so when the last reference via boost::asio::io_context is lost and the client is destroyed, the future throws an exception.
|
||||
lock_a.lock ();
|
||||
if (result)
|
||||
{
|
||||
frontier_pulls.clear ();
|
||||
}
|
||||
else
|
||||
{
|
||||
account_count = frontier_pulls.size ();
|
||||
// Shuffle pulls
|
||||
release_assert (std::numeric_limits<CryptoPP::word32>::max () > frontier_pulls.size ());
|
||||
if (!frontier_pulls.empty ())
|
||||
{
|
||||
for (auto i = static_cast<CryptoPP::word32> (frontier_pulls.size () - 1); i > 0; --i)
|
||||
{
|
||||
auto k = nano::random_pool::generate_word32 (0, i);
|
||||
std::swap (frontier_pulls[i], frontier_pulls[k]);
|
||||
}
|
||||
}
|
||||
// Add to regular pulls
|
||||
while (!frontier_pulls.empty ())
|
||||
{
|
||||
auto pull (frontier_pulls.front ());
|
||||
lock_a.unlock ();
|
||||
node->bootstrap_initiator.connections->add_pull (pull);
|
||||
++pulling;
|
||||
lock_a.lock ();
|
||||
frontier_pulls.pop_front ();
|
||||
}
|
||||
}
|
||||
if (node->config.logging.network_logging ())
|
||||
{
|
||||
if (!result)
|
||||
{
|
||||
node->logger.try_log (boost::str (boost::format ("Completed frontier request, %1% out of sync accounts according to %2%") % account_count % connection_l->channel->to_string ()));
|
||||
}
|
||||
else
|
||||
{
|
||||
node->stats.inc (nano::stat::type::error, nano::stat::detail::frontier_req, nano::stat::dir::out);
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_legacy::run_start (nano::unique_lock<std::mutex> & lock_a)
|
||||
{
|
||||
frontiers_received = false;
|
||||
frontiers_confirmed = false;
|
||||
total_blocks = 0;
|
||||
requeued_pulls = 0;
|
||||
recent_pulls_head.clear ();
|
||||
auto frontier_failure (true);
|
||||
uint64_t frontier_attempts (0);
|
||||
while (!stopped && frontier_failure)
|
||||
{
|
||||
++frontier_attempts;
|
||||
frontier_failure = request_frontier (lock_a, frontier_attempts == 1);
|
||||
}
|
||||
frontiers_received = true;
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_legacy::run ()
|
||||
{
|
||||
debug_assert (started);
|
||||
debug_assert (!node->flags.disable_legacy_bootstrap);
|
||||
node->bootstrap_initiator.connections->populate_connections (false);
|
||||
nano::unique_lock<std::mutex> lock (mutex);
|
||||
run_start (lock);
|
||||
while (still_pulling ())
|
||||
{
|
||||
while (still_pulling ())
|
||||
{
|
||||
// clang-format off
|
||||
condition.wait (lock, [&stopped = stopped, &pulling = pulling, &frontiers_confirmation_pending = frontiers_confirmation_pending] { return stopped || pulling == 0 || frontiers_confirmation_pending; });
|
||||
// clang-format on
|
||||
attempt_restart_check (lock);
|
||||
}
|
||||
// Flushing may resolve forks which can add more pulls
|
||||
node->logger.try_log ("Flushing unchecked blocks");
|
||||
lock.unlock ();
|
||||
node->block_processor.flush ();
|
||||
lock.lock ();
|
||||
node->logger.try_log ("Finished flushing unchecked blocks");
|
||||
}
|
||||
if (!stopped)
|
||||
{
|
||||
node->logger.try_log ("Completed legacy pulls");
|
||||
if (!node->flags.disable_bootstrap_bulk_push_client)
|
||||
{
|
||||
request_push (lock);
|
||||
}
|
||||
if (!stopped)
|
||||
{
|
||||
node->unchecked_cleanup ();
|
||||
}
|
||||
}
|
||||
lock.unlock ();
|
||||
stop ();
|
||||
condition.notify_all ();
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_legacy::get_information (boost::property_tree::ptree & tree_a)
|
||||
{
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
tree_a.put ("frontier_pulls", std::to_string (frontier_pulls.size ()));
|
||||
tree_a.put ("frontiers_received", static_cast<bool> (frontiers_received));
|
||||
tree_a.put ("frontiers_confirmed", static_cast<bool> (frontiers_confirmed));
|
||||
tree_a.put ("frontiers_confirmation_pending", static_cast<bool> (frontiers_confirmation_pending));
|
||||
}
|
86
nano/node/bootstrap/bootstrap_attempt.hpp
Normal file
86
nano/node/bootstrap/bootstrap_attempt.hpp
Normal file
|
@ -0,0 +1,86 @@
|
|||
#pragma once
|
||||
|
||||
#include <nano/node/bootstrap/bootstrap.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_bulk_pull.hpp>
|
||||
|
||||
#include <atomic>
|
||||
#include <future>
|
||||
|
||||
namespace nano
|
||||
{
|
||||
class node;
|
||||
|
||||
class frontier_req_client;
|
||||
class bulk_push_client;
|
||||
class bootstrap_attempt : public std::enable_shared_from_this<bootstrap_attempt>
|
||||
{
|
||||
public:
|
||||
explicit bootstrap_attempt (std::shared_ptr<nano::node> node_a, nano::bootstrap_mode mode_a, uint64_t incremental_id_a, std::string id_a);
|
||||
virtual ~bootstrap_attempt ();
|
||||
virtual void run () = 0;
|
||||
virtual void stop ();
|
||||
bool still_pulling ();
|
||||
bool should_log ();
|
||||
std::string mode_text ();
|
||||
virtual void restart_condition ();
|
||||
virtual void add_frontier (nano::pull_info const &);
|
||||
virtual void add_bulk_push_target (nano::block_hash const &, nano::block_hash const &);
|
||||
virtual bool request_bulk_push_target (std::pair<nano::block_hash, nano::block_hash> &);
|
||||
virtual void add_recent_pull (nano::block_hash const &);
|
||||
virtual void lazy_start (nano::hash_or_account const &, bool confirmed = true);
|
||||
virtual void lazy_add (nano::pull_info const &);
|
||||
virtual void lazy_requeue (nano::block_hash const &, nano::block_hash const &, bool);
|
||||
virtual uint32_t lazy_batch_size ();
|
||||
virtual bool lazy_has_expired () const;
|
||||
virtual bool lazy_processed_or_exists (nano::block_hash const &);
|
||||
virtual bool process_block (std::shared_ptr<nano::block>, nano::account const &, uint64_t, nano::bulk_pull::count_t, bool, unsigned);
|
||||
virtual void requeue_pending (nano::account const &);
|
||||
virtual void wallet_start (std::deque<nano::account> &);
|
||||
virtual size_t wallet_size ();
|
||||
virtual void get_information (boost::property_tree::ptree &) = 0;
|
||||
std::mutex next_log_mutex;
|
||||
std::chrono::steady_clock::time_point next_log{ std::chrono::steady_clock::now () };
|
||||
std::atomic<unsigned> pulling{ 0 };
|
||||
std::shared_ptr<nano::node> node;
|
||||
std::atomic<uint64_t> total_blocks{ 0 };
|
||||
std::atomic<unsigned> requeued_pulls{ 0 };
|
||||
std::atomic<bool> started{ false };
|
||||
std::atomic<bool> stopped{ false };
|
||||
uint64_t incremental_id{ 0 };
|
||||
std::string id;
|
||||
std::chrono::steady_clock::time_point attempt_start{ std::chrono::steady_clock::now () };
|
||||
std::atomic<bool> frontiers_received{ false };
|
||||
std::atomic<bool> frontiers_confirmed{ false };
|
||||
nano::bootstrap_mode mode;
|
||||
std::mutex mutex;
|
||||
nano::condition_variable condition;
|
||||
};
|
||||
class bootstrap_attempt_legacy : public bootstrap_attempt
|
||||
{
|
||||
public:
|
||||
explicit bootstrap_attempt_legacy (std::shared_ptr<nano::node> node_a, uint64_t incremental_id_a, std::string id_a = "");
|
||||
void run () override;
|
||||
bool consume_future (std::future<bool> &);
|
||||
void stop () override;
|
||||
bool request_frontier (nano::unique_lock<std::mutex> &, bool = false);
|
||||
void request_pull (nano::unique_lock<std::mutex> &);
|
||||
void request_push (nano::unique_lock<std::mutex> &);
|
||||
void add_frontier (nano::pull_info const &) override;
|
||||
void add_bulk_push_target (nano::block_hash const &, nano::block_hash const &) override;
|
||||
bool request_bulk_push_target (std::pair<nano::block_hash, nano::block_hash> &) override;
|
||||
void add_recent_pull (nano::block_hash const &) override;
|
||||
void run_start (nano::unique_lock<std::mutex> &);
|
||||
void restart_condition () override;
|
||||
void attempt_restart_check (nano::unique_lock<std::mutex> &);
|
||||
bool confirm_frontiers (nano::unique_lock<std::mutex> &);
|
||||
void get_information (boost::property_tree::ptree &) override;
|
||||
nano::tcp_endpoint endpoint_frontier_request;
|
||||
std::weak_ptr<nano::frontier_req_client> frontiers;
|
||||
std::weak_ptr<nano::bulk_push_client> push;
|
||||
std::deque<nano::pull_info> frontier_pulls;
|
||||
std::deque<nano::block_hash> recent_pulls_head;
|
||||
std::vector<std::pair<nano::block_hash, nano::block_hash>> bulk_push_targets;
|
||||
std::atomic<unsigned> account_count{ 0 };
|
||||
std::atomic<bool> frontiers_confirmation_pending{ false };
|
||||
};
|
||||
}
|
|
@ -1,28 +1,32 @@
|
|||
#include <nano/node/bootstrap/bootstrap.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_bulk_pull.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_connections.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_lazy.hpp>
|
||||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/transport/tcp.hpp>
|
||||
|
||||
#include <boost/format.hpp>
|
||||
|
||||
nano::pull_info::pull_info (nano::hash_or_account const & account_or_head_a, nano::block_hash const & head_a, nano::block_hash const & end_a, count_t count_a, unsigned retry_limit_a) :
|
||||
nano::pull_info::pull_info (nano::hash_or_account const & account_or_head_a, nano::block_hash const & head_a, nano::block_hash const & end_a, uint64_t bootstrap_id_a, count_t count_a, unsigned retry_limit_a) :
|
||||
account_or_head (account_or_head_a),
|
||||
head (head_a),
|
||||
head_original (head_a),
|
||||
end (end_a),
|
||||
count (count_a),
|
||||
retry_limit (retry_limit_a)
|
||||
retry_limit (retry_limit_a),
|
||||
bootstrap_id (bootstrap_id_a)
|
||||
{
|
||||
}
|
||||
|
||||
nano::bulk_pull_client::bulk_pull_client (std::shared_ptr<nano::bootstrap_client> connection_a, nano::pull_info const & pull_a) :
|
||||
nano::bulk_pull_client::bulk_pull_client (std::shared_ptr<nano::bootstrap_client> connection_a, std::shared_ptr<nano::bootstrap_attempt> attempt_a, nano::pull_info const & pull_a) :
|
||||
connection (connection_a),
|
||||
attempt (attempt_a),
|
||||
known_account (0),
|
||||
pull (pull_a),
|
||||
pull_blocks (0),
|
||||
unexpected_count (0)
|
||||
{
|
||||
connection->attempt->condition.notify_all ();
|
||||
attempt->condition.notify_all ();
|
||||
}
|
||||
|
||||
nano::bulk_pull_client::~bulk_pull_client ()
|
||||
|
@ -31,12 +35,12 @@ nano::bulk_pull_client::~bulk_pull_client ()
|
|||
if (expected != pull.end)
|
||||
{
|
||||
pull.head = expected;
|
||||
if (connection->attempt->mode != nano::bootstrap_mode::legacy)
|
||||
if (attempt->mode != nano::bootstrap_mode::legacy)
|
||||
{
|
||||
pull.account_or_head = expected;
|
||||
}
|
||||
pull.processed += pull_blocks - unexpected_count;
|
||||
connection->attempt->requeue_pull (pull, network_error);
|
||||
connection->node->bootstrap_initiator.connections->requeue_pull (pull, network_error);
|
||||
if (connection->node->config.logging.bulk_pull_logging ())
|
||||
{
|
||||
connection->node->logger.try_log (boost::str (boost::format ("Bulk pull end block is not expected %1% for account %2%") % pull.end.to_string () % pull.account_or_head.to_account ()));
|
||||
|
@ -46,11 +50,8 @@ nano::bulk_pull_client::~bulk_pull_client ()
|
|||
{
|
||||
connection->node->bootstrap_initiator.cache.remove (pull);
|
||||
}
|
||||
{
|
||||
nano::lock_guard<std::mutex> mutex (connection->attempt->mutex);
|
||||
--connection->attempt->pulling;
|
||||
}
|
||||
connection->attempt->condition.notify_all ();
|
||||
--attempt->pulling;
|
||||
attempt->condition.notify_all ();
|
||||
}
|
||||
|
||||
void nano::bulk_pull_client::request ()
|
||||
|
@ -74,13 +75,11 @@ void nano::bulk_pull_client::request ()
|
|||
|
||||
if (connection->node->config.logging.bulk_pull_logging ())
|
||||
{
|
||||
nano::unique_lock<std::mutex> lock (connection->attempt->mutex);
|
||||
connection->node->logger.try_log (boost::str (boost::format ("Requesting account %1% from %2%. %3% accounts in queue") % pull.account_or_head.to_account () % connection->channel->to_string () % connection->attempt->pulls.size ()));
|
||||
connection->node->logger.try_log (boost::str (boost::format ("Requesting account %1% from %2%. %3% accounts in queue") % pull.account_or_head.to_account () % connection->channel->to_string () % attempt->pulling));
|
||||
}
|
||||
else if (connection->node->config.logging.network_logging () && connection->attempt->should_log ())
|
||||
else if (connection->node->config.logging.network_logging () && attempt->should_log ())
|
||||
{
|
||||
nano::unique_lock<std::mutex> lock (connection->attempt->mutex);
|
||||
connection->node->logger.always_log (boost::str (boost::format ("%1% accounts in pull queue") % connection->attempt->pulls.size ()));
|
||||
connection->node->logger.always_log (boost::str (boost::format ("%1% accounts in pull queue") % attempt->pulling));
|
||||
}
|
||||
auto this_l (shared_from_this ());
|
||||
connection->channel->send (
|
||||
|
@ -104,7 +103,7 @@ void nano::bulk_pull_client::request ()
|
|||
void nano::bulk_pull_client::throttled_receive_block ()
|
||||
{
|
||||
debug_assert (!network_error);
|
||||
if (!connection->node->block_processor.half_full ())
|
||||
if (!connection->node->block_processor.half_full () && !connection->node->block_processor.flushing)
|
||||
{
|
||||
receive_block ();
|
||||
}
|
||||
|
@ -112,7 +111,7 @@ void nano::bulk_pull_client::throttled_receive_block ()
|
|||
{
|
||||
auto this_l (shared_from_this ());
|
||||
connection->node->alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this_l]() {
|
||||
if (!this_l->connection->pending_stop && !this_l->connection->attempt->stopped)
|
||||
if (!this_l->connection->pending_stop && !this_l->attempt->stopped)
|
||||
{
|
||||
this_l->throttled_receive_block ();
|
||||
}
|
||||
|
@ -192,7 +191,7 @@ void nano::bulk_pull_client::received_type ()
|
|||
// Avoid re-using slow peers, or peers that sent the wrong blocks.
|
||||
if (!connection->pending_stop && (expected == pull.end || (pull.count != 0 && pull.count == pull_blocks)))
|
||||
{
|
||||
connection->attempt->pool_connection (connection);
|
||||
connection->connections->pool_connection (connection);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -244,22 +243,22 @@ void nano::bulk_pull_client::received_block (boost::system::error_code const & e
|
|||
{
|
||||
connection->start_time = std::chrono::steady_clock::now ();
|
||||
}
|
||||
connection->attempt->total_blocks++;
|
||||
bool stop_pull (connection->attempt->process_block (block, known_account, pull_blocks, pull.count, block_expected, pull.retry_limit));
|
||||
attempt->total_blocks++;
|
||||
bool stop_pull (attempt->process_block (block, known_account, pull_blocks, pull.count, block_expected, pull.retry_limit));
|
||||
pull_blocks++;
|
||||
if (!stop_pull && !connection->hard_stop.load ())
|
||||
{
|
||||
/* Process block in lazy pull if not stopped
|
||||
Stop usual pull request with unexpected block & more than 16k blocks processed
|
||||
to prevent spam */
|
||||
if (connection->attempt->mode != nano::bootstrap_mode::legacy || unexpected_count < 16384)
|
||||
if (attempt->mode != nano::bootstrap_mode::legacy || unexpected_count < 16384)
|
||||
{
|
||||
throttled_receive_block ();
|
||||
}
|
||||
}
|
||||
else if (stop_pull && block_expected)
|
||||
{
|
||||
connection->attempt->pool_connection (connection);
|
||||
connection->connections->pool_connection (connection);
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@ -282,21 +281,19 @@ void nano::bulk_pull_client::received_block (boost::system::error_code const & e
|
|||
}
|
||||
}
|
||||
|
||||
nano::bulk_pull_account_client::bulk_pull_account_client (std::shared_ptr<nano::bootstrap_client> connection_a, nano::account const & account_a) :
|
||||
nano::bulk_pull_account_client::bulk_pull_account_client (std::shared_ptr<nano::bootstrap_client> connection_a, std::shared_ptr<nano::bootstrap_attempt> attempt_a, nano::account const & account_a) :
|
||||
connection (connection_a),
|
||||
attempt (attempt_a),
|
||||
account (account_a),
|
||||
pull_blocks (0)
|
||||
{
|
||||
connection->attempt->condition.notify_all ();
|
||||
attempt->condition.notify_all ();
|
||||
}
|
||||
|
||||
nano::bulk_pull_account_client::~bulk_pull_account_client ()
|
||||
{
|
||||
{
|
||||
nano::lock_guard<std::mutex> mutex (connection->attempt->mutex);
|
||||
--connection->attempt->pulling;
|
||||
}
|
||||
connection->attempt->condition.notify_all ();
|
||||
--attempt->pulling;
|
||||
attempt->condition.notify_all ();
|
||||
}
|
||||
|
||||
void nano::bulk_pull_account_client::request ()
|
||||
|
@ -307,13 +304,11 @@ void nano::bulk_pull_account_client::request ()
|
|||
req.flags = nano::bulk_pull_account_flags::pending_hash_and_amount;
|
||||
if (connection->node->config.logging.bulk_pull_logging ())
|
||||
{
|
||||
nano::unique_lock<std::mutex> lock (connection->attempt->mutex);
|
||||
connection->node->logger.try_log (boost::str (boost::format ("Requesting pending for account %1% from %2%. %3% accounts in queue") % req.account.to_account () % connection->channel->to_string () % connection->attempt->wallet_accounts.size ()));
|
||||
connection->node->logger.try_log (boost::str (boost::format ("Requesting pending for account %1% from %2%. %3% accounts in queue") % req.account.to_account () % connection->channel->to_string () % attempt->wallet_size ()));
|
||||
}
|
||||
else if (connection->node->config.logging.network_logging () && connection->attempt->should_log ())
|
||||
else if (connection->node->config.logging.network_logging () && attempt->should_log ())
|
||||
{
|
||||
nano::unique_lock<std::mutex> lock (connection->attempt->mutex);
|
||||
connection->node->logger.always_log (boost::str (boost::format ("%1% accounts in pull queue") % connection->attempt->wallet_accounts.size ()));
|
||||
connection->node->logger.always_log (boost::str (boost::format ("%1% accounts in pull queue") % attempt->wallet_size ()));
|
||||
}
|
||||
auto this_l (shared_from_this ());
|
||||
connection->channel->send (
|
||||
|
@ -324,7 +319,7 @@ void nano::bulk_pull_account_client::request ()
|
|||
}
|
||||
else
|
||||
{
|
||||
this_l->connection->attempt->requeue_pending (this_l->account);
|
||||
this_l->attempt->requeue_pending (this_l->account);
|
||||
if (this_l->connection->node->config.logging.bulk_pull_logging ())
|
||||
{
|
||||
this_l->connection->node->logger.try_log (boost::str (boost::format ("Error starting bulk pull request to %1%: to %2%") % ec.message () % this_l->connection->channel->to_string ()));
|
||||
|
@ -368,7 +363,7 @@ void nano::bulk_pull_account_client::receive_pending ()
|
|||
{
|
||||
if (!this_l->connection->node->ledger.block_exists (pending))
|
||||
{
|
||||
this_l->connection->attempt->lazy_start (pending);
|
||||
this_l->connection->node->bootstrap_initiator.bootstrap_lazy (pending, false, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -376,17 +371,17 @@ void nano::bulk_pull_account_client::receive_pending ()
|
|||
}
|
||||
else
|
||||
{
|
||||
this_l->connection->attempt->requeue_pending (this_l->account);
|
||||
this_l->attempt->requeue_pending (this_l->account);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
this_l->connection->attempt->pool_connection (this_l->connection);
|
||||
this_l->connection->connections->pool_connection (this_l->connection);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
this_l->connection->attempt->requeue_pending (this_l->account);
|
||||
this_l->attempt->requeue_pending (this_l->account);
|
||||
if (this_l->connection->node->config.logging.network_logging ())
|
||||
{
|
||||
this_l->connection->node->logger.try_log (boost::str (boost::format ("Error while receiving bulk pull account frontier %1%") % ec.message ()));
|
||||
|
@ -395,7 +390,7 @@ void nano::bulk_pull_account_client::receive_pending ()
|
|||
}
|
||||
else
|
||||
{
|
||||
this_l->connection->attempt->requeue_pending (this_l->account);
|
||||
this_l->attempt->requeue_pending (this_l->account);
|
||||
if (this_l->connection->node->config.logging.network_message_logging ())
|
||||
{
|
||||
this_l->connection->node->logger.try_log (boost::str (boost::format ("Invalid size: expected %1%, got %2%") % size_l % size_a));
|
||||
|
|
|
@ -7,12 +7,13 @@
|
|||
|
||||
namespace nano
|
||||
{
|
||||
class bootstrap_attempt;
|
||||
class pull_info
|
||||
{
|
||||
public:
|
||||
using count_t = nano::bulk_pull::count_t;
|
||||
pull_info () = default;
|
||||
pull_info (nano::hash_or_account const &, nano::block_hash const &, nano::block_hash const &, count_t = 0, unsigned = 16);
|
||||
pull_info (nano::hash_or_account const &, nano::block_hash const &, nano::block_hash const &, uint64_t, count_t = 0, unsigned = 16);
|
||||
nano::hash_or_account account_or_head{ 0 };
|
||||
nano::block_hash head{ 0 };
|
||||
nano::block_hash head_original{ 0 };
|
||||
|
@ -21,12 +22,13 @@ public:
|
|||
unsigned attempts{ 0 };
|
||||
uint64_t processed{ 0 };
|
||||
unsigned retry_limit{ 0 };
|
||||
uint64_t bootstrap_id{ 0 };
|
||||
};
|
||||
class bootstrap_client;
|
||||
class bulk_pull_client final : public std::enable_shared_from_this<nano::bulk_pull_client>
|
||||
{
|
||||
public:
|
||||
bulk_pull_client (std::shared_ptr<nano::bootstrap_client>, nano::pull_info const &);
|
||||
bulk_pull_client (std::shared_ptr<nano::bootstrap_client>, std::shared_ptr<nano::bootstrap_attempt>, nano::pull_info const &);
|
||||
~bulk_pull_client ();
|
||||
void request ();
|
||||
void receive_block ();
|
||||
|
@ -35,6 +37,7 @@ public:
|
|||
void received_block (boost::system::error_code const &, size_t, nano::block_type);
|
||||
nano::block_hash first ();
|
||||
std::shared_ptr<nano::bootstrap_client> connection;
|
||||
std::shared_ptr<nano::bootstrap_attempt> attempt;
|
||||
nano::block_hash expected;
|
||||
nano::account known_account;
|
||||
nano::pull_info pull;
|
||||
|
@ -45,11 +48,12 @@ public:
|
|||
class bulk_pull_account_client final : public std::enable_shared_from_this<nano::bulk_pull_account_client>
|
||||
{
|
||||
public:
|
||||
bulk_pull_account_client (std::shared_ptr<nano::bootstrap_client>, nano::account const &);
|
||||
bulk_pull_account_client (std::shared_ptr<nano::bootstrap_client>, std::shared_ptr<nano::bootstrap_attempt>, nano::account const &);
|
||||
~bulk_pull_account_client ();
|
||||
void request ();
|
||||
void receive_pending ();
|
||||
std::shared_ptr<nano::bootstrap_client> connection;
|
||||
std::shared_ptr<nano::bootstrap_attempt> attempt;
|
||||
nano::account account;
|
||||
uint64_t pull_blocks;
|
||||
};
|
||||
|
|
|
@ -1,12 +1,13 @@
|
|||
#include <nano/node/bootstrap/bootstrap.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_attempt.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_bulk_push.hpp>
|
||||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/transport/tcp.hpp>
|
||||
|
||||
#include <boost/format.hpp>
|
||||
|
||||
nano::bulk_push_client::bulk_push_client (std::shared_ptr<nano::bootstrap_client> const & connection_a) :
|
||||
connection (connection_a)
|
||||
nano::bulk_push_client::bulk_push_client (std::shared_ptr<nano::bootstrap_client> const & connection_a, std::shared_ptr<nano::bootstrap_attempt> const & attempt_a) :
|
||||
connection (connection_a),
|
||||
attempt (attempt_a)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -43,16 +44,7 @@ void nano::bulk_push_client::push ()
|
|||
{
|
||||
if (current_target.first.is_zero () || current_target.first == current_target.second)
|
||||
{
|
||||
nano::lock_guard<std::mutex> guard (connection->attempt->mutex);
|
||||
if (!connection->attempt->bulk_push_targets.empty ())
|
||||
{
|
||||
current_target = connection->attempt->bulk_push_targets.back ();
|
||||
connection->attempt->bulk_push_targets.pop_back ();
|
||||
}
|
||||
else
|
||||
{
|
||||
finished = true;
|
||||
}
|
||||
finished = attempt->request_bulk_push_target (current_target);
|
||||
}
|
||||
if (!finished)
|
||||
{
|
||||
|
|
|
@ -6,17 +6,19 @@
|
|||
|
||||
namespace nano
|
||||
{
|
||||
class bootstrap_attempt;
|
||||
class bootstrap_client;
|
||||
class bulk_push_client final : public std::enable_shared_from_this<nano::bulk_push_client>
|
||||
{
|
||||
public:
|
||||
explicit bulk_push_client (std::shared_ptr<nano::bootstrap_client> const &);
|
||||
explicit bulk_push_client (std::shared_ptr<nano::bootstrap_client> const &, std::shared_ptr<nano::bootstrap_attempt> const &);
|
||||
~bulk_push_client ();
|
||||
void start ();
|
||||
void push ();
|
||||
void push_block (nano::block const &);
|
||||
void send_finished ();
|
||||
std::shared_ptr<nano::bootstrap_client> connection;
|
||||
std::shared_ptr<nano::bootstrap_attempt> attempt;
|
||||
std::promise<bool> promise;
|
||||
std::pair<nano::block_hash, nano::block_hash> current_target;
|
||||
};
|
||||
|
|
492
nano/node/bootstrap/bootstrap_connections.cpp
Normal file
492
nano/node/bootstrap/bootstrap_connections.cpp
Normal file
|
@ -0,0 +1,492 @@
|
|||
#include <nano/node/bootstrap/bootstrap.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_attempt.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_connections.hpp>
|
||||
#include <nano/node/common.hpp>
|
||||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/transport/tcp.hpp>
|
||||
|
||||
#include <boost/format.hpp>
|
||||
|
||||
constexpr double nano::bootstrap_limits::bootstrap_connection_scale_target_blocks;
|
||||
constexpr double nano::bootstrap_limits::bootstrap_minimum_blocks_per_sec;
|
||||
constexpr double nano::bootstrap_limits::bootstrap_minimum_termination_time_sec;
|
||||
constexpr unsigned nano::bootstrap_limits::bootstrap_max_new_connections;
|
||||
constexpr unsigned nano::bootstrap_limits::requeued_pulls_processed_blocks_factor;
|
||||
|
||||
nano::bootstrap_client::bootstrap_client (std::shared_ptr<nano::node> node_a, std::shared_ptr<nano::bootstrap_connections> connections_a, std::shared_ptr<nano::transport::channel_tcp> channel_a, std::shared_ptr<nano::socket> socket_a) :
|
||||
node (node_a),
|
||||
connections (connections_a),
|
||||
channel (channel_a),
|
||||
socket (socket_a),
|
||||
receive_buffer (std::make_shared<std::vector<uint8_t>> ()),
|
||||
start_time (std::chrono::steady_clock::now ())
|
||||
{
|
||||
++connections->connections_count;
|
||||
receive_buffer->resize (256);
|
||||
}
|
||||
|
||||
nano::bootstrap_client::~bootstrap_client ()
|
||||
{
|
||||
--connections->connections_count;
|
||||
}
|
||||
|
||||
double nano::bootstrap_client::block_rate () const
|
||||
{
|
||||
auto elapsed = std::max (elapsed_seconds (), nano::bootstrap_limits::bootstrap_minimum_elapsed_seconds_blockrate);
|
||||
return static_cast<double> (block_count.load () / elapsed);
|
||||
}
|
||||
|
||||
double nano::bootstrap_client::elapsed_seconds () const
|
||||
{
|
||||
return std::chrono::duration_cast<std::chrono::duration<double>> (std::chrono::steady_clock::now () - start_time).count ();
|
||||
}
|
||||
|
||||
void nano::bootstrap_client::stop (bool force)
|
||||
{
|
||||
pending_stop = true;
|
||||
if (force)
|
||||
{
|
||||
hard_stop = true;
|
||||
}
|
||||
}
|
||||
|
||||
nano::bootstrap_connections::bootstrap_connections (nano::node & node_a) :
|
||||
node (node_a)
|
||||
{
|
||||
}
|
||||
|
||||
std::shared_ptr<nano::bootstrap_client> nano::bootstrap_connections::connection (std::shared_ptr<nano::bootstrap_attempt> attempt_a, bool use_front_connection)
|
||||
{
|
||||
nano::unique_lock<std::mutex> lock (mutex);
|
||||
condition.wait (lock, [& stopped = stopped, &idle = idle, &new_connections_empty = new_connections_empty] { return stopped || !idle.empty () || new_connections_empty; });
|
||||
std::shared_ptr<nano::bootstrap_client> result;
|
||||
if (!stopped && !idle.empty ())
|
||||
{
|
||||
if (!use_front_connection)
|
||||
{
|
||||
result = idle.back ();
|
||||
idle.pop_back ();
|
||||
}
|
||||
else
|
||||
{
|
||||
result = idle.front ();
|
||||
idle.pop_front ();
|
||||
}
|
||||
}
|
||||
if (result == nullptr && connections_count == 0 && new_connections_empty && attempt_a != nullptr)
|
||||
{
|
||||
node.logger.try_log (boost::str (boost::format ("Bootstrap attempt stopped because there are no peers")));
|
||||
attempt_a->stopped = true;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
void nano::bootstrap_connections::pool_connection (std::shared_ptr<nano::bootstrap_client> client_a, bool new_client, bool push_front)
|
||||
{
|
||||
nano::unique_lock<std::mutex> lock (mutex);
|
||||
if (!stopped && !client_a->pending_stop && !node.bootstrap_initiator.excluded_peers.check (client_a->channel->get_tcp_endpoint ()))
|
||||
{
|
||||
// Idle bootstrap client socket
|
||||
if (auto socket_l = client_a->channel->socket.lock ())
|
||||
{
|
||||
socket_l->start_timer (node.network_params.node.idle_timeout);
|
||||
// Push into idle deque
|
||||
if (!push_front)
|
||||
{
|
||||
idle.push_back (client_a);
|
||||
}
|
||||
else
|
||||
{
|
||||
idle.push_front (client_a);
|
||||
}
|
||||
if (new_client)
|
||||
{
|
||||
clients.push_back (client_a);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (auto socket_l = client_a->channel->socket.lock ())
|
||||
{
|
||||
socket_l->close ();
|
||||
}
|
||||
}
|
||||
lock.unlock ();
|
||||
condition.notify_all ();
|
||||
}
|
||||
|
||||
void nano::bootstrap_connections::add_connection (nano::endpoint const & endpoint_a)
|
||||
{
|
||||
connect_client (nano::tcp_endpoint (endpoint_a.address (), endpoint_a.port ()), true);
|
||||
}
|
||||
|
||||
std::shared_ptr<nano::bootstrap_client> nano::bootstrap_connections::find_connection (nano::tcp_endpoint const & endpoint_a)
|
||||
{
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
std::shared_ptr<nano::bootstrap_client> result;
|
||||
for (auto i (idle.begin ()), end (idle.end ()); i != end && !stopped; ++i)
|
||||
{
|
||||
if ((*i)->channel->get_tcp_endpoint () == endpoint_a)
|
||||
{
|
||||
result = *i;
|
||||
idle.erase (i);
|
||||
break;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
void nano::bootstrap_connections::connect_client (nano::tcp_endpoint const & endpoint_a, bool push_front)
|
||||
{
|
||||
++connections_count;
|
||||
auto socket (std::make_shared<nano::socket> (node.shared ()));
|
||||
auto this_l (shared_from_this ());
|
||||
socket->async_connect (endpoint_a,
|
||||
[this_l, socket, endpoint_a, push_front](boost::system::error_code const & ec) {
|
||||
if (!ec)
|
||||
{
|
||||
if (this_l->node.config.logging.bulk_pull_logging ())
|
||||
{
|
||||
this_l->node.logger.try_log (boost::str (boost::format ("Connection established to %1%") % endpoint_a));
|
||||
}
|
||||
auto client (std::make_shared<nano::bootstrap_client> (this_l->node.shared (), this_l, std::make_shared<nano::transport::channel_tcp> (*this_l->node.shared (), socket), socket));
|
||||
this_l->pool_connection (client, true, push_front);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (this_l->node.config.logging.network_logging ())
|
||||
{
|
||||
switch (ec.value ())
|
||||
{
|
||||
default:
|
||||
this_l->node.logger.try_log (boost::str (boost::format ("Error initiating bootstrap connection to %1%: %2%") % endpoint_a % ec.message ()));
|
||||
break;
|
||||
case boost::system::errc::connection_refused:
|
||||
case boost::system::errc::operation_canceled:
|
||||
case boost::system::errc::timed_out:
|
||||
case 995: //Windows The I/O operation has been aborted because of either a thread exit or an application request
|
||||
case 10061: //Windows No connection could be made because the target machine actively refused it
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
--this_l->connections_count;
|
||||
});
|
||||
}
|
||||
|
||||
unsigned nano::bootstrap_connections::target_connections (size_t pulls_remaining, size_t attempts_count)
|
||||
{
|
||||
unsigned attempts_factor = node.config.bootstrap_connections * attempts_count;
|
||||
if (attempts_factor >= node.config.bootstrap_connections_max)
|
||||
{
|
||||
return std::max (1U, node.config.bootstrap_connections_max);
|
||||
}
|
||||
|
||||
// Only scale up to bootstrap_connections_max for large pulls.
|
||||
double step_scale = std::min (1.0, std::max (0.0, (double)pulls_remaining / nano::bootstrap_limits::bootstrap_connection_scale_target_blocks));
|
||||
double target = (double)attempts_factor + (double)(node.config.bootstrap_connections_max - attempts_factor) * step_scale;
|
||||
return std::max (1U, (unsigned)(target + 0.5f));
|
||||
}
|
||||
|
||||
struct block_rate_cmp
|
||||
{
|
||||
bool operator() (const std::shared_ptr<nano::bootstrap_client> & lhs, const std::shared_ptr<nano::bootstrap_client> & rhs) const
|
||||
{
|
||||
return lhs->block_rate () > rhs->block_rate ();
|
||||
}
|
||||
};
|
||||
|
||||
void nano::bootstrap_connections::populate_connections (bool repeat)
|
||||
{
|
||||
double rate_sum = 0.0;
|
||||
size_t num_pulls = 0;
|
||||
size_t attempts_count = node.bootstrap_initiator.attempts.size ();
|
||||
std::priority_queue<std::shared_ptr<nano::bootstrap_client>, std::vector<std::shared_ptr<nano::bootstrap_client>>, block_rate_cmp> sorted_connections;
|
||||
std::unordered_set<nano::tcp_endpoint> endpoints;
|
||||
{
|
||||
nano::unique_lock<std::mutex> lock (mutex);
|
||||
num_pulls = pulls.size ();
|
||||
std::deque<std::weak_ptr<nano::bootstrap_client>> new_clients;
|
||||
for (auto & c : clients)
|
||||
{
|
||||
if (auto client = c.lock ())
|
||||
{
|
||||
if (auto socket_l = client->channel->socket.lock ())
|
||||
{
|
||||
new_clients.push_back (client);
|
||||
endpoints.insert (socket_l->remote_endpoint ());
|
||||
double elapsed_sec = client->elapsed_seconds ();
|
||||
auto blocks_per_sec = client->block_rate ();
|
||||
rate_sum += blocks_per_sec;
|
||||
if (client->elapsed_seconds () > nano::bootstrap_limits::bootstrap_connection_warmup_time_sec && client->block_count > 0)
|
||||
{
|
||||
sorted_connections.push (client);
|
||||
}
|
||||
// Force-stop the slowest peers, since they can take the whole bootstrap hostage by dribbling out blocks on the last remaining pull.
|
||||
// This is ~1.5kilobits/sec.
|
||||
if (elapsed_sec > nano::bootstrap_limits::bootstrap_minimum_termination_time_sec && blocks_per_sec < nano::bootstrap_limits::bootstrap_minimum_blocks_per_sec)
|
||||
{
|
||||
if (node.config.logging.bulk_pull_logging ())
|
||||
{
|
||||
node.logger.try_log (boost::str (boost::format ("Stopping slow peer %1% (elapsed sec %2%s > %3%s and %4% blocks per second < %5%)") % client->channel->to_string () % elapsed_sec % nano::bootstrap_limits::bootstrap_minimum_termination_time_sec % blocks_per_sec % nano::bootstrap_limits::bootstrap_minimum_blocks_per_sec));
|
||||
}
|
||||
|
||||
client->stop (true);
|
||||
new_clients.pop_back ();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Cleanup expired clients
|
||||
clients.swap (new_clients);
|
||||
}
|
||||
|
||||
auto target = target_connections (num_pulls, attempts_count);
|
||||
|
||||
// We only want to drop slow peers when more than 2/3 are active. 2/3 because 1/2 is too aggressive, and 100% rarely happens.
|
||||
// Probably needs more tuning.
|
||||
if (sorted_connections.size () >= (target * 2) / 3 && target >= 4)
|
||||
{
|
||||
// 4 -> 1, 8 -> 2, 16 -> 4, arbitrary, but seems to work well.
|
||||
auto drop = (int)roundf (sqrtf ((float)target - 2.0f));
|
||||
|
||||
if (node.config.logging.bulk_pull_logging ())
|
||||
{
|
||||
node.logger.try_log (boost::str (boost::format ("Dropping %1% bulk pull peers, target connections %2%") % drop % target));
|
||||
}
|
||||
|
||||
for (int i = 0; i < drop; i++)
|
||||
{
|
||||
auto client = sorted_connections.top ();
|
||||
|
||||
if (node.config.logging.bulk_pull_logging ())
|
||||
{
|
||||
node.logger.try_log (boost::str (boost::format ("Dropping peer with block rate %1%, block count %2% (%3%) ") % client->block_rate () % client->block_count % client->channel->to_string ()));
|
||||
}
|
||||
|
||||
client->stop (false);
|
||||
sorted_connections.pop ();
|
||||
}
|
||||
}
|
||||
|
||||
if (node.config.logging.bulk_pull_logging ())
|
||||
{
|
||||
nano::unique_lock<std::mutex> lock (mutex);
|
||||
node.logger.try_log (boost::str (boost::format ("Bulk pull connections: %1%, rate: %2% blocks/sec, bootstrap attempts %3%, remaining pulls: %4%") % connections_count.load () % (int)rate_sum % attempts_count % num_pulls));
|
||||
}
|
||||
|
||||
if (connections_count < target && (attempts_count != 0 || new_connections_empty) && !stopped)
|
||||
{
|
||||
auto delta = std::min ((target - connections_count) * 2, nano::bootstrap_limits::bootstrap_max_new_connections);
|
||||
// TODO - tune this better
|
||||
// Not many peers respond, need to try to make more connections than we need.
|
||||
for (auto i = 0u; i < delta; i++)
|
||||
{
|
||||
auto endpoint (node.network.bootstrap_peer (true));
|
||||
if (endpoint != nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0) && endpoints.find (endpoint) == endpoints.end () && !node.bootstrap_initiator.excluded_peers.check (endpoint))
|
||||
{
|
||||
connect_client (endpoint);
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
endpoints.insert (endpoint);
|
||||
new_connections_empty = false;
|
||||
}
|
||||
else if (connections_count == 0)
|
||||
{
|
||||
new_connections_empty = true;
|
||||
condition.notify_all ();
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!stopped && repeat)
|
||||
{
|
||||
std::weak_ptr<nano::bootstrap_connections> this_w (shared_from_this ());
|
||||
node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this_w]() {
|
||||
if (auto this_l = this_w.lock ())
|
||||
{
|
||||
this_l->populate_connections ();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_connections::start_populate_connections ()
|
||||
{
|
||||
if (!populate_connections_started.exchange (true))
|
||||
{
|
||||
populate_connections ();
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_connections::add_pull (nano::pull_info const & pull_a)
|
||||
{
|
||||
nano::pull_info pull (pull_a);
|
||||
node.bootstrap_initiator.cache.update_pull (pull);
|
||||
{
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
pulls.push_back (pull);
|
||||
}
|
||||
condition.notify_all ();
|
||||
}
|
||||
|
||||
void nano::bootstrap_connections::request_pull (nano::unique_lock<std::mutex> & lock_a)
|
||||
{
|
||||
lock_a.unlock ();
|
||||
auto connection_l (connection ());
|
||||
lock_a.lock ();
|
||||
if (connection_l != nullptr && !pulls.empty ())
|
||||
{
|
||||
std::shared_ptr<nano::bootstrap_attempt> attempt_l;
|
||||
nano::pull_info pull;
|
||||
// Search pulls with existing attempts
|
||||
while (attempt_l == nullptr && !pulls.empty ())
|
||||
{
|
||||
pull = pulls.front ();
|
||||
pulls.pop_front ();
|
||||
attempt_l = node.bootstrap_initiator.attempts.find (pull.bootstrap_id);
|
||||
// Check if lazy pull is obsolete (head was processed or head is 0 for destinations requests)
|
||||
if (attempt_l != nullptr && attempt_l->mode == nano::bootstrap_mode::lazy && !pull.head.is_zero () && attempt_l->lazy_processed_or_exists (pull.head))
|
||||
{
|
||||
--attempt_l->pulling;
|
||||
attempt_l->condition.notify_all ();
|
||||
attempt_l = nullptr;
|
||||
}
|
||||
}
|
||||
if (attempt_l != nullptr)
|
||||
{
|
||||
if (attempt_l->mode == nano::bootstrap_mode::legacy)
|
||||
{
|
||||
attempt_l->add_recent_pull (pull.head);
|
||||
}
|
||||
// The bulk_pull_client destructor attempt to requeue_pull which can cause a deadlock if this is the last reference
|
||||
// Dispatch request in an external thread in case it needs to be destroyed
|
||||
node.background ([connection_l, attempt_l, pull]() {
|
||||
auto client (std::make_shared<nano::bulk_pull_client> (connection_l, attempt_l, pull));
|
||||
client->request ();
|
||||
});
|
||||
}
|
||||
}
|
||||
else if (connection_l != nullptr)
|
||||
{
|
||||
// Reuse connection if pulls deque become empty
|
||||
lock_a.unlock ();
|
||||
pool_connection (connection_l);
|
||||
lock_a.lock ();
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_connections::requeue_pull (nano::pull_info const & pull_a, bool network_error)
|
||||
{
|
||||
auto pull (pull_a);
|
||||
if (!network_error)
|
||||
{
|
||||
++pull.attempts;
|
||||
}
|
||||
auto attempt_l (node.bootstrap_initiator.attempts.find (pull.bootstrap_id));
|
||||
if (attempt_l != nullptr)
|
||||
{
|
||||
++attempt_l->requeued_pulls;
|
||||
if (attempt_l->mode == nano::bootstrap_mode::legacy)
|
||||
{
|
||||
attempt_l->restart_condition ();
|
||||
}
|
||||
else if (attempt_l->mode == nano::bootstrap_mode::lazy)
|
||||
{
|
||||
pull.count = attempt_l->lazy_batch_size ();
|
||||
}
|
||||
if (pull.attempts < pull.retry_limit + (pull.processed / nano::bootstrap_limits::requeued_pulls_processed_blocks_factor))
|
||||
{
|
||||
{
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
pulls.push_front (pull);
|
||||
}
|
||||
++attempt_l->pulling;
|
||||
attempt_l->condition.notify_all ();
|
||||
condition.notify_all ();
|
||||
}
|
||||
else if (attempt_l->mode == nano::bootstrap_mode::lazy && (pull.retry_limit == std::numeric_limits<unsigned>::max () || pull.attempts <= pull.retry_limit + (pull.processed / node.network_params.bootstrap.lazy_max_pull_blocks)))
|
||||
{
|
||||
debug_assert (pull.account_or_head == pull.head);
|
||||
if (!attempt_l->lazy_processed_or_exists (pull.account_or_head))
|
||||
{
|
||||
{
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
pulls.push_back (pull);
|
||||
}
|
||||
++attempt_l->pulling;
|
||||
attempt_l->condition.notify_all ();
|
||||
condition.notify_all ();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (node.config.logging.bulk_pull_logging ())
|
||||
{
|
||||
node.logger.try_log (boost::str (boost::format ("Failed to pull account %1% down to %2% after %3% attempts and %4% blocks processed") % pull.account_or_head.to_account () % pull.end.to_string () % pull.attempts % pull.processed));
|
||||
}
|
||||
node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_failed_account, nano::stat::dir::in);
|
||||
|
||||
if (attempt_l->mode == nano::bootstrap_mode::lazy && pull.processed > 0)
|
||||
{
|
||||
attempt_l->lazy_add (pull);
|
||||
}
|
||||
else if (attempt_l->mode == nano::bootstrap_mode::legacy)
|
||||
{
|
||||
node.bootstrap_initiator.cache.add (pull);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_connections::clear_pulls (uint64_t bootstrap_id_a)
|
||||
{
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
auto i (pulls.begin ());
|
||||
while (i != pulls.end ())
|
||||
{
|
||||
if (i->bootstrap_id == bootstrap_id_a)
|
||||
{
|
||||
i = pulls.erase (i);
|
||||
}
|
||||
else
|
||||
{
|
||||
++i;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_connections::run ()
|
||||
{
|
||||
start_populate_connections ();
|
||||
nano::unique_lock<std::mutex> lock (mutex);
|
||||
while (!stopped)
|
||||
{
|
||||
if (!pulls.empty ())
|
||||
{
|
||||
request_pull (lock);
|
||||
}
|
||||
else
|
||||
{
|
||||
condition.wait (lock);
|
||||
}
|
||||
}
|
||||
stopped = true;
|
||||
lock.unlock ();
|
||||
condition.notify_all ();
|
||||
}
|
||||
|
||||
void nano::bootstrap_connections::stop ()
|
||||
{
|
||||
stopped = true;
|
||||
condition.notify_all ();
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
for (auto i : clients)
|
||||
{
|
||||
if (auto client = i.lock ())
|
||||
{
|
||||
client->socket->close ();
|
||||
}
|
||||
}
|
||||
clients.clear ();
|
||||
idle.clear ();
|
||||
}
|
70
nano/node/bootstrap/bootstrap_connections.hpp
Normal file
70
nano/node/bootstrap/bootstrap_connections.hpp
Normal file
|
@ -0,0 +1,70 @@
|
|||
#pragma once
|
||||
|
||||
#include <nano/node/common.hpp>
|
||||
#include <nano/node/socket.hpp>
|
||||
|
||||
#include <atomic>
|
||||
|
||||
namespace nano
|
||||
{
|
||||
class node;
|
||||
namespace transport
|
||||
{
|
||||
class channel_tcp;
|
||||
}
|
||||
|
||||
class bootstrap_attempt;
|
||||
class bootstrap_connections;
|
||||
class frontier_req_client;
|
||||
class pull_info;
|
||||
class bootstrap_client final : public std::enable_shared_from_this<bootstrap_client>
|
||||
{
|
||||
public:
|
||||
bootstrap_client (std::shared_ptr<nano::node> node_a, std::shared_ptr<nano::bootstrap_connections> connections_a, std::shared_ptr<nano::transport::channel_tcp> channel_a, std::shared_ptr<nano::socket> socket_a);
|
||||
~bootstrap_client ();
|
||||
std::shared_ptr<nano::bootstrap_client> shared ();
|
||||
void stop (bool force);
|
||||
double block_rate () const;
|
||||
double elapsed_seconds () const;
|
||||
std::shared_ptr<nano::node> node;
|
||||
std::shared_ptr<nano::bootstrap_connections> connections;
|
||||
std::shared_ptr<nano::transport::channel_tcp> channel;
|
||||
std::shared_ptr<nano::socket> socket;
|
||||
std::shared_ptr<std::vector<uint8_t>> receive_buffer;
|
||||
std::chrono::steady_clock::time_point start_time;
|
||||
std::atomic<uint64_t> block_count{ 0 };
|
||||
std::atomic<bool> pending_stop{ false };
|
||||
std::atomic<bool> hard_stop{ false };
|
||||
};
|
||||
|
||||
class bootstrap_connections final : public std::enable_shared_from_this<bootstrap_connections>
|
||||
{
|
||||
public:
|
||||
bootstrap_connections (nano::node & node_a);
|
||||
std::shared_ptr<nano::bootstrap_connections> shared ();
|
||||
std::shared_ptr<nano::bootstrap_client> connection (std::shared_ptr<nano::bootstrap_attempt> attempt_a = nullptr, bool use_front_connection = false);
|
||||
void pool_connection (std::shared_ptr<nano::bootstrap_client> client_a, bool new_client = false, bool push_front = false);
|
||||
void add_connection (nano::endpoint const & endpoint_a);
|
||||
std::shared_ptr<nano::bootstrap_client> find_connection (nano::tcp_endpoint const & endpoint_a);
|
||||
void connect_client (nano::tcp_endpoint const & endpoint_a, bool push_front = false);
|
||||
unsigned target_connections (size_t pulls_remaining, size_t attempts_count);
|
||||
void populate_connections (bool repeat = true);
|
||||
void start_populate_connections ();
|
||||
void add_pull (nano::pull_info const & pull_a);
|
||||
void request_pull (nano::unique_lock<std::mutex> & lock_a);
|
||||
void requeue_pull (nano::pull_info const & pull_a, bool network_error = false);
|
||||
void clear_pulls (uint64_t);
|
||||
void run ();
|
||||
void stop ();
|
||||
std::deque<std::weak_ptr<nano::bootstrap_client>> clients;
|
||||
std::atomic<unsigned> connections_count{ 0 };
|
||||
nano::node & node;
|
||||
std::deque<std::shared_ptr<nano::bootstrap_client>> idle;
|
||||
std::deque<nano::pull_info> pulls;
|
||||
std::atomic<bool> populate_connections_started{ false };
|
||||
std::atomic<bool> new_connections_empty{ false };
|
||||
std::atomic<bool> stopped{ false };
|
||||
std::mutex mutex;
|
||||
nano::condition_variable condition;
|
||||
};
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
#include <nano/node/bootstrap/bootstrap.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_attempt.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_frontier.hpp>
|
||||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/transport/tcp.hpp>
|
||||
|
@ -36,8 +36,9 @@ void nano::frontier_req_client::run ()
|
|||
nano::buffer_drop_policy::no_limiter_drop);
|
||||
}
|
||||
|
||||
nano::frontier_req_client::frontier_req_client (std::shared_ptr<nano::bootstrap_client> connection_a) :
|
||||
nano::frontier_req_client::frontier_req_client (std::shared_ptr<nano::bootstrap_client> connection_a, std::shared_ptr<nano::bootstrap_attempt> attempt_a) :
|
||||
connection (connection_a),
|
||||
attempt (attempt_a),
|
||||
current (0),
|
||||
count (0),
|
||||
bulk_push_cost (0)
|
||||
|
@ -76,7 +77,7 @@ void nano::frontier_req_client::unsynced (nano::block_hash const & head, nano::b
|
|||
{
|
||||
if (bulk_push_cost < nano::bootstrap_limits::bulk_push_cost_limit)
|
||||
{
|
||||
connection->attempt->add_bulk_push_target (head, end);
|
||||
attempt->add_bulk_push_target (head, end);
|
||||
if (end.is_zero ())
|
||||
{
|
||||
bulk_push_cost += 2;
|
||||
|
@ -118,7 +119,7 @@ void nano::frontier_req_client::received_frontier (boost::system::error_code con
|
|||
promise.set_value (true);
|
||||
return;
|
||||
}
|
||||
if (connection->attempt->should_log ())
|
||||
if (attempt->should_log ())
|
||||
{
|
||||
connection->node->logger.always_log (boost::str (boost::format ("Received %1% frontiers from %2%") % std::to_string (count) % connection->channel->to_string ()));
|
||||
}
|
||||
|
@ -147,7 +148,7 @@ void nano::frontier_req_client::received_frontier (boost::system::error_code con
|
|||
}
|
||||
else
|
||||
{
|
||||
connection->attempt->add_pull (nano::pull_info (account, latest, frontier, 0, connection->node->network_params.bootstrap.frontier_retry_limit));
|
||||
attempt->add_frontier (nano::pull_info (account, latest, frontier, attempt->incremental_id, 0, connection->node->network_params.bootstrap.frontier_retry_limit));
|
||||
// Either we're behind or there's a fork we differ on
|
||||
// Either way, bulk pushing will probably not be effective
|
||||
bulk_push_cost += 5;
|
||||
|
@ -158,12 +159,12 @@ void nano::frontier_req_client::received_frontier (boost::system::error_code con
|
|||
else
|
||||
{
|
||||
debug_assert (account < current);
|
||||
connection->attempt->add_pull (nano::pull_info (account, latest, nano::block_hash (0), 0, connection->node->network_params.bootstrap.frontier_retry_limit));
|
||||
attempt->add_frontier (nano::pull_info (account, latest, nano::block_hash (0), attempt->incremental_id, 0, connection->node->network_params.bootstrap.frontier_retry_limit));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
connection->attempt->add_pull (nano::pull_info (account, latest, nano::block_hash (0), 0, connection->node->network_params.bootstrap.frontier_retry_limit));
|
||||
attempt->add_frontier (nano::pull_info (account, latest, nano::block_hash (0), attempt->incremental_id, 0, connection->node->network_params.bootstrap.frontier_retry_limit));
|
||||
}
|
||||
receive_frontier ();
|
||||
}
|
||||
|
@ -187,7 +188,7 @@ void nano::frontier_req_client::received_frontier (boost::system::error_code con
|
|||
catch (std::future_error &)
|
||||
{
|
||||
}
|
||||
connection->attempt->pool_connection (connection);
|
||||
connection->connections->pool_connection (connection);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,11 +7,12 @@
|
|||
|
||||
namespace nano
|
||||
{
|
||||
class bootstrap_attempt;
|
||||
class bootstrap_client;
|
||||
class frontier_req_client final : public std::enable_shared_from_this<nano::frontier_req_client>
|
||||
{
|
||||
public:
|
||||
explicit frontier_req_client (std::shared_ptr<nano::bootstrap_client>);
|
||||
explicit frontier_req_client (std::shared_ptr<nano::bootstrap_client>, std::shared_ptr<nano::bootstrap_attempt>);
|
||||
~frontier_req_client ();
|
||||
void run ();
|
||||
void receive_frontier ();
|
||||
|
@ -19,6 +20,7 @@ public:
|
|||
void unsynced (nano::block_hash const &, nano::block_hash const &);
|
||||
void next ();
|
||||
std::shared_ptr<nano::bootstrap_client> connection;
|
||||
std::shared_ptr<nano::bootstrap_attempt> attempt;
|
||||
nano::account current;
|
||||
nano::block_hash frontier;
|
||||
unsigned count;
|
||||
|
|
603
nano/node/bootstrap/bootstrap_lazy.cpp
Normal file
603
nano/node/bootstrap/bootstrap_lazy.cpp
Normal file
|
@ -0,0 +1,603 @@
|
|||
#include <nano/node/bootstrap/bootstrap.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_lazy.hpp>
|
||||
#include <nano/node/common.hpp>
|
||||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/transport/tcp.hpp>
|
||||
|
||||
#include <boost/format.hpp>
|
||||
|
||||
#include <algorithm>
|
||||
|
||||
constexpr std::chrono::seconds nano::bootstrap_limits::lazy_flush_delay_sec;
|
||||
constexpr unsigned nano::bootstrap_limits::lazy_destinations_request_limit;
|
||||
constexpr uint64_t nano::bootstrap_limits::lazy_batch_pull_count_resize_blocks_limit;
|
||||
constexpr double nano::bootstrap_limits::lazy_batch_pull_count_resize_ratio;
|
||||
constexpr size_t nano::bootstrap_limits::lazy_blocks_restart_limit;
|
||||
|
||||
nano::bootstrap_attempt_lazy::bootstrap_attempt_lazy (std::shared_ptr<nano::node> node_a, uint64_t incremental_id_a, std::string id_a) :
|
||||
nano::bootstrap_attempt (node_a, nano::bootstrap_mode::lazy, incremental_id_a, id_a)
|
||||
{
|
||||
node->bootstrap_initiator.notify_listeners (true);
|
||||
}
|
||||
|
||||
nano::bootstrap_attempt_lazy::~bootstrap_attempt_lazy ()
|
||||
{
|
||||
debug_assert (lazy_blocks.size () == lazy_blocks_count);
|
||||
node->bootstrap_initiator.notify_listeners (false);
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_lazy::lazy_start (nano::hash_or_account const & hash_or_account_a, bool confirmed)
|
||||
{
|
||||
nano::unique_lock<std::mutex> lock (mutex);
|
||||
// Add start blocks, limit 1024 (4k with disabled legacy bootstrap)
|
||||
size_t max_keys (node->flags.disable_legacy_bootstrap ? 4 * 1024 : 1024);
|
||||
if (lazy_keys.size () < max_keys && lazy_keys.find (hash_or_account_a) == lazy_keys.end () && !lazy_blocks_processed (hash_or_account_a))
|
||||
{
|
||||
lazy_keys.insert (hash_or_account_a);
|
||||
lazy_pulls.emplace_back (hash_or_account_a, confirmed ? std::numeric_limits<unsigned>::max () : node->network_params.bootstrap.lazy_retry_limit);
|
||||
lock.unlock ();
|
||||
condition.notify_all ();
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_lazy::lazy_add (nano::hash_or_account const & hash_or_account_a, unsigned retry_limit)
|
||||
{
|
||||
// Add only unknown blocks
|
||||
debug_assert (!mutex.try_lock ());
|
||||
if (!lazy_blocks_processed (hash_or_account_a))
|
||||
{
|
||||
lazy_pulls.emplace_back (hash_or_account_a, retry_limit);
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_lazy::lazy_add (nano::pull_info const & pull_a)
|
||||
{
|
||||
debug_assert (pull_a.account_or_head == pull_a.head);
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
lazy_add (pull_a.account_or_head, pull_a.retry_limit);
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_lazy::lazy_requeue (nano::block_hash const & hash_a, nano::block_hash const & previous_a, bool confirmed_a)
|
||||
{
|
||||
nano::unique_lock<std::mutex> lock (mutex);
|
||||
// Add only known blocks
|
||||
if (lazy_blocks_processed (hash_a))
|
||||
{
|
||||
lazy_blocks_erase (hash_a);
|
||||
lock.unlock ();
|
||||
node->bootstrap_initiator.connections->requeue_pull (nano::pull_info (hash_a, hash_a, previous_a, incremental_id, static_cast<nano::pull_info::count_t> (1), confirmed_a ? std::numeric_limits<unsigned>::max () : node->network_params.bootstrap.lazy_destinations_retry_limit));
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t nano::bootstrap_attempt_lazy::lazy_batch_size ()
|
||||
{
|
||||
auto result (node->network_params.bootstrap.lazy_max_pull_blocks);
|
||||
if (total_blocks > nano::bootstrap_limits::lazy_batch_pull_count_resize_blocks_limit && lazy_blocks_count != 0)
|
||||
{
|
||||
double lazy_blocks_ratio (total_blocks / lazy_blocks_count);
|
||||
if (lazy_blocks_ratio > nano::bootstrap_limits::lazy_batch_pull_count_resize_ratio)
|
||||
{
|
||||
// Increasing blocks ratio weight as more important (^3). Small batch count should lower blocks ratio below target
|
||||
double lazy_blocks_factor (std::pow (lazy_blocks_ratio / nano::bootstrap_limits::lazy_batch_pull_count_resize_ratio, 3.0));
|
||||
// Decreasing total block count weight as less important (sqrt)
|
||||
double total_blocks_factor (std::sqrt (total_blocks / nano::bootstrap_limits::lazy_batch_pull_count_resize_blocks_limit));
|
||||
uint32_t batch_count_min (node->network_params.bootstrap.lazy_max_pull_blocks / (lazy_blocks_factor * total_blocks_factor));
|
||||
result = std::max (node->network_params.bootstrap.lazy_min_pull_blocks, batch_count_min);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_lazy::lazy_pull_flush (nano::unique_lock<std::mutex> & lock_a)
|
||||
{
|
||||
static size_t const max_pulls (nano::bootstrap_limits::bootstrap_connection_scale_target_blocks * 3);
|
||||
if (pulling < max_pulls)
|
||||
{
|
||||
debug_assert (node->network_params.bootstrap.lazy_max_pull_blocks <= std::numeric_limits<nano::pull_info::count_t>::max ());
|
||||
nano::pull_info::count_t batch_count (lazy_batch_size ());
|
||||
uint64_t read_count (0);
|
||||
size_t count (0);
|
||||
auto transaction (node->store.tx_begin_read ());
|
||||
while (!lazy_pulls.empty () && count < max_pulls)
|
||||
{
|
||||
auto pull_start (lazy_pulls.front ());
|
||||
lazy_pulls.pop_front ();
|
||||
// Recheck if block was already processed
|
||||
if (!lazy_blocks_processed (pull_start.first) && !node->store.block_exists (transaction, pull_start.first))
|
||||
{
|
||||
lock_a.unlock ();
|
||||
node->bootstrap_initiator.connections->add_pull (nano::pull_info (pull_start.first, pull_start.first, nano::block_hash (0), incremental_id, batch_count, pull_start.second));
|
||||
++pulling;
|
||||
++count;
|
||||
lock_a.lock ();
|
||||
}
|
||||
// We don't want to open read transactions for too long
|
||||
++read_count;
|
||||
if (read_count % batch_read_size == 0)
|
||||
{
|
||||
transaction.refresh ();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool nano::bootstrap_attempt_lazy::lazy_finished ()
|
||||
{
|
||||
debug_assert (!mutex.try_lock ());
|
||||
if (stopped)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
bool result (true);
|
||||
uint64_t read_count (0);
|
||||
auto transaction (node->store.tx_begin_read ());
|
||||
for (auto it (lazy_keys.begin ()), end (lazy_keys.end ()); it != end && !stopped;)
|
||||
{
|
||||
if (node->store.block_exists (transaction, *it))
|
||||
{
|
||||
it = lazy_keys.erase (it);
|
||||
}
|
||||
else
|
||||
{
|
||||
result = false;
|
||||
break;
|
||||
// No need to increment `it` as we break above.
|
||||
}
|
||||
// We don't want to open read transactions for too long
|
||||
++read_count;
|
||||
if (read_count % batch_read_size == 0)
|
||||
{
|
||||
transaction.refresh ();
|
||||
}
|
||||
}
|
||||
// Finish lazy bootstrap without lazy pulls (in combination with still_pulling ())
|
||||
if (!result && lazy_pulls.empty () && lazy_state_backlog.empty ())
|
||||
{
|
||||
result = true;
|
||||
}
|
||||
// Don't close lazy bootstrap until all destinations are processed
|
||||
if (result && !lazy_destinations.empty ())
|
||||
{
|
||||
result = false;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
bool nano::bootstrap_attempt_lazy::lazy_has_expired () const
|
||||
{
|
||||
bool result (false);
|
||||
// Max 30 minutes run with enabled legacy bootstrap
|
||||
static std::chrono::minutes const max_lazy_time (node->flags.disable_legacy_bootstrap ? 7 * 24 * 60 : 30);
|
||||
if (std::chrono::steady_clock::now () - lazy_start_time >= max_lazy_time)
|
||||
{
|
||||
result = true;
|
||||
}
|
||||
else if (!node->flags.disable_legacy_bootstrap && lazy_blocks_count > nano::bootstrap_limits::lazy_blocks_restart_limit)
|
||||
{
|
||||
result = true;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_lazy::run ()
|
||||
{
|
||||
debug_assert (started);
|
||||
debug_assert (!node->flags.disable_lazy_bootstrap);
|
||||
node->bootstrap_initiator.connections->populate_connections (false);
|
||||
lazy_start_time = std::chrono::steady_clock::now ();
|
||||
nano::unique_lock<std::mutex> lock (mutex);
|
||||
while ((still_pulling () || !lazy_finished ()) && !lazy_has_expired ())
|
||||
{
|
||||
unsigned iterations (0);
|
||||
auto this_l (shared_from_this ());
|
||||
while (still_pulling () && !lazy_has_expired ())
|
||||
{
|
||||
condition.wait (lock, [& stopped = stopped, &pulling = pulling, &lazy_pulls = lazy_pulls, this_l] { return stopped || pulling == 0 || (pulling < nano::bootstrap_limits::bootstrap_connection_scale_target_blocks && !lazy_pulls.empty ()) || this_l->lazy_has_expired (); });
|
||||
++iterations;
|
||||
// Flushing lazy pulls
|
||||
lazy_pull_flush (lock);
|
||||
// Start backlog cleanup
|
||||
if (iterations % 100 == 0)
|
||||
{
|
||||
lazy_backlog_cleanup ();
|
||||
}
|
||||
// Destinations check
|
||||
if (pulling == 0 && lazy_destinations_flushed)
|
||||
{
|
||||
lazy_destinations_flush ();
|
||||
lazy_pull_flush (lock);
|
||||
}
|
||||
}
|
||||
// Flushing lazy pulls
|
||||
lazy_pull_flush (lock);
|
||||
// Check if some blocks required for backlog were processed. Start destinations check
|
||||
if (pulling == 0)
|
||||
{
|
||||
lazy_backlog_cleanup ();
|
||||
lazy_destinations_flush ();
|
||||
lazy_pull_flush (lock);
|
||||
}
|
||||
}
|
||||
if (!stopped)
|
||||
{
|
||||
node->logger.try_log ("Completed lazy pulls");
|
||||
}
|
||||
lock.unlock ();
|
||||
stop ();
|
||||
condition.notify_all ();
|
||||
}
|
||||
|
||||
bool nano::bootstrap_attempt_lazy::process_block (std::shared_ptr<nano::block> block_a, nano::account const & known_account_a, uint64_t pull_blocks, nano::bulk_pull::count_t max_blocks, bool block_expected, unsigned retry_limit)
|
||||
{
|
||||
bool stop_pull (false);
|
||||
if (block_expected)
|
||||
{
|
||||
stop_pull = process_block_lazy (block_a, known_account_a, pull_blocks, max_blocks, retry_limit);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Drop connection with unexpected block for lazy bootstrap
|
||||
stop_pull = true;
|
||||
}
|
||||
return stop_pull;
|
||||
}
|
||||
|
||||
bool nano::bootstrap_attempt_lazy::process_block_lazy (std::shared_ptr<nano::block> block_a, nano::account const & known_account_a, uint64_t pull_blocks, nano::bulk_pull::count_t max_blocks, unsigned retry_limit)
|
||||
{
|
||||
bool stop_pull (false);
|
||||
auto hash (block_a->hash ());
|
||||
nano::unique_lock<std::mutex> lock (mutex);
|
||||
// Processing new blocks
|
||||
if (!lazy_blocks_processed (hash))
|
||||
{
|
||||
// Search for new dependencies
|
||||
if (!block_a->source ().is_zero () && !node->ledger.block_exists (block_a->source ()) && block_a->source () != node->network_params.ledger.genesis_account)
|
||||
{
|
||||
lazy_add (block_a->source (), retry_limit);
|
||||
}
|
||||
else if (block_a->type () == nano::block_type::state)
|
||||
{
|
||||
lazy_block_state (block_a, retry_limit);
|
||||
}
|
||||
else if (block_a->type () == nano::block_type::send)
|
||||
{
|
||||
std::shared_ptr<nano::send_block> block_l (std::static_pointer_cast<nano::send_block> (block_a));
|
||||
if (block_l != nullptr && !block_l->hashables.destination.is_zero ())
|
||||
{
|
||||
lazy_destinations_increment (block_l->hashables.destination);
|
||||
}
|
||||
}
|
||||
lazy_blocks_insert (hash);
|
||||
// Adding lazy balances for first processed block in pull
|
||||
if (pull_blocks == 0 && (block_a->type () == nano::block_type::state || block_a->type () == nano::block_type::send))
|
||||
{
|
||||
lazy_balances.emplace (hash, block_a->balance ().number ());
|
||||
}
|
||||
// Clearing lazy balances for previous block
|
||||
if (!block_a->previous ().is_zero () && lazy_balances.find (block_a->previous ()) != lazy_balances.end ())
|
||||
{
|
||||
lazy_balances.erase (block_a->previous ());
|
||||
}
|
||||
lazy_block_state_backlog_check (block_a, hash);
|
||||
lock.unlock ();
|
||||
nano::unchecked_info info (block_a, known_account_a, 0, nano::signature_verification::unknown, retry_limit == std::numeric_limits<unsigned>::max ());
|
||||
node->block_processor.add (info);
|
||||
}
|
||||
// Force drop lazy bootstrap connection for long bulk_pull
|
||||
if (pull_blocks > max_blocks)
|
||||
{
|
||||
stop_pull = true;
|
||||
}
|
||||
return stop_pull;
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_lazy::lazy_block_state (std::shared_ptr<nano::block> block_a, unsigned retry_limit)
|
||||
{
|
||||
std::shared_ptr<nano::state_block> block_l (std::static_pointer_cast<nano::state_block> (block_a));
|
||||
if (block_l != nullptr)
|
||||
{
|
||||
auto transaction (node->store.tx_begin_read ());
|
||||
nano::uint128_t balance (block_l->hashables.balance.number ());
|
||||
auto const & link (block_l->hashables.link);
|
||||
// If link is not epoch link or 0. And if block from link is unknown
|
||||
if (!link.is_zero () && !node->ledger.is_epoch_link (link) && !lazy_blocks_processed (link) && !node->store.block_exists (transaction, link))
|
||||
{
|
||||
auto const & previous (block_l->hashables.previous);
|
||||
// If state block previous is 0 then source block required
|
||||
if (previous.is_zero ())
|
||||
{
|
||||
lazy_add (link, retry_limit);
|
||||
}
|
||||
// In other cases previous block balance required to find out subtype of state block
|
||||
else if (node->store.block_exists (transaction, previous))
|
||||
{
|
||||
if (node->ledger.balance (transaction, previous) <= balance)
|
||||
{
|
||||
lazy_add (link, retry_limit);
|
||||
}
|
||||
else
|
||||
{
|
||||
lazy_destinations_increment (link);
|
||||
}
|
||||
}
|
||||
// Search balance of already processed previous blocks
|
||||
else if (lazy_blocks_processed (previous))
|
||||
{
|
||||
auto previous_balance (lazy_balances.find (previous));
|
||||
if (previous_balance != lazy_balances.end ())
|
||||
{
|
||||
if (previous_balance->second <= balance)
|
||||
{
|
||||
lazy_add (link, retry_limit);
|
||||
}
|
||||
else
|
||||
{
|
||||
lazy_destinations_increment (link);
|
||||
}
|
||||
lazy_balances.erase (previous_balance);
|
||||
}
|
||||
}
|
||||
// Insert in backlog state blocks if previous wasn't already processed
|
||||
else
|
||||
{
|
||||
lazy_state_backlog.emplace (previous, nano::lazy_state_backlog_item{ link, balance, retry_limit });
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_lazy::lazy_block_state_backlog_check (std::shared_ptr<nano::block> block_a, nano::block_hash const & hash_a)
|
||||
{
|
||||
// Search unknown state blocks balances
|
||||
auto find_state (lazy_state_backlog.find (hash_a));
|
||||
if (find_state != lazy_state_backlog.end ())
|
||||
{
|
||||
auto next_block (find_state->second);
|
||||
// Retrieve balance for previous state & send blocks
|
||||
if (block_a->type () == nano::block_type::state || block_a->type () == nano::block_type::send)
|
||||
{
|
||||
if (block_a->balance ().number () <= next_block.balance) // balance
|
||||
{
|
||||
lazy_add (next_block.link, next_block.retry_limit); // link
|
||||
}
|
||||
else
|
||||
{
|
||||
lazy_destinations_increment (next_block.link);
|
||||
}
|
||||
}
|
||||
// Assumption for other legacy block types
|
||||
else if (lazy_undefined_links.find (next_block.link) == lazy_undefined_links.end ())
|
||||
{
|
||||
lazy_add (next_block.link, node->network_params.bootstrap.lazy_retry_limit); // Head is not confirmed. It can be account or hash or non-existing
|
||||
lazy_undefined_links.insert (next_block.link);
|
||||
}
|
||||
lazy_state_backlog.erase (find_state);
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_lazy::lazy_backlog_cleanup ()
|
||||
{
|
||||
uint64_t read_count (0);
|
||||
auto transaction (node->store.tx_begin_read ());
|
||||
for (auto it (lazy_state_backlog.begin ()), end (lazy_state_backlog.end ()); it != end && !stopped;)
|
||||
{
|
||||
if (node->store.block_exists (transaction, it->first))
|
||||
{
|
||||
auto next_block (it->second);
|
||||
if (node->ledger.balance (transaction, it->first) <= next_block.balance) // balance
|
||||
{
|
||||
lazy_add (next_block.link, next_block.retry_limit); // link
|
||||
}
|
||||
else
|
||||
{
|
||||
lazy_destinations_increment (next_block.link);
|
||||
}
|
||||
it = lazy_state_backlog.erase (it);
|
||||
}
|
||||
else
|
||||
{
|
||||
lazy_add (it->first, it->second.retry_limit);
|
||||
++it;
|
||||
}
|
||||
// We don't want to open read transactions for too long
|
||||
++read_count;
|
||||
if (read_count % batch_read_size == 0)
|
||||
{
|
||||
transaction.refresh ();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_lazy::lazy_destinations_increment (nano::account const & destination_a)
|
||||
{
|
||||
// Enabled only if legacy bootstrap is not available. Legacy bootstrap is a more effective way to receive all existing destinations
|
||||
if (node->flags.disable_legacy_bootstrap)
|
||||
{
|
||||
// Update accounts counter for send blocks
|
||||
auto existing (lazy_destinations.get<account_tag> ().find (destination_a));
|
||||
if (existing != lazy_destinations.get<account_tag> ().end ())
|
||||
{
|
||||
lazy_destinations.get<account_tag> ().modify (existing, [](nano::lazy_destinations_item & item_a) {
|
||||
++item_a.count;
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
lazy_destinations.emplace (nano::lazy_destinations_item{ destination_a, 1 });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_lazy::lazy_destinations_flush ()
|
||||
{
|
||||
debug_assert (!mutex.try_lock ());
|
||||
lazy_destinations_flushed = true;
|
||||
size_t count (0);
|
||||
for (auto it (lazy_destinations.get<count_tag> ().begin ()), end (lazy_destinations.get<count_tag> ().end ()); it != end && count < nano::bootstrap_limits::lazy_destinations_request_limit && !stopped;)
|
||||
{
|
||||
lazy_add (it->account, node->network_params.bootstrap.lazy_destinations_retry_limit);
|
||||
it = lazy_destinations.get<count_tag> ().erase (it);
|
||||
++count;
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_lazy::lazy_blocks_insert (nano::block_hash const & hash_a)
|
||||
{
|
||||
debug_assert (!mutex.try_lock ());
|
||||
auto inserted (lazy_blocks.insert (std::hash<::nano::block_hash> () (hash_a)));
|
||||
if (inserted.second)
|
||||
{
|
||||
++lazy_blocks_count;
|
||||
debug_assert (lazy_blocks_count > 0);
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_lazy::lazy_blocks_erase (nano::block_hash const & hash_a)
|
||||
{
|
||||
debug_assert (!mutex.try_lock ());
|
||||
auto erased (lazy_blocks.erase (std::hash<::nano::block_hash> () (hash_a)));
|
||||
if (erased)
|
||||
{
|
||||
--lazy_blocks_count;
|
||||
debug_assert (lazy_blocks_count != std::numeric_limits<size_t>::max ());
|
||||
}
|
||||
}
|
||||
|
||||
bool nano::bootstrap_attempt_lazy::lazy_blocks_processed (nano::block_hash const & hash_a)
|
||||
{
|
||||
return lazy_blocks.find (std::hash<::nano::block_hash> () (hash_a)) != lazy_blocks.end ();
|
||||
}
|
||||
|
||||
bool nano::bootstrap_attempt_lazy::lazy_processed_or_exists (nano::block_hash const & hash_a)
|
||||
{
|
||||
bool result (false);
|
||||
nano::unique_lock<std::mutex> lock (mutex);
|
||||
if (lazy_blocks_processed (hash_a))
|
||||
{
|
||||
result = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
lock.unlock ();
|
||||
if (node->ledger.block_exists (hash_a))
|
||||
{
|
||||
result = true;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_lazy::get_information (boost::property_tree::ptree & tree_a)
|
||||
{
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
tree_a.put ("lazy_blocks", std::to_string (lazy_blocks.size ()));
|
||||
tree_a.put ("lazy_state_backlog", std::to_string (lazy_state_backlog.size ()));
|
||||
tree_a.put ("lazy_balances", std::to_string (lazy_balances.size ()));
|
||||
tree_a.put ("lazy_destinations", std::to_string (lazy_destinations.size ()));
|
||||
tree_a.put ("lazy_undefined_links", std::to_string (lazy_undefined_links.size ()));
|
||||
tree_a.put ("lazy_pulls", std::to_string (lazy_pulls.size ()));
|
||||
tree_a.put ("lazy_keys", std::to_string (lazy_keys.size ()));
|
||||
if (!lazy_keys.empty ())
|
||||
{
|
||||
tree_a.put ("lazy_key_1", (*(lazy_keys.begin ())).to_string ());
|
||||
}
|
||||
}
|
||||
|
||||
nano::bootstrap_attempt_wallet::bootstrap_attempt_wallet (std::shared_ptr<nano::node> node_a, uint64_t incremental_id_a, std::string id_a) :
|
||||
nano::bootstrap_attempt (node_a, nano::bootstrap_mode::wallet_lazy, incremental_id_a, id_a)
|
||||
{
|
||||
node->bootstrap_initiator.notify_listeners (true);
|
||||
}
|
||||
|
||||
nano::bootstrap_attempt_wallet::~bootstrap_attempt_wallet ()
|
||||
{
|
||||
node->bootstrap_initiator.notify_listeners (false);
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_wallet::request_pending (nano::unique_lock<std::mutex> & lock_a)
|
||||
{
|
||||
lock_a.unlock ();
|
||||
auto connection_l (node->bootstrap_initiator.connections->connection (shared_from_this ()));
|
||||
lock_a.lock ();
|
||||
if (connection_l)
|
||||
{
|
||||
auto account (wallet_accounts.front ());
|
||||
wallet_accounts.pop_front ();
|
||||
++pulling;
|
||||
auto this_l (shared_from_this ());
|
||||
// The bulk_pull_account_client destructor attempt to requeue_pull which can cause a deadlock if this is the last reference
|
||||
// Dispatch request in an external thread in case it needs to be destroyed
|
||||
node->background ([connection_l, this_l, account]() {
|
||||
auto client (std::make_shared<nano::bulk_pull_account_client> (connection_l, this_l, account));
|
||||
client->request ();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_wallet::requeue_pending (nano::account const & account_a)
|
||||
{
|
||||
auto account (account_a);
|
||||
{
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
wallet_accounts.push_front (account);
|
||||
}
|
||||
condition.notify_all ();
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_wallet::wallet_start (std::deque<nano::account> & accounts_a)
|
||||
{
|
||||
{
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
wallet_accounts.swap (accounts_a);
|
||||
}
|
||||
condition.notify_all ();
|
||||
}
|
||||
|
||||
bool nano::bootstrap_attempt_wallet::wallet_finished ()
|
||||
{
|
||||
debug_assert (!mutex.try_lock ());
|
||||
auto running (!stopped);
|
||||
auto more_accounts (!wallet_accounts.empty ());
|
||||
auto still_pulling (pulling > 0);
|
||||
return running && (more_accounts || still_pulling);
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_wallet::run ()
|
||||
{
|
||||
debug_assert (started);
|
||||
debug_assert (!node->flags.disable_wallet_bootstrap);
|
||||
node->bootstrap_initiator.connections->populate_connections (false);
|
||||
auto start_time (std::chrono::steady_clock::now ());
|
||||
auto max_time (std::chrono::minutes (10));
|
||||
nano::unique_lock<std::mutex> lock (mutex);
|
||||
while (wallet_finished () && std::chrono::steady_clock::now () - start_time < max_time)
|
||||
{
|
||||
if (!wallet_accounts.empty ())
|
||||
{
|
||||
request_pending (lock);
|
||||
}
|
||||
else
|
||||
{
|
||||
condition.wait_for (lock, std::chrono::seconds (1));
|
||||
}
|
||||
}
|
||||
if (!stopped)
|
||||
{
|
||||
node->logger.try_log ("Completed wallet lazy pulls");
|
||||
}
|
||||
lock.unlock ();
|
||||
stop ();
|
||||
condition.notify_all ();
|
||||
}
|
||||
|
||||
size_t nano::bootstrap_attempt_wallet::wallet_size ()
|
||||
{
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
return wallet_accounts.size ();
|
||||
}
|
||||
|
||||
void nano::bootstrap_attempt_wallet::get_information (boost::property_tree::ptree & tree_a)
|
||||
{
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
tree_a.put ("wallet_accounts", std::to_string (wallet_accounts.size ()));
|
||||
}
|
101
nano/node/bootstrap/bootstrap_lazy.hpp
Normal file
101
nano/node/bootstrap/bootstrap_lazy.hpp
Normal file
|
@ -0,0 +1,101 @@
|
|||
#pragma once
|
||||
|
||||
#include <nano/node/bootstrap/bootstrap_attempt.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_bulk_pull.hpp>
|
||||
|
||||
#include <boost/multi_index/hashed_index.hpp>
|
||||
#include <boost/multi_index/member.hpp>
|
||||
#include <boost/multi_index/ordered_index.hpp>
|
||||
#include <boost/multi_index_container.hpp>
|
||||
|
||||
#include <atomic>
|
||||
#include <queue>
|
||||
#include <unordered_set>
|
||||
|
||||
namespace mi = boost::multi_index;
|
||||
|
||||
namespace nano
|
||||
{
|
||||
class node;
|
||||
class lazy_state_backlog_item final
|
||||
{
|
||||
public:
|
||||
nano::link link{ 0 };
|
||||
nano::uint128_t balance{ 0 };
|
||||
unsigned retry_limit{ 0 };
|
||||
};
|
||||
class lazy_destinations_item final
|
||||
{
|
||||
public:
|
||||
nano::account account{ 0 };
|
||||
uint64_t count{ 0 };
|
||||
};
|
||||
class bootstrap_attempt_lazy final : public bootstrap_attempt
|
||||
{
|
||||
public:
|
||||
explicit bootstrap_attempt_lazy (std::shared_ptr<nano::node> node_a, uint64_t incremental_id_a, std::string id_a = "");
|
||||
~bootstrap_attempt_lazy ();
|
||||
bool process_block (std::shared_ptr<nano::block>, nano::account const &, uint64_t, nano::bulk_pull::count_t, bool, unsigned) override;
|
||||
void run () override;
|
||||
void lazy_start (nano::hash_or_account const &, bool confirmed = true) override;
|
||||
void lazy_add (nano::hash_or_account const &, unsigned = std::numeric_limits<unsigned>::max ());
|
||||
void lazy_add (nano::pull_info const &) override;
|
||||
void lazy_requeue (nano::block_hash const &, nano::block_hash const &, bool) override;
|
||||
bool lazy_finished ();
|
||||
bool lazy_has_expired () const override;
|
||||
uint32_t lazy_batch_size () override;
|
||||
void lazy_pull_flush (nano::unique_lock<std::mutex> & lock_a);
|
||||
bool process_block_lazy (std::shared_ptr<nano::block>, nano::account const &, uint64_t, nano::bulk_pull::count_t, unsigned);
|
||||
void lazy_block_state (std::shared_ptr<nano::block>, unsigned);
|
||||
void lazy_block_state_backlog_check (std::shared_ptr<nano::block>, nano::block_hash const &);
|
||||
void lazy_backlog_cleanup ();
|
||||
void lazy_destinations_increment (nano::account const &);
|
||||
void lazy_destinations_flush ();
|
||||
void lazy_blocks_insert (nano::block_hash const &);
|
||||
void lazy_blocks_erase (nano::block_hash const &);
|
||||
bool lazy_blocks_processed (nano::block_hash const &);
|
||||
bool lazy_processed_or_exists (nano::block_hash const &) override;
|
||||
void get_information (boost::property_tree::ptree &) override;
|
||||
std::unordered_set<size_t> lazy_blocks;
|
||||
std::unordered_map<nano::block_hash, nano::lazy_state_backlog_item> lazy_state_backlog;
|
||||
std::unordered_set<nano::block_hash> lazy_undefined_links;
|
||||
std::unordered_map<nano::block_hash, nano::uint128_t> lazy_balances;
|
||||
std::unordered_set<nano::block_hash> lazy_keys;
|
||||
std::deque<std::pair<nano::hash_or_account, unsigned>> lazy_pulls;
|
||||
std::chrono::steady_clock::time_point lazy_start_time;
|
||||
class account_tag
|
||||
{
|
||||
};
|
||||
class count_tag
|
||||
{
|
||||
};
|
||||
// clang-format off
|
||||
boost::multi_index_container<lazy_destinations_item,
|
||||
mi::indexed_by<
|
||||
mi::ordered_non_unique<mi::tag<count_tag>,
|
||||
mi::member<lazy_destinations_item, uint64_t, &lazy_destinations_item::count>,
|
||||
std::greater<uint64_t>>,
|
||||
mi::hashed_unique<mi::tag<account_tag>,
|
||||
mi::member<lazy_destinations_item, nano::account, &lazy_destinations_item::account>>>>
|
||||
lazy_destinations;
|
||||
// clang-format on
|
||||
std::atomic<size_t> lazy_blocks_count{ 0 };
|
||||
std::atomic<bool> lazy_destinations_flushed{ false };
|
||||
/** The maximum number of records to be read in while iterating over long lazy containers */
|
||||
static uint64_t constexpr batch_read_size = 256;
|
||||
};
|
||||
class bootstrap_attempt_wallet final : public bootstrap_attempt
|
||||
{
|
||||
public:
|
||||
explicit bootstrap_attempt_wallet (std::shared_ptr<nano::node> node_a, uint64_t incremental_id_a, std::string id_a = "");
|
||||
~bootstrap_attempt_wallet ();
|
||||
void request_pending (nano::unique_lock<std::mutex> &);
|
||||
void requeue_pending (nano::account const &) override;
|
||||
void run () override;
|
||||
void wallet_start (std::deque<nano::account> &) override;
|
||||
bool wallet_finished ();
|
||||
size_t wallet_size () override;
|
||||
void get_information (boost::property_tree::ptree &) override;
|
||||
std::deque<nano::account> wallet_accounts;
|
||||
};
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
#include <nano/lib/config.hpp>
|
||||
#include <nano/lib/json_error_response.hpp>
|
||||
#include <nano/lib/timer.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_lazy.hpp>
|
||||
#include <nano/node/common.hpp>
|
||||
#include <nano/node/election.hpp>
|
||||
#include <nano/node/json_handler.hpp>
|
||||
|
@ -1679,41 +1680,39 @@ void nano::json_handler::bootstrap_lazy ()
|
|||
*/
|
||||
void nano::json_handler::bootstrap_status ()
|
||||
{
|
||||
auto attempt (node.bootstrap_initiator.current_attempt ());
|
||||
if (attempt != nullptr)
|
||||
auto attempts_count (node.bootstrap_initiator.attempts.size ());
|
||||
response_l.put ("bootstrap_threads", std::to_string (node.config.bootstrap_initiator_threads));
|
||||
response_l.put ("running_attempts_count", std::to_string (attempts_count));
|
||||
response_l.put ("total_attempts_count", std::to_string (node.bootstrap_initiator.attempts.incremental));
|
||||
boost::property_tree::ptree connections;
|
||||
{
|
||||
nano::lock_guard<std::mutex> lock (attempt->mutex);
|
||||
nano::lock_guard<std::mutex> lazy_lock (attempt->lazy_mutex);
|
||||
response_l.put ("id", attempt->id);
|
||||
response_l.put ("clients", std::to_string (attempt->clients.size ()));
|
||||
response_l.put ("pulls", std::to_string (attempt->pulls.size ()));
|
||||
response_l.put ("pulling", std::to_string (attempt->pulling));
|
||||
response_l.put ("connections", std::to_string (attempt->connections));
|
||||
response_l.put ("idle", std::to_string (attempt->idle.size ()));
|
||||
response_l.put ("target_connections", std::to_string (attempt->target_connections (attempt->pulls.size ())));
|
||||
response_l.put ("total_blocks", std::to_string (attempt->total_blocks));
|
||||
response_l.put ("runs_count", std::to_string (attempt->runs_count));
|
||||
response_l.put ("requeued_pulls", std::to_string (attempt->requeued_pulls));
|
||||
response_l.put ("frontiers_received", static_cast<bool> (attempt->frontiers_received));
|
||||
response_l.put ("frontiers_confirmed", static_cast<bool> (attempt->frontiers_confirmed));
|
||||
response_l.put ("mode", attempt->mode_text ());
|
||||
response_l.put ("lazy_blocks", std::to_string (attempt->lazy_blocks.size ()));
|
||||
response_l.put ("lazy_state_backlog", std::to_string (attempt->lazy_state_backlog.size ()));
|
||||
response_l.put ("lazy_balances", std::to_string (attempt->lazy_balances.size ()));
|
||||
response_l.put ("lazy_destinations", std::to_string (attempt->lazy_destinations.size ()));
|
||||
response_l.put ("lazy_undefined_links", std::to_string (attempt->lazy_undefined_links.size ()));
|
||||
response_l.put ("lazy_pulls", std::to_string (attempt->lazy_pulls.size ()));
|
||||
response_l.put ("lazy_keys", std::to_string (attempt->lazy_keys.size ()));
|
||||
if (!attempt->lazy_keys.empty ())
|
||||
nano::lock_guard<std::mutex> connections_lock (node.bootstrap_initiator.connections->mutex);
|
||||
connections.put ("clients", std::to_string (node.bootstrap_initiator.connections->clients.size ()));
|
||||
connections.put ("connections", std::to_string (node.bootstrap_initiator.connections->connections_count));
|
||||
connections.put ("idle", std::to_string (node.bootstrap_initiator.connections->idle.size ()));
|
||||
connections.put ("target_connections", std::to_string (node.bootstrap_initiator.connections->target_connections (node.bootstrap_initiator.connections->pulls.size (), attempts_count)));
|
||||
connections.put ("pulls", std::to_string (node.bootstrap_initiator.connections->pulls.size ()));
|
||||
}
|
||||
response_l.add_child ("connections", connections);
|
||||
boost::property_tree::ptree attempts;
|
||||
{
|
||||
nano::lock_guard<std::mutex> attempts_lock (node.bootstrap_initiator.attempts.bootstrap_attempts_mutex);
|
||||
for (auto i : node.bootstrap_initiator.attempts.attempts)
|
||||
{
|
||||
response_l.put ("lazy_key_1", (*(attempt->lazy_keys.begin ())).to_string ());
|
||||
boost::property_tree::ptree entry;
|
||||
auto & attempt (i.second);
|
||||
entry.put ("id", attempt->id);
|
||||
entry.put ("mode", attempt->mode_text ());
|
||||
entry.put ("started", static_cast<bool> (attempt->started));
|
||||
entry.put ("pulling", std::to_string (attempt->pulling));
|
||||
entry.put ("total_blocks", std::to_string (attempt->total_blocks));
|
||||
entry.put ("requeued_pulls", std::to_string (attempt->requeued_pulls));
|
||||
attempt->get_information (entry);
|
||||
entry.put ("duration", std::chrono::duration_cast<std::chrono::seconds> (std::chrono::steady_clock::now () - attempt->attempt_start).count ());
|
||||
attempts.push_back (std::make_pair ("", entry));
|
||||
}
|
||||
response_l.put ("duration", std::chrono::duration_cast<std::chrono::seconds> (std::chrono::steady_clock::now () - attempt->attempt_start).count ());
|
||||
}
|
||||
else
|
||||
{
|
||||
response_l.put ("active", "0");
|
||||
}
|
||||
response_l.add_child ("attempts", attempts);
|
||||
response_errors ();
|
||||
}
|
||||
|
||||
|
|
|
@ -557,11 +557,11 @@ void nano::node::process_fork (nano::transaction const & transaction_a, std::sha
|
|||
auto account (this_l->ledger.store.frontier_get (transaction, root));
|
||||
if (!account.is_zero ())
|
||||
{
|
||||
attempt->requeue_pull (nano::pull_info (account, root, root));
|
||||
this_l->bootstrap_initiator.connections->requeue_pull (nano::pull_info (account, root, root, attempt->incremental_id));
|
||||
}
|
||||
else if (this_l->ledger.store.account_exists (transaction, root))
|
||||
{
|
||||
attempt->requeue_pull (nano::pull_info (root, nano::block_hash (0), nano::block_hash (0)));
|
||||
this_l->bootstrap_initiator.connections->requeue_pull (nano::pull_info (root, nano::block_hash (0), nano::block_hash (0), attempt->incremental_id));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -918,7 +918,10 @@ void nano::node::bootstrap_wallet ()
|
|||
}
|
||||
}
|
||||
}
|
||||
bootstrap_initiator.bootstrap_wallet (accounts);
|
||||
if (!accounts.empty ())
|
||||
{
|
||||
bootstrap_initiator.bootstrap_wallet (accounts);
|
||||
}
|
||||
}
|
||||
|
||||
void nano::node::unchecked_cleanup ()
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
#include <nano/node/active_transactions.hpp>
|
||||
#include <nano/node/blockprocessor.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_attempt.hpp>
|
||||
#include <nano/node/bootstrap/bootstrap_server.hpp>
|
||||
#include <nano/node/confirmation_height_processor.hpp>
|
||||
#include <nano/node/distributed_work_factory.hpp>
|
||||
|
|
|
@ -80,6 +80,7 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const
|
|||
toml.put ("enable_voting", enable_voting, "Enable or disable voting. Enabling this option requires additional system resources, namely increased CPU, bandwidth and disk usage.\ntype:bool");
|
||||
toml.put ("bootstrap_connections", bootstrap_connections, "Number of outbound bootstrap connections. Must be a power of 2. Defaults to 4.\nWarning: a larger amount of connections may use substantially more system memory.\ntype:uint64");
|
||||
toml.put ("bootstrap_connections_max", bootstrap_connections_max, "Maximum number of inbound bootstrap connections. Defaults to 64.\nWarning: a larger amount of connections may use additional system memory.\ntype:uint64");
|
||||
toml.put ("bootstrap_initiator_threads", bootstrap_initiator_threads, "Number of threads dedicated to concurrent bootstrap attempts. Defaults to 2 (if the number of CPU threads is more than 1), otherwise 1.\nWarning: a larger amount of attempts may use additional system memory and disk IO.\ntype:uint64");
|
||||
toml.put ("lmdb_max_dbs", lmdb_max_dbs, "Maximum open lmdb databases. Increase default if more than 100 wallets is required.\nNote: external management is recommended when a large amounts of wallets are required (see https://docs.nano.org/integration-guides/key-management/).\ntype:uint64");
|
||||
toml.put ("block_processor_batch_max_time", block_processor_batch_max_time.count (), "The maximum time the block processor can continously process blocks for.\ntype:milliseconds");
|
||||
toml.put ("allow_local_peers", allow_local_peers, "Enable or disable local host peering.\ntype:bool");
|
||||
|
@ -302,6 +303,7 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml)
|
|||
toml.get<unsigned> ("network_threads", network_threads);
|
||||
toml.get<unsigned> ("bootstrap_connections", bootstrap_connections);
|
||||
toml.get<unsigned> ("bootstrap_connections_max", bootstrap_connections_max);
|
||||
toml.get<unsigned> ("bootstrap_initiator_threads", bootstrap_initiator_threads);
|
||||
toml.get<int> ("lmdb_max_dbs", lmdb_max_dbs);
|
||||
toml.get<bool> ("enable_voting", enable_voting);
|
||||
toml.get<bool> ("allow_local_peers", allow_local_peers);
|
||||
|
|
|
@ -63,6 +63,7 @@ public:
|
|||
bool enable_voting{ false };
|
||||
unsigned bootstrap_connections{ 4 };
|
||||
unsigned bootstrap_connections_max{ 64 };
|
||||
unsigned bootstrap_initiator_threads{ network_params.network.is_test_network () ? 1u : std::min<unsigned> (2, std::max<unsigned> (1, std::thread::hardware_concurrency ())) };
|
||||
nano::websocket::config websocket_config;
|
||||
nano::diagnostics_config diagnostics_config;
|
||||
size_t confirmation_history_size{ 2048 };
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue