Limit frontier request block count by default (#3147)

Divide frontier requests into smaller chunks which reduces both memory usage and bandwidth consumption.
This commit is contained in:
Sergey Kroshnin 2021-04-16 14:17:21 +03:00 committed by GitHub
commit bb936fb54a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 105 additions and 38 deletions

View file

@ -155,6 +155,7 @@ TEST (toml, daemon_config_deserialize_defaults)
ASSERT_EQ (conf.node.bootstrap_connections, defaults.node.bootstrap_connections);
ASSERT_EQ (conf.node.bootstrap_connections_max, defaults.node.bootstrap_connections_max);
ASSERT_EQ (conf.node.bootstrap_initiator_threads, defaults.node.bootstrap_initiator_threads);
ASSERT_EQ (conf.node.bootstrap_frontier_request_count, defaults.node.bootstrap_frontier_request_count);
ASSERT_EQ (conf.node.bootstrap_fraction_numerator, defaults.node.bootstrap_fraction_numerator);
ASSERT_EQ (conf.node.conf_height_processor_batch_min_time, defaults.node.conf_height_processor_batch_min_time);
ASSERT_EQ (conf.node.confirmation_history_size, defaults.node.confirmation_history_size);
@ -394,6 +395,7 @@ TEST (toml, daemon_config_deserialize_no_defaults)
bootstrap_connections = 999
bootstrap_connections_max = 999
bootstrap_initiator_threads = 999
bootstrap_frontier_request_count = 9999
bootstrap_fraction_numerator = 999
conf_height_processor_batch_min_time = 999
confirmation_history_size = 999
@ -556,6 +558,7 @@ TEST (toml, daemon_config_deserialize_no_defaults)
ASSERT_NE (conf.node.bootstrap_connections, defaults.node.bootstrap_connections);
ASSERT_NE (conf.node.bootstrap_connections_max, defaults.node.bootstrap_connections_max);
ASSERT_NE (conf.node.bootstrap_initiator_threads, defaults.node.bootstrap_initiator_threads);
ASSERT_NE (conf.node.bootstrap_frontier_request_count, defaults.node.bootstrap_frontier_request_count);
ASSERT_NE (conf.node.bootstrap_fraction_numerator, defaults.node.bootstrap_fraction_numerator);
ASSERT_NE (conf.node.conf_height_processor_batch_min_time, defaults.node.conf_height_processor_batch_min_time);
ASSERT_NE (conf.node.confirmation_history_size, defaults.node.confirmation_history_size);
@ -838,6 +841,21 @@ TEST (toml, daemon_config_deserialize_errors)
ASSERT_EQ (toml.get_error ().get_message (), "confirm_req_batches_max must be between 1 and 100");
}
{
std::stringstream ss;
ss << R"toml(
[node]
bootstrap_frontier_request_count = 1000
)toml";
nano::tomlconfig toml;
toml.read (ss);
nano::daemon_config conf;
conf.deserialize_toml (toml);
ASSERT_EQ (toml.get_error ().get_message (), "bootstrap_frontier_request_count must be greater than or equal to 1024");
}
}
TEST (toml, daemon_read_config)

View file

@ -31,7 +31,7 @@ nano::bootstrap_initiator::~bootstrap_initiator ()
stop ();
}
void nano::bootstrap_initiator::bootstrap (bool force, std::string id_a, uint32_t const frontiers_age_a)
void nano::bootstrap_initiator::bootstrap (bool force, std::string id_a, uint32_t const frontiers_age_a, nano::account const & start_account_a)
{
if (force)
{
@ -41,7 +41,7 @@ void nano::bootstrap_initiator::bootstrap (bool force, std::string id_a, uint32_
if (!stopped && find_attempt (nano::bootstrap_mode::legacy) == nullptr)
{
node.stats.inc (nano::stat::type::bootstrap, frontiers_age_a == std::numeric_limits<uint32_t>::max () ? nano::stat::detail::initiate : nano::stat::detail::initiate_legacy_age, nano::stat::dir::out);
auto legacy_attempt (std::make_shared<nano::bootstrap_attempt_legacy> (node.shared (), attempts.incremental++, id_a, frontiers_age_a));
auto legacy_attempt (std::make_shared<nano::bootstrap_attempt_legacy> (node.shared (), attempts.incremental++, id_a, frontiers_age_a, start_account_a));
attempts_list.push_back (legacy_attempt);
attempts.add (legacy_attempt);
lock.unlock ();
@ -67,7 +67,7 @@ void nano::bootstrap_initiator::bootstrap (nano::endpoint const & endpoint_a, bo
stop_attempts ();
node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::initiate, nano::stat::dir::out);
nano::lock_guard<nano::mutex> lock (mutex);
auto legacy_attempt (std::make_shared<nano::bootstrap_attempt_legacy> (node.shared (), attempts.incremental++, id_a, std::numeric_limits<uint32_t>::max ()));
auto legacy_attempt (std::make_shared<nano::bootstrap_attempt_legacy> (node.shared (), attempts.incremental++, id_a, std::numeric_limits<uint32_t>::max (), 0));
attempts_list.push_back (legacy_attempt);
attempts.add (legacy_attempt);
if (!node.network.excluded_peers.check (nano::transport::map_endpoint_to_tcp (endpoint_a)))

View file

@ -82,7 +82,7 @@ public:
explicit bootstrap_initiator (nano::node &);
~bootstrap_initiator ();
void bootstrap (nano::endpoint const &, bool add_to_peers = true, std::string id_a = "");
void bootstrap (bool force = false, std::string id_a = "", uint32_t const frontiers_age_a = std::numeric_limits<uint32_t>::max ());
void bootstrap (bool force = false, std::string id_a = "", uint32_t const frontiers_age_a = std::numeric_limits<uint32_t>::max (), nano::account const & start_account_a = nano::account (0));
void bootstrap_lazy (nano::hash_or_account const &, bool force = false, bool confirmed = true, std::string id_a = "");
void bootstrap_wallet (std::deque<nano::account> &);
void run_bootstrap ();

View file

@ -129,6 +129,11 @@ bool nano::bootstrap_attempt::request_bulk_push_target (std::pair<nano::block_ha
return true;
}
void nano::bootstrap_attempt::set_start_account (nano::account const &)
{
debug_assert (mode == nano::bootstrap_mode::legacy);
}
bool nano::bootstrap_attempt::process_block (std::shared_ptr<nano::block> const & block_a, nano::account const & known_account_a, uint64_t pull_blocks_processed, nano::bulk_pull::count_t max_blocks, bool block_expected, unsigned retry_limit)
{
bool stop_pull (false);

View file

@ -26,6 +26,7 @@ public:
virtual void add_frontier (nano::pull_info const &);
virtual void add_bulk_push_target (nano::block_hash const &, nano::block_hash const &);
virtual bool request_bulk_push_target (std::pair<nano::block_hash, nano::block_hash> &);
virtual void set_start_account (nano::account const &);
virtual void lazy_start (nano::hash_or_account const &, bool confirmed = true);
virtual void lazy_add (nano::pull_info const &);
virtual void lazy_requeue (nano::block_hash const &, nano::block_hash const &, bool);

View file

@ -12,13 +12,16 @@ constexpr unsigned nano::bootstrap_limits::bulk_push_cost_limit;
constexpr size_t nano::frontier_req_client::size_frontier;
void nano::frontier_req_client::run (uint32_t const frontiers_age_a)
void nano::frontier_req_client::run (nano::account const & start_account_a, uint32_t const frontiers_age_a, uint32_t const count_a)
{
nano::frontier_req request;
request.start.clear ();
request.start = (start_account_a.is_zero () || start_account_a.number () == std::numeric_limits<nano::uint256_t>::max ()) ? start_account_a : start_account_a.number () + 1;
request.age = frontiers_age_a;
request.count = std::numeric_limits<decltype (request.count)>::max ();
request.count = count_a;
current = start_account_a;
frontiers_age = frontiers_age_a;
count_limit = count_a;
next (); // Load accounts from disk
auto this_l (shared_from_this ());
connection->channel->send (
request, [this_l](boost::system::error_code const & ec, size_t size_a) {
@ -40,11 +43,9 @@ void nano::frontier_req_client::run (uint32_t const frontiers_age_a)
nano::frontier_req_client::frontier_req_client (std::shared_ptr<nano::bootstrap_client> const & connection_a, std::shared_ptr<nano::bootstrap_attempt> const & attempt_a) :
connection (connection_a),
attempt (attempt_a),
current (0),
count (0),
bulk_push_cost (0)
{
next ();
}
void nano::frontier_req_client::receive_frontier ()
@ -107,9 +108,10 @@ void nano::frontier_req_client::received_frontier (boost::system::error_code con
double elapsed_sec = std::max (time_span.count (), nano::bootstrap_limits::bootstrap_minimum_elapsed_seconds_blockrate);
double blocks_per_sec = static_cast<double> (count) / elapsed_sec;
if (elapsed_sec > nano::bootstrap_limits::bootstrap_connection_warmup_time_sec && blocks_per_sec < nano::bootstrap_limits::bootstrap_minimum_frontier_blocks_per_sec)
double age_factor = (frontiers_age == std::numeric_limits<decltype (frontiers_age)>::max ()) ? 1.0 : 1.5; // Allow slower frontiers receive for requests with age
if (elapsed_sec > nano::bootstrap_limits::bootstrap_connection_warmup_time_sec && blocks_per_sec * age_factor < nano::bootstrap_limits::bootstrap_minimum_frontier_blocks_per_sec)
{
connection->node->logger.try_log (boost::str (boost::format ("Aborting frontier req because it was too slow")));
connection->node->logger.try_log (boost::str (boost::format ("Aborting frontier req because it was too slow: %1% frontiers per second, last %2%") % blocks_per_sec % account.to_account ()));
promise.set_value (true);
return;
}
@ -117,8 +119,9 @@ void nano::frontier_req_client::received_frontier (boost::system::error_code con
{
connection->node->logger.always_log (boost::str (boost::format ("Received %1% frontiers from %2%") % std::to_string (count) % connection->channel->to_string ()));
}
if (!account.is_zero ())
if (!account.is_zero () && count <= count_limit)
{
last_account = account;
while (!current.is_zero () && current < account)
{
// We know about an account they don't.
@ -164,26 +167,34 @@ void nano::frontier_req_client::received_frontier (boost::system::error_code con
}
else
{
while (!current.is_zero ())
if (count <= count_limit)
{
// We know about an account they don't.
unsynced (frontier, 0);
next ();
}
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log ("Bulk push cost: ", bulk_push_cost);
}
{
try
while (!current.is_zero ())
{
promise.set_value (false);
// We know about an account they don't.
unsynced (frontier, 0);
next ();
}
catch (std::future_error &)
// Prevent new frontier_req requests
attempt->set_start_account (std::numeric_limits<nano::uint256_t>::max ());
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log ("Bulk push cost: ", bulk_push_cost);
}
connection->connections->pool_connection (connection);
}
else
{
// Set last processed account as new start target
attempt->set_start_account (last_account);
}
try
{
promise.set_value (false);
}
catch (std::future_error &)
{
}
connection->connections->pool_connection (connection);
}
}
else

View file

@ -13,7 +13,7 @@ class frontier_req_client final : public std::enable_shared_from_this<nano::fron
{
public:
explicit frontier_req_client (std::shared_ptr<nano::bootstrap_client> const &, std::shared_ptr<nano::bootstrap_attempt> const &);
void run (uint32_t const frontiers_age_a);
void run (nano::account const & start_account_a, uint32_t const frontiers_age_a, uint32_t const count_a);
void receive_frontier ();
void received_frontier (boost::system::error_code const &, size_t);
void unsynced (nano::block_hash const &, nano::block_hash const &);
@ -23,14 +23,14 @@ public:
nano::account current;
nano::block_hash frontier;
unsigned count;
nano::account landing;
nano::account faucet;
nano::account last_account{ std::numeric_limits<nano::uint256_t>::max () }; // Using last possible account stop further frontier requests
std::chrono::steady_clock::time_point start_time;
std::promise<bool> promise;
/** A very rough estimate of the cost of `bulk_push`ing missing blocks */
uint64_t bulk_push_cost;
std::deque<std::pair<nano::account, nano::block_hash>> accounts;
uint32_t frontiers_age{ std::numeric_limits<uint32_t>::max () };
uint32_t count_limit{ std::numeric_limits<uint32_t>::max () };
static size_t constexpr size_frontier = sizeof (nano::account) + sizeof (nano::block_hash);
};
class bootstrap_server;

View file

@ -5,9 +5,10 @@
#include <boost/format.hpp>
nano::bootstrap_attempt_legacy::bootstrap_attempt_legacy (std::shared_ptr<nano::node> const & node_a, uint64_t const incremental_id_a, std::string const & id_a, uint32_t const frontiers_age_a) :
nano::bootstrap_attempt_legacy::bootstrap_attempt_legacy (std::shared_ptr<nano::node> const & node_a, uint64_t const incremental_id_a, std::string const & id_a, uint32_t const frontiers_age_a, nano::account const & start_account_a) :
nano::bootstrap_attempt (node_a, nano::bootstrap_mode::legacy, incremental_id_a, id_a),
frontiers_age (frontiers_age_a)
frontiers_age (frontiers_age_a),
start_account (start_account_a)
{
node->bootstrap_initiator.notify_listeners (true);
}
@ -115,6 +116,13 @@ bool nano::bootstrap_attempt_legacy::request_bulk_push_target (std::pair<nano::b
return empty;
}
void nano::bootstrap_attempt_legacy::set_start_account (nano::account const & start_account_a)
{
// Add last account fron frontier request
nano::lock_guard<nano::mutex> lock (mutex);
start_account = start_account_a;
}
bool nano::bootstrap_attempt_legacy::request_frontier (nano::unique_lock<nano::mutex> & lock_a, bool first_attempt)
{
auto result (true);
@ -128,7 +136,7 @@ bool nano::bootstrap_attempt_legacy::request_frontier (nano::unique_lock<nano::m
{
auto this_l (shared_from_this ());
auto client (std::make_shared<nano::frontier_req_client> (connection_l, this_l));
client->run (frontiers_age);
client->run (start_account, frontiers_age, node->config.bootstrap_frontier_request_count);
frontiers = client;
future = client->promise.get_future ();
}
@ -181,8 +189,6 @@ bool nano::bootstrap_attempt_legacy::request_frontier (nano::unique_lock<nano::m
void nano::bootstrap_attempt_legacy::run_start (nano::unique_lock<nano::mutex> & lock_a)
{
frontiers_received = false;
total_blocks = 0;
requeued_pulls = 0;
auto frontier_failure (true);
uint64_t frontier_attempts (0);
while (!stopped && frontier_failure)
@ -212,7 +218,16 @@ void nano::bootstrap_attempt_legacy::run ()
lock.unlock ();
node->block_processor.flush ();
lock.lock ();
node->logger.try_log ("Finished flushing unchecked blocks");
if (start_account.number () != std::numeric_limits<nano::uint256_t>::max ())
{
node->logger.try_log (boost::str (boost::format ("Finished flushing unchecked blocks, requesting new frontiers after %1%") % start_account.to_account ()));
// Requesting new frontiers
run_start (lock);
}
else
{
node->logger.try_log ("Finished flushing unchecked blocks");
}
}
if (!stopped)
{
@ -236,4 +251,6 @@ void nano::bootstrap_attempt_legacy::get_information (boost::property_tree::ptre
nano::lock_guard<nano::mutex> lock (mutex);
tree_a.put ("frontier_pulls", std::to_string (frontier_pulls.size ()));
tree_a.put ("frontiers_received", static_cast<bool> (frontiers_received));
tree_a.put ("frontiers_age", std::to_string (frontiers_age));
tree_a.put ("last_account", start_account.to_account ());
}

View file

@ -16,7 +16,7 @@ class node;
class bootstrap_attempt_legacy : public bootstrap_attempt
{
public:
explicit bootstrap_attempt_legacy (std::shared_ptr<nano::node> const & node_a, uint64_t const incremental_id_a, std::string const & id_a = "", uint32_t const frontiers_age_a = std::numeric_limits<uint32_t>::max ());
explicit bootstrap_attempt_legacy (std::shared_ptr<nano::node> const & node_a, uint64_t const incremental_id_a, std::string const & id_a, uint32_t const frontiers_age_a, nano::account const & start_account_a);
void run () override;
bool consume_future (std::future<bool> &);
void stop () override;
@ -25,6 +25,7 @@ public:
void add_frontier (nano::pull_info const &) override;
void add_bulk_push_target (nano::block_hash const &, nano::block_hash const &) override;
bool request_bulk_push_target (std::pair<nano::block_hash, nano::block_hash> &) override;
void set_start_account (nano::account const &) override;
void run_start (nano::unique_lock<nano::mutex> &);
void get_information (boost::property_tree::ptree &) override;
nano::tcp_endpoint endpoint_frontier_request;
@ -32,6 +33,7 @@ public:
std::weak_ptr<nano::bulk_push_client> push;
std::deque<nano::pull_info> frontier_pulls;
std::vector<std::pair<nano::block_hash, nano::block_hash>> bulk_push_targets;
nano::account start_account{ 0 };
std::atomic<unsigned> account_count{ 0 };
uint32_t frontiers_age;
};

View file

@ -1723,8 +1723,14 @@ void nano::json_handler::bootstrap_any ()
const bool force = request.get<bool> ("force", false);
if (!node.flags.disable_legacy_bootstrap)
{
nano::account start_account (0);
boost::optional<std::string> account_text (request.get_optional<std::string> ("account"));
if (account_text.is_initialized ())
{
start_account = account_impl (account_text.get ());
}
std::string bootstrap_id (request.get<std::string> ("id", ""));
node.bootstrap_initiator.bootstrap (force, bootstrap_id);
node.bootstrap_initiator.bootstrap (force, bootstrap_id, std::numeric_limits<uint32_t>::max (), start_account);
response_l.put ("success", "");
}
else

View file

@ -87,6 +87,7 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const
toml.put ("bootstrap_connections", bootstrap_connections, "Number of outbound bootstrap connections. Must be a power of 2. Defaults to 4.\nWarning: a larger amount of connections may use substantially more system memory.\ntype:uint64");
toml.put ("bootstrap_connections_max", bootstrap_connections_max, "Maximum number of inbound bootstrap connections. Defaults to 64.\nWarning: a larger amount of connections may use additional system memory.\ntype:uint64");
toml.put ("bootstrap_initiator_threads", bootstrap_initiator_threads, "Number of threads dedicated to concurrent bootstrap attempts. Defaults to 1.\nWarning: a larger amount of attempts may use additional system memory and disk IO.\ntype:uint64");
toml.put ("bootstrap_frontier_request_count", bootstrap_frontier_request_count, "Number frontiers per bootstrap frontier request. Defaults to 1048576.\ntype:uint32,[1024..4294967295]");
toml.put ("lmdb_max_dbs", deprecated_lmdb_max_dbs, "DEPRECATED: use node.lmdb.max_databases instead.\nMaximum open lmdb databases. Increase default if more than 100 wallets is required.\nNote: external management is recommended when a large number of wallets is required (see https://docs.nano.org/integration-guides/key-management/).\ntype:uint64");
toml.put ("block_processor_batch_max_time", block_processor_batch_max_time.count (), "The maximum time the block processor can continuously process blocks for.\ntype:milliseconds");
toml.put ("allow_local_peers", allow_local_peers, "Enable or disable local host peering.\ntype:bool");
@ -318,6 +319,7 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml)
toml.get<unsigned> ("bootstrap_connections", bootstrap_connections);
toml.get<unsigned> ("bootstrap_connections_max", bootstrap_connections_max);
toml.get<unsigned> ("bootstrap_initiator_threads", bootstrap_initiator_threads);
toml.get<uint32_t> ("bootstrap_frontier_request_count", bootstrap_frontier_request_count);
toml.get<bool> ("enable_voting", enable_voting);
toml.get<bool> ("allow_local_peers", allow_local_peers);
toml.get<unsigned> (signature_checker_threads_key, signature_checker_threads);
@ -447,6 +449,10 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml)
{
toml.get_error ().set ("confirm_req_batches_max must be between 1 and 100");
}
if (bootstrap_frontier_request_count < 1024)
{
toml.get_error ().set ("bootstrap_frontier_request_count must be greater than or equal to 1024");
}
}
catch (std::runtime_error const & ex)
{

View file

@ -66,6 +66,7 @@ public:
unsigned bootstrap_connections{ 4 };
unsigned bootstrap_connections_max{ 64 };
unsigned bootstrap_initiator_threads{ 1 };
uint32_t bootstrap_frontier_request_count{ 1024 * 1024 };
nano::websocket::config websocket_config;
nano::diagnostics_config diagnostics_config;
size_t confirmation_history_size{ 2048 };

View file

@ -175,7 +175,7 @@ nano::bootstrap_constants::bootstrap_constants (nano::network_constants & networ
lazy_retry_limit = network_constants.is_dev_network () ? 2 : frontier_retry_limit * 4;
lazy_destinations_retry_limit = network_constants.is_dev_network () ? 1 : frontier_retry_limit / 4;
gap_cache_bootstrap_start_interval = network_constants.is_dev_network () ? std::chrono::milliseconds (5) : std::chrono::milliseconds (30 * 1000);
default_frontiers_age_seconds = network_constants.is_dev_network () ? 1 : network_constants.is_beta_network () ? 60 * 60 : 24 * 60 * 60; // 1 second for dev network, 1 hour for beta, 24 hours for live
default_frontiers_age_seconds = network_constants.is_dev_network () ? 1 : 24 * 60 * 60; // 1 second for dev network, 24 hours for live/beta
}
// Create a new random keypair