Ascending bootstrap dependency resolution (#4692)

* Dependency walking

* Parallel database scan

* Stats

* Throttling & backoff

* Dependency account scanning

* Sync dependencies

* Track source when throttling

* Count tags

* Priorities backoff

Co-authored-by: gr0vity-dev <85646666+gr0vity-dev@users.noreply.github.com>

* Timestamps

* Avoid reprocessing old blocks

Co-authored-by: gr0vity-dev <85646666+gr0vity-dev@users.noreply.github.com>

* Max tags limit

* Handle `gap_previous`

Co-authored-by: gr0vity-dev <85646666+gr0vity-dev@users.noreply.github.com>

* Check timestamp

* Tune initial priority

Co-authored-by: gr0vity-dev <85646666+gr0vity-dev@users.noreply.github.com>

* Fix config

* Verify response

* Use filters

* Remove random sampling

Co-authored-by: gr0vity-dev <85646666+gr0vity-dev@users.noreply.github.com>

* Backoff adjustments

* Insert genesis on start

Co-authored-by: gr0vity-dev <85646666+gr0vity-dev@users.noreply.github.com>

* Fix timestamp check performance

Co-authored-by: gr0vity-dev <85646666+gr0vity-dev@users.noreply.github.com>

* Adjust throttle size computation

* Config improvements

* Fix compilation

* Extend test timeout

---------

Co-authored-by: gr0vity-dev <85646666+gr0vity-dev@users.noreply.github.com>
This commit is contained in:
Piotr Wójcik 2024-08-06 17:44:24 +02:00 committed by GitHub
commit b7ae57a5ee
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 977 additions and 415 deletions

View file

@ -30,7 +30,8 @@ TEST (account_sets, construction)
nano::test::system system;
auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants);
ASSERT_FALSE (store->init_error ());
nano::bootstrap_ascending::account_sets sets{ system.stats };
nano::account_sets_config config;
nano::bootstrap_ascending::account_sets sets{ config, system.stats };
}
TEST (account_sets, empty_blocked)
@ -40,7 +41,8 @@ TEST (account_sets, empty_blocked)
nano::account account{ 1 };
auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants);
ASSERT_FALSE (store->init_error ());
nano::bootstrap_ascending::account_sets sets{ system.stats };
nano::account_sets_config config;
nano::bootstrap_ascending::account_sets sets{ config, system.stats };
ASSERT_FALSE (sets.blocked (account));
}
@ -51,7 +53,8 @@ TEST (account_sets, block)
nano::account account{ 1 };
auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants);
ASSERT_FALSE (store->init_error ());
nano::bootstrap_ascending::account_sets sets{ system.stats };
nano::account_sets_config config;
nano::bootstrap_ascending::account_sets sets{ config, system.stats };
sets.block (account, random_hash ());
ASSERT_TRUE (sets.blocked (account));
}
@ -63,7 +66,8 @@ TEST (account_sets, unblock)
nano::account account{ 1 };
auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants);
ASSERT_FALSE (store->init_error ());
nano::bootstrap_ascending::account_sets sets{ system.stats };
nano::account_sets_config config;
nano::bootstrap_ascending::account_sets sets{ config, system.stats };
auto hash = random_hash ();
sets.block (account, hash);
sets.unblock (account, hash);
@ -77,8 +81,9 @@ TEST (account_sets, priority_base)
nano::account account{ 1 };
auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants);
ASSERT_FALSE (store->init_error ());
nano::bootstrap_ascending::account_sets sets{ system.stats };
ASSERT_EQ (1.0f, sets.priority (account));
nano::account_sets_config config;
nano::bootstrap_ascending::account_sets sets{ config, system.stats };
ASSERT_EQ (0.0, sets.priority (account));
}
TEST (account_sets, priority_blocked)
@ -88,9 +93,10 @@ TEST (account_sets, priority_blocked)
nano::account account{ 1 };
auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants);
ASSERT_FALSE (store->init_error ());
nano::bootstrap_ascending::account_sets sets{ system.stats };
nano::account_sets_config config;
nano::bootstrap_ascending::account_sets sets{ config, system.stats };
sets.block (account, random_hash ());
ASSERT_EQ (0.0f, sets.priority (account));
ASSERT_EQ (0.0, sets.priority (account));
}
// When account is unblocked, check that it retains it former priority
@ -101,15 +107,16 @@ TEST (account_sets, priority_unblock_keep)
nano::account account{ 1 };
auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants);
ASSERT_FALSE (store->init_error ());
nano::bootstrap_ascending::account_sets sets{ system.stats };
nano::account_sets_config config;
nano::bootstrap_ascending::account_sets sets{ config, system.stats };
sets.priority_up (account);
sets.priority_up (account);
ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial * nano::bootstrap_ascending::account_sets::priority_increase);
ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial + nano::bootstrap_ascending::account_sets::priority_increase);
auto hash = random_hash ();
sets.block (account, hash);
ASSERT_EQ (0.0f, sets.priority (account));
ASSERT_EQ (0.0, sets.priority (account));
sets.unblock (account, hash);
ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial * nano::bootstrap_ascending::account_sets::priority_increase);
ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial + nano::bootstrap_ascending::account_sets::priority_increase);
}
TEST (account_sets, priority_up_down)
@ -119,14 +126,14 @@ TEST (account_sets, priority_up_down)
nano::account account{ 1 };
auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants);
ASSERT_FALSE (store->init_error ());
nano::bootstrap_ascending::account_sets sets{ system.stats };
nano::account_sets_config config;
nano::bootstrap_ascending::account_sets sets{ config, system.stats };
sets.priority_up (account);
ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial);
sets.priority_down (account);
ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial - nano::bootstrap_ascending::account_sets::priority_decrease);
ASSERT_EQ (sets.priority (account), nano::bootstrap_ascending::account_sets::priority_initial / nano::bootstrap_ascending::account_sets::priority_divide);
}
// Check that priority downward saturates to 1.0f
TEST (account_sets, priority_down_sat)
{
nano::test::system system;
@ -134,9 +141,10 @@ TEST (account_sets, priority_down_sat)
nano::account account{ 1 };
auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants);
ASSERT_FALSE (store->init_error ());
nano::bootstrap_ascending::account_sets sets{ system.stats };
nano::account_sets_config config;
nano::bootstrap_ascending::account_sets sets{ config, system.stats };
sets.priority_down (account);
ASSERT_EQ (1.0f, sets.priority (account));
ASSERT_EQ (0.0, sets.priority (account));
}
// Ensure priority value is bounded
@ -147,7 +155,8 @@ TEST (account_sets, saturate_priority)
nano::account account{ 1 };
auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants);
ASSERT_FALSE (store->init_error ());
nano::bootstrap_ascending::account_sets sets{ system.stats };
nano::account_sets_config config;
nano::bootstrap_ascending::account_sets sets{ config, system.stats };
for (int n = 0; n < 1000; ++n)
{
sets.priority_up (account);
@ -257,32 +266,3 @@ TEST (bootstrap_ascending, trace_base)
// std::cerr << "node1: " << node1.network.endpoint () << std::endl;
ASSERT_TIMELY (10s, node1.block (receive1->hash ()) != nullptr);
}
TEST (bootstrap_ascending, config_serialization)
{
nano::bootstrap_ascending_config config1;
config1.requests_limit = 0x101;
config1.database_requests_limit = 0x102;
config1.pull_count = 0x103;
config1.request_timeout = 0x104ms;
config1.throttle_coefficient = 0x105;
config1.throttle_wait = 0x106ms;
config1.block_wait_count = 0x107;
nano::tomlconfig toml1;
ASSERT_FALSE (config1.serialize (toml1));
std::stringstream stream1;
toml1.write (stream1);
auto string = stream1.str ();
std::stringstream stream2{ string };
nano::tomlconfig toml2;
toml2.read (stream2);
nano::bootstrap_ascending_config config2;
ASSERT_FALSE (config2.deserialize (toml2));
ASSERT_EQ (config1.requests_limit, config2.requests_limit);
ASSERT_EQ (config1.database_requests_limit, config2.database_requests_limit);
ASSERT_EQ (config1.pull_count, config2.pull_count);
ASSERT_EQ (config1.request_timeout, config2.request_timeout);
ASSERT_EQ (config1.throttle_coefficient, config2.throttle_coefficient);
ASSERT_EQ (config1.throttle_wait, config2.throttle_wait);
ASSERT_EQ (config1.block_wait_count, config2.block_wait_count);
}

View file

@ -48,7 +48,7 @@ private:
/**
* Checks if both lists contain the same blocks, with `blocks_b` skipped by `skip` elements
*/
bool compare_blocks (std::vector<std::shared_ptr<nano::block>> blocks_a, std::vector<std::shared_ptr<nano::block>> blocks_b, int skip = 0)
bool compare_blocks (auto const & blocks_a, auto const & blocks_b, int skip = 0)
{
debug_assert (blocks_b.size () >= blocks_a.size () + skip);

View file

@ -291,7 +291,7 @@ TEST (rep_crawler, two_reps_one_node)
system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv);
system.wallet (0)->insert_adhoc (second_rep.prv);
ASSERT_TIMELY_EQ (5s, node2.rep_crawler.representative_count (), 2);
ASSERT_TIMELY_EQ (15s, node2.rep_crawler.representative_count (), 2);
auto reps = node2.rep_crawler.representatives ();
ASSERT_EQ (2, reps.size ());

View file

@ -116,6 +116,8 @@ TEST (toml, daemon_config_deserialize_defaults)
std::stringstream ss;
ss << R"toml(
[node]
[node.bootstrap_ascending]
[node.bootstrap_server]
[node.block_processor]
[node.diagnostics.txn_tracking]
[node.httpcallback]
@ -128,7 +130,6 @@ TEST (toml, daemon_config_deserialize_defaults)
[node.websocket]
[node.lmdb]
[node.rocksdb]
[node.bootstrap_server]
[opencl]
[rpc]
[rpc.child_process]
@ -265,6 +266,18 @@ TEST (toml, daemon_config_deserialize_defaults)
ASSERT_EQ (conf.node.vote_processor.threads, defaults.node.vote_processor.threads);
ASSERT_EQ (conf.node.vote_processor.batch_size, defaults.node.vote_processor.batch_size);
ASSERT_EQ (conf.node.bootstrap_ascending.enable, defaults.node.bootstrap_ascending.enable);
ASSERT_EQ (conf.node.bootstrap_ascending.enable_database_scan, defaults.node.bootstrap_ascending.enable_database_scan);
ASSERT_EQ (conf.node.bootstrap_ascending.enable_dependency_walker, defaults.node.bootstrap_ascending.enable_dependency_walker);
ASSERT_EQ (conf.node.bootstrap_ascending.requests_limit, defaults.node.bootstrap_ascending.requests_limit);
ASSERT_EQ (conf.node.bootstrap_ascending.database_rate_limit, defaults.node.bootstrap_ascending.database_rate_limit);
ASSERT_EQ (conf.node.bootstrap_ascending.pull_count, defaults.node.bootstrap_ascending.pull_count);
ASSERT_EQ (conf.node.bootstrap_ascending.request_timeout, defaults.node.bootstrap_ascending.request_timeout);
ASSERT_EQ (conf.node.bootstrap_ascending.throttle_coefficient, defaults.node.bootstrap_ascending.throttle_coefficient);
ASSERT_EQ (conf.node.bootstrap_ascending.throttle_wait, defaults.node.bootstrap_ascending.throttle_wait);
ASSERT_EQ (conf.node.bootstrap_ascending.block_wait_count, defaults.node.bootstrap_ascending.block_wait_count);
ASSERT_EQ (conf.node.bootstrap_ascending.max_requests, defaults.node.bootstrap_ascending.max_requests);
ASSERT_EQ (conf.node.bootstrap_server.max_queue, defaults.node.bootstrap_server.max_queue);
ASSERT_EQ (conf.node.bootstrap_server.threads, defaults.node.bootstrap_server.threads);
ASSERT_EQ (conf.node.bootstrap_server.batch_size, defaults.node.bootstrap_server.batch_size);
@ -576,6 +589,19 @@ TEST (toml, daemon_config_deserialize_no_defaults)
threads = 999
batch_size = 999
[node.bootstrap_ascending]
enable = false
enable_database_scan = false
enable_dependency_walker = false
requests_limit = 999
database_rate_limit = 999
pull_count = 999
request_timeout = 999
throttle_coefficient = 999
throttle_wait = 999
block_wait_count = 999
max_requests = 999
[node.bootstrap_server]
max_queue = 999
threads = 999
@ -740,6 +766,18 @@ TEST (toml, daemon_config_deserialize_no_defaults)
ASSERT_NE (conf.node.vote_processor.threads, defaults.node.vote_processor.threads);
ASSERT_NE (conf.node.vote_processor.batch_size, defaults.node.vote_processor.batch_size);
ASSERT_NE (conf.node.bootstrap_ascending.enable, defaults.node.bootstrap_ascending.enable);
ASSERT_NE (conf.node.bootstrap_ascending.enable_database_scan, defaults.node.bootstrap_ascending.enable_database_scan);
ASSERT_NE (conf.node.bootstrap_ascending.enable_dependency_walker, defaults.node.bootstrap_ascending.enable_dependency_walker);
ASSERT_NE (conf.node.bootstrap_ascending.requests_limit, defaults.node.bootstrap_ascending.requests_limit);
ASSERT_NE (conf.node.bootstrap_ascending.database_rate_limit, defaults.node.bootstrap_ascending.database_rate_limit);
ASSERT_NE (conf.node.bootstrap_ascending.pull_count, defaults.node.bootstrap_ascending.pull_count);
ASSERT_NE (conf.node.bootstrap_ascending.request_timeout, defaults.node.bootstrap_ascending.request_timeout);
ASSERT_NE (conf.node.bootstrap_ascending.throttle_coefficient, defaults.node.bootstrap_ascending.throttle_coefficient);
ASSERT_NE (conf.node.bootstrap_ascending.throttle_wait, defaults.node.bootstrap_ascending.throttle_wait);
ASSERT_NE (conf.node.bootstrap_ascending.block_wait_count, defaults.node.bootstrap_ascending.block_wait_count);
ASSERT_NE (conf.node.bootstrap_ascending.max_requests, defaults.node.bootstrap_ascending.max_requests);
ASSERT_NE (conf.node.bootstrap_server.max_queue, defaults.node.bootstrap_server.max_queue);
ASSERT_NE (conf.node.bootstrap_server.threads, defaults.node.bootstrap_server.threads);
ASSERT_NE (conf.node.bootstrap_server.batch_size, defaults.node.bootstrap_server.batch_size);

View file

@ -58,6 +58,13 @@ enum class type
blockprocessor_source,
blockprocessor_result,
blockprocessor_overfill,
bootstrap_ascending,
bootstrap_ascending_accounts,
bootstrap_ascending_verify,
bootstrap_ascending_process,
bootstrap_ascending_request,
bootstrap_ascending_reply,
bootstrap_ascending_next,
bootstrap_server,
bootstrap_server_request,
bootstrap_server_overfill,
@ -87,9 +94,6 @@ enum class type
message_processor_overfill,
message_processor_type,
bootstrap_ascending,
bootstrap_ascending_accounts,
_last // Must be the last enum
};
@ -129,6 +133,7 @@ enum class detail
unconfirmed,
cemented,
cooldown,
empty,
// processing queue
queue,
@ -421,6 +426,12 @@ enum class detail
track,
timeout,
nothing_new,
account_info_empty,
loop_database,
loop_dependencies,
duplicate_request,
invalid_response_type,
timestamp_reset,
// bootstrap ascending accounts
prioritize,
@ -428,20 +439,31 @@ enum class detail
block,
unblock,
unblock_failed,
dependency_update,
dependency_update_failed,
next_none,
next_priority,
next_database,
next_none,
next_blocking,
next_dependency,
blocking_insert,
blocking_erase_overflow,
priority_insert,
priority_erase_threshold,
priority_erase_block,
priority_erase_by_threshold,
priority_erase_by_blocking,
priority_erase_overflow,
deprioritize,
deprioritize_failed,
sync_dependencies,
request_blocks,
request_account_info,
// active
started_hinted,
started_optimistic,
// rep_crawler
channel_dead,
query_target_failed,
@ -489,6 +511,11 @@ enum class detail
activate_success,
cancel_lowest,
// query_type
blocks_by_hash,
blocks_by_account,
account_info_by_hash,
_last // Must be the last enum
};

View file

@ -147,7 +147,7 @@ public:
template <typename Duration>
tomlconfig & get_duration (std::string const & key, Duration & target)
{
uint64_t value;
uint64_t value = target.count ();
get (key, value);
target = Duration{ value };
return *this;

View file

@ -16,9 +16,10 @@
* block_processor::context
*/
nano::block_processor::context::context (std::shared_ptr<nano::block> block, nano::block_source source_a) :
nano::block_processor::context::context (std::shared_ptr<nano::block> block, nano::block_source source_a, callback_t callback_a) :
block{ std::move (block) },
source{ source_a }
source{ source_a },
callback{ std::move (callback_a) }
{
debug_assert (source != nano::block_source::unknown);
}
@ -121,7 +122,7 @@ std::size_t nano::block_processor::size (nano::block_source source) const
return queue.size ({ source });
}
bool nano::block_processor::add (std::shared_ptr<nano::block> const & block, block_source const source, std::shared_ptr<nano::transport::channel> const & channel)
bool nano::block_processor::add (std::shared_ptr<nano::block> const & block, block_source const source, std::shared_ptr<nano::transport::channel> const & channel, std::function<void (nano::block_status)> callback)
{
if (node.network_params.work.validate_entry (*block)) // true => error
{
@ -135,7 +136,7 @@ bool nano::block_processor::add (std::shared_ptr<nano::block> const & block, blo
to_string (source),
channel ? channel->to_string () : "<unknown>"); // TODO: Lazy eval
return add_impl (context{ block, source }, channel);
return add_impl (context{ block, source, std::move (callback) }, channel);
}
std::optional<nano::block_status> nano::block_processor::add_blocking (std::shared_ptr<nano::block> const & block, block_source const source)
@ -247,6 +248,10 @@ void nano::block_processor::run ()
// Set results for futures when not holding the lock
for (auto & [result, context] : processed)
{
if (context.callback)
{
context.callback (result);
}
context.set_result (result);
}

View file

@ -67,15 +67,17 @@ class block_processor final
public: // Context
class context
{
public:
context (std::shared_ptr<nano::block> block, nano::block_source source);
std::shared_ptr<nano::block> const block;
nano::block_source const source;
std::chrono::steady_clock::time_point const arrival{ std::chrono::steady_clock::now () };
public:
using result_t = nano::block_status;
using callback_t = std::function<void (result_t)>;
context (std::shared_ptr<nano::block> block, nano::block_source source, callback_t callback = nullptr);
std::shared_ptr<nano::block> block;
nano::block_source source;
callback_t callback;
std::chrono::steady_clock::time_point arrival{ std::chrono::steady_clock::now () };
std::future<result_t> get_future ();
private:
@ -94,7 +96,7 @@ public:
std::size_t size () const;
std::size_t size (nano::block_source) const;
bool add (std::shared_ptr<nano::block> const &, nano::block_source = nano::block_source::live, std::shared_ptr<nano::transport::channel> const & channel = nullptr);
bool add (std::shared_ptr<nano::block> const &, nano::block_source = nano::block_source::live, std::shared_ptr<nano::transport::channel> const & channel = nullptr, std::function<void (nano::block_status)> callback = {});
std::optional<nano::block_status> add_blocking (std::shared_ptr<nano::block> const & block, nano::block_source);
void force (std::shared_ptr<nano::block> const &);
bool should_log ();

View file

@ -29,13 +29,18 @@ nano::error nano::account_sets_config::serialize (nano::tomlconfig & toml) const
*/
nano::error nano::bootstrap_ascending_config::deserialize (nano::tomlconfig & toml)
{
toml.get ("enable", enable);
toml.get ("enable_database_scan", enable_database_scan);
toml.get ("enable_dependency_walker", enable_dependency_walker);
toml.get ("requests_limit", requests_limit);
toml.get ("database_requests_limit", database_requests_limit);
toml.get ("database_rate_limit", database_rate_limit);
toml.get ("pull_count", pull_count);
toml.get_duration ("timeout", request_timeout);
toml.get_duration ("request_timeout", request_timeout);
toml.get ("throttle_coefficient", throttle_coefficient);
toml.get_duration ("throttle_wait", throttle_wait);
toml.get ("block_wait_count", block_wait_count);
toml.get ("max_requests", max_requests);
if (toml.has_key ("account_sets"))
{
@ -48,13 +53,18 @@ nano::error nano::bootstrap_ascending_config::deserialize (nano::tomlconfig & to
nano::error nano::bootstrap_ascending_config::serialize (nano::tomlconfig & toml) const
{
toml.put ("enable", enable, "Enable or disable the ascending bootstrap. Disabling it is not recommended and will prevent the node from syncing.\ntype:bool");
toml.put ("enable_database_scan", enable_database_scan, "Enable or disable the 'database scan` strategy for the ascending bootstrap.\ntype:bool");
toml.put ("enable_dependency_walker", enable_dependency_walker, "Enable or disable the 'dependency walker` strategy for the ascending bootstrap.\ntype:bool");
toml.put ("requests_limit", requests_limit, "Request limit to ascending bootstrap after which requests will be dropped.\nNote: changing to unlimited (0) is not recommended.\ntype:uint64");
toml.put ("database_requests_limit", database_requests_limit, "Request limit for accounts from database after which requests will be dropped.\nNote: changing to unlimited (0) is not recommended as this operation competes for resources on querying the database.\ntype:uint64");
toml.put ("database_rate_limit", database_rate_limit, "Rate limit on scanning accounts and pending entries from database.\nNote: changing to unlimited (0) is not recommended as this operation competes for resources on querying the database.\ntype:uint64");
toml.put ("pull_count", pull_count, "Number of requested blocks for ascending bootstrap request.\ntype:uint64");
toml.put ("timeout", request_timeout.count (), "Timeout in milliseconds for incoming ascending bootstrap messages to be processed.\ntype:milliseconds");
toml.put ("request_timeout", request_timeout.count (), "Timeout in milliseconds for incoming ascending bootstrap messages to be processed.\ntype:milliseconds");
toml.put ("throttle_coefficient", throttle_coefficient, "Scales the number of samples to track for bootstrap throttling.\ntype:uint64");
toml.put ("throttle_wait", throttle_wait.count (), "Length of time to wait between requests when throttled.\ntype:milliseconds");
toml.put ("block_wait_count", block_wait_count, "Asending bootstrap will wait while block processor has more than this many blocks queued.\ntype:uint64");
toml.put ("max_requests", max_requests, "Maximum total number of in flight requests.\ntype:uint64");
nano::tomlconfig account_sets_l;
account_sets.serialize (account_sets_l);

View file

@ -8,32 +8,41 @@ namespace nano
{
class tomlconfig;
// TODO: This should be moved next to `account_sets` class
class account_sets_config final
{
public:
nano::error deserialize (nano::tomlconfig & toml);
nano::error serialize (nano::tomlconfig & toml) const;
public:
std::size_t consideration_count{ 4 };
std::size_t priorities_max{ 256 * 1024 };
std::size_t blocking_max{ 256 * 1024 };
std::chrono::milliseconds cooldown{ 1000 * 3 };
};
// TODO: This should be moved next to `bootstrap_ascending` class
class bootstrap_ascending_config final
{
public:
nano::error deserialize (nano::tomlconfig & toml);
nano::error serialize (nano::tomlconfig & toml) const;
public:
bool enable{ true };
bool enable_database_scan{ true };
bool enable_dependency_walker{ true };
// Maximum number of un-responded requests per channel
std::size_t requests_limit{ 64 };
std::size_t database_requests_limit{ 1024 };
std::size_t pull_count{ nano::bootstrap_server::max_blocks };
std::size_t requests_limit{ 64 }; // TODO: => channel_requests_limit
std::size_t database_rate_limit{ 1024 }; // TODO: Adjust for live network (lower)
std::size_t pull_count{ nano::bootstrap_server::max_blocks }; // TODO: => max_pull_count & use in requests
std::chrono::milliseconds request_timeout{ 1000 * 5 };
std::size_t throttle_coefficient{ 16 };
std::size_t throttle_coefficient{ 8 * 1024 };
std::chrono::milliseconds throttle_wait{ 100 };
std::size_t block_wait_count{ 1000 };
std::size_t block_wait_count{ 1000 }; // TODO: Block processor threshold
std::size_t max_requests{ 1024 * 16 }; // TODO: Adjust for live network
nano::account_sets_config account_sets;
};

View file

@ -313,11 +313,11 @@ nano::asc_pull_ack nano::bootstrap_server::prepare_empty_blocks_response (nano::
return response;
}
std::vector<std::shared_ptr<nano::block>> nano::bootstrap_server::prepare_blocks (secure::transaction const & transaction, nano::block_hash start_block, std::size_t count) const
std::deque<std::shared_ptr<nano::block>> nano::bootstrap_server::prepare_blocks (secure::transaction const & transaction, nano::block_hash start_block, std::size_t count) const
{
debug_assert (count <= max_blocks); // Should be filtered out earlier
std::vector<std::shared_ptr<nano::block>> result;
std::deque<std::shared_ptr<nano::block>> result;
if (!start_block.is_zero ())
{
std::shared_ptr<nano::block> current = ledger.any.block_get (transaction, start_block);

View file

@ -64,7 +64,7 @@ private:
nano::asc_pull_ack process (secure::transaction const &, nano::asc_pull_req::id_t id, nano::asc_pull_req::blocks_payload const & request) const;
nano::asc_pull_ack prepare_response (secure::transaction const &, nano::asc_pull_req::id_t id, nano::block_hash start_block, std::size_t count) const;
nano::asc_pull_ack prepare_empty_blocks_response (nano::asc_pull_req::id_t id) const;
std::vector<std::shared_ptr<nano::block>> prepare_blocks (secure::transaction const &, nano::block_hash start_block, std::size_t count) const;
std::deque<std::shared_ptr<nano::block>> prepare_blocks (secure::transaction const &, nano::block_hash start_block, std::size_t count) const;
/*
* Account info request

View file

@ -11,14 +11,19 @@
* account_sets
*/
nano::bootstrap_ascending::account_sets::account_sets (nano::stats & stats_a, nano::account_sets_config config_a) :
stats{ stats_a },
config{ std::move (config_a) }
nano::bootstrap_ascending::account_sets::account_sets (nano::account_sets_config const & config_a, nano::stats & stats_a) :
config{ config_a },
stats{ stats_a }
{
}
void nano::bootstrap_ascending::account_sets::priority_up (nano::account const & account)
{
if (account.is_zero ())
{
return;
}
if (!blocked (account))
{
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::prioritize);
@ -27,14 +32,13 @@ void nano::bootstrap_ascending::account_sets::priority_up (nano::account const &
if (iter != priorities.get<tag_account> ().end ())
{
priorities.get<tag_account> ().modify (iter, [] (auto & val) {
val.priority = std::min ((val.priority * account_sets::priority_increase), account_sets::priority_max);
val.priority = std::min ((val.priority + account_sets::priority_increase), account_sets::priority_max);
});
}
else
{
priorities.get<tag_account> ().insert ({ account, account_sets::priority_initial });
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_insert);
priorities.get<tag_account> ().insert ({ account, account_sets::priority_initial });
trim_overflow ();
}
}
@ -46,16 +50,21 @@ void nano::bootstrap_ascending::account_sets::priority_up (nano::account const &
void nano::bootstrap_ascending::account_sets::priority_down (nano::account const & account)
{
if (account.is_zero ())
{
return;
}
auto iter = priorities.get<tag_account> ().find (account);
if (iter != priorities.get<tag_account> ().end ())
{
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::deprioritize);
auto priority_new = iter->priority - account_sets::priority_decrease;
auto priority_new = iter->priority / account_sets::priority_divide;
if (priority_new <= account_sets::priority_cutoff)
{
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_by_threshold);
priorities.get<tag_account> ().erase (iter);
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_threshold);
}
else
{
@ -70,17 +79,42 @@ void nano::bootstrap_ascending::account_sets::priority_down (nano::account const
}
}
void nano::bootstrap_ascending::account_sets::priority_set (nano::account const & account)
{
if (account.is_zero ())
{
return;
}
if (!blocked (account))
{
auto iter = priorities.get<tag_account> ().find (account);
if (iter == priorities.get<tag_account> ().end ())
{
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_insert);
priorities.get<tag_account> ().insert ({ account, account_sets::priority_initial });
trim_overflow ();
}
}
else
{
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::prioritize_failed);
}
}
void nano::bootstrap_ascending::account_sets::block (nano::account const & account, nano::block_hash const & dependency)
{
debug_assert (!account.is_zero ());
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::block);
auto existing = priorities.get<tag_account> ().find (account);
auto entry = existing == priorities.get<tag_account> ().end () ? priority_entry{ 0, 0 } : *existing;
auto entry = (existing == priorities.get<tag_account> ().end ()) ? priority_entry{ account, 0 } : *existing;
priorities.get<tag_account> ().erase (account);
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_block);
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_by_blocking);
blocking.get<tag_account> ().insert ({ account, dependency, entry });
blocking.get<tag_account> ().insert ({ entry, dependency });
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::blocking_insert);
trim_overflow ();
@ -88,6 +122,11 @@ void nano::bootstrap_ascending::account_sets::block (nano::account const & accou
void nano::bootstrap_ascending::account_sets::unblock (nano::account const & account, std::optional<nano::block_hash> const & hash)
{
if (account.is_zero ())
{
return;
}
// Unblock only if the dependency is fulfilled
auto existing = blocking.get<tag_account> ().find (account);
if (existing != blocking.get<tag_account> ().end () && (!hash || existing->dependency == *hash))
@ -116,6 +155,8 @@ void nano::bootstrap_ascending::account_sets::unblock (nano::account const & acc
void nano::bootstrap_ascending::account_sets::timestamp_set (const nano::account & account)
{
debug_assert (!account.is_zero ());
auto iter = priorities.get<tag_account> ().find (account);
if (iter != priorities.get<tag_account> ().end ())
{
@ -127,6 +168,8 @@ void nano::bootstrap_ascending::account_sets::timestamp_set (const nano::account
void nano::bootstrap_ascending::account_sets::timestamp_reset (const nano::account & account)
{
debug_assert (!account.is_zero ());
auto iter = priorities.get<tag_account> ().find (account);
if (iter != priorities.get<tag_account> ().end ())
{
@ -136,84 +179,131 @@ void nano::bootstrap_ascending::account_sets::timestamp_reset (const nano::accou
}
}
// Returns false if the account is busy
bool nano::bootstrap_ascending::account_sets::check_timestamp (const nano::account & account) const
void nano::bootstrap_ascending::account_sets::dependency_update (nano::block_hash const & hash, nano::account const & dependency_account)
{
auto iter = priorities.get<tag_account> ().find (account);
if (iter != priorities.get<tag_account> ().end ())
debug_assert (!dependency_account.is_zero ());
auto [it, end] = blocking.get<tag_dependency> ().equal_range (hash);
if (it != end)
{
auto const cutoff = std::chrono::steady_clock::now () - config.cooldown;
if (iter->timestamp > cutoff)
while (it != end)
{
return false;
if (it->dependency_account != dependency_account)
{
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::dependency_update);
blocking.get<tag_dependency> ().modify (it++, [dependency_account] (auto & entry) {
entry.dependency_account = dependency_account;
});
}
else
{
++it;
}
}
}
return true;
else
{
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::dependency_update_failed);
}
}
void nano::bootstrap_ascending::account_sets::trim_overflow ()
{
if (priorities.size () > config.priorities_max)
while (priorities.size () > config.priorities_max)
{
// Evict the lowest priority entry
priorities.get<tag_priority> ().erase (priorities.get<tag_priority> ().begin ());
// Erase the oldest entry
priorities.pop_front ();
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::priority_erase_overflow);
}
if (blocking.size () > config.blocking_max)
while (blocking.size () > config.blocking_max)
{
// Evict the lowest priority entry
blocking.get<tag_priority> ().erase (blocking.get<tag_priority> ().begin ());
// Erase the oldest entry
blocking.pop_front ();
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::blocking_erase_overflow);
}
}
nano::account nano::bootstrap_ascending::account_sets::next ()
nano::account nano::bootstrap_ascending::account_sets::next_priority (std::function<bool (nano::account const &)> const & filter)
{
if (priorities.empty ())
{
return { 0 };
}
std::vector<float> weights;
std::vector<nano::account> candidates;
auto const cutoff = std::chrono::steady_clock::now () - config.cooldown;
int iterations = 0;
while (candidates.size () < config.consideration_count && iterations++ < config.consideration_count * 10)
for (auto const & entry : priorities.get<tag_priority> ())
{
debug_assert (candidates.size () == weights.size ());
// Use a dedicated, uniformly distributed field for sampling to avoid problematic corner case when accounts in the queue are very close together
auto search = nano::bootstrap_ascending::generate_id ();
auto iter = priorities.get<tag_id> ().lower_bound (search);
if (iter == priorities.get<tag_id> ().end ())
if (entry.timestamp > cutoff)
{
iter = priorities.get<tag_id> ().begin ();
continue;
}
if (!filter (entry.account))
{
continue;
}
return entry.account;
}
return { 0 };
}
nano::block_hash nano::bootstrap_ascending::account_sets::next_blocking (std::function<bool (nano::block_hash const &)> const & filter)
{
if (blocking.empty ())
{
return { 0 };
}
// Scan all entries with unknown dependency account
auto [begin, end] = blocking.get<tag_dependency_account> ().equal_range (nano::account{ 0 });
for (auto const & entry : boost::make_iterator_range (begin, end))
{
debug_assert (entry.dependency_account.is_zero ());
if (!filter (entry.dependency))
{
continue;
}
return entry.dependency;
}
return { 0 };
}
void nano::bootstrap_ascending::account_sets::sync_dependencies ()
{
// Sample all accounts with a known dependency account (> account 0)
auto begin = blocking.get<tag_dependency_account> ().upper_bound (nano::account{ 0 });
auto end = blocking.get<tag_dependency_account> ().end ();
for (auto const & entry : boost::make_iterator_range (begin, end))
{
debug_assert (!entry.dependency_account.is_zero ());
if (priorities.size () >= config.priorities_max)
{
break;
}
if (check_timestamp (iter->account))
if (!blocked (entry.dependency_account) && !prioritized (entry.dependency_account))
{
candidates.push_back (iter->account);
weights.push_back (iter->priority);
stats.inc (nano::stat::type::bootstrap_ascending_accounts, nano::stat::detail::sync_dependencies);
priority_set (entry.dependency_account);
}
}
if (candidates.empty ())
{
return { 0 }; // All sampled accounts are busy
}
std::discrete_distribution dist{ weights.begin (), weights.end () };
auto selection = dist (rng);
debug_assert (!weights.empty () && selection < weights.size ());
auto result = candidates[selection];
return result;
trim_overflow ();
}
bool nano::bootstrap_ascending::account_sets::blocked (nano::account const & account) const
{
return blocking.get<tag_account> ().count (account) > 0;
return blocking.get<tag_account> ().contains (account);
}
bool nano::bootstrap_ascending::account_sets::prioritized (nano::account const & account) const
{
return priorities.get<tag_account> ().contains (account);
}
std::size_t nano::bootstrap_ascending::account_sets::priority_size () const
@ -226,18 +316,17 @@ std::size_t nano::bootstrap_ascending::account_sets::blocked_size () const
return blocking.size ();
}
float nano::bootstrap_ascending::account_sets::priority (nano::account const & account) const
double nano::bootstrap_ascending::account_sets::priority (nano::account const & account) const
{
if (blocked (account))
if (!blocked (account))
{
return 0.0f;
auto existing = priorities.get<tag_account> ().find (account);
if (existing != priorities.get<tag_account> ().end ())
{
return existing->priority;
}
}
auto existing = priorities.get<tag_account> ().find (account);
if (existing != priorities.get<tag_account> ().end ())
{
return existing->priority;
}
return account_sets::priority_cutoff;
return 0.0;
}
auto nano::bootstrap_ascending::account_sets::info () const -> nano::bootstrap_ascending::account_sets::info_t
@ -247,8 +336,12 @@ auto nano::bootstrap_ascending::account_sets::info () const -> nano::bootstrap_a
std::unique_ptr<nano::container_info_component> nano::bootstrap_ascending::account_sets::collect_container_info (const std::string & name)
{
// Count blocking entries with their dependency account unknown
auto blocking_unknown = blocking.get<tag_dependency_account> ().count (nano::account{ 0 });
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "priorities", priorities.size (), sizeof (decltype (priorities)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocking", blocking.size (), sizeof (decltype (blocking)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocking_unknown", blocking_unknown, 0 }));
return composite;
}

View file

@ -26,7 +26,7 @@ namespace bootstrap_ascending
class account_sets
{
public:
explicit account_sets (nano::stats &, nano::account_sets_config config = {});
account_sets (account_sets_config const &, nano::stats &);
/**
* If an account is not blocked, increase its priority.
@ -39,38 +39,53 @@ namespace bootstrap_ascending
* Current implementation divides priority by 2.0f and saturates down to 1.0f.
*/
void priority_down (nano::account const & account);
void priority_set (nano::account const & account);
void block (nano::account const & account, nano::block_hash const & dependency);
void unblock (nano::account const & account, std::optional<nano::block_hash> const & hash = std::nullopt);
void timestamp_set (nano::account const & account);
void timestamp_reset (nano::account const & account);
nano::account next ();
/**
* Sets information about the account chain that contains the block hash
*/
void dependency_update (nano::block_hash const & hash, nano::account const & dependency_account);
/**
* Should be called periodically to reinsert missing dependencies into the priority set
*/
void sync_dependencies ();
/**
* Sampling
*/
nano::account next_priority (std::function<bool (nano::account const &)> const & filter);
nano::block_hash next_blocking (std::function<bool (nano::block_hash const &)> const & filter);
public:
bool blocked (nano::account const & account) const;
bool prioritized (nano::account const & account) const;
// Accounts in the ledger but not in priority list are assumed priority 1.0f
// Blocked accounts are assumed priority 0.0f
double priority (nano::account const & account) const;
std::size_t priority_size () const;
std::size_t blocked_size () const;
/**
* Accounts in the ledger but not in priority list are assumed priority 1.0f
* Blocked accounts are assumed priority 0.0f
*/
float priority (nano::account const & account) const;
public: // Container info
std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name);
private: // Dependencies
account_sets_config const & config;
nano::stats & stats;
private:
void trim_overflow ();
bool check_timestamp (nano::account const & account) const;
private: // Dependencies
nano::stats & stats;
private:
struct priority_entry
{
nano::account account;
float priority;
double priority;
id_t id{ generate_id () }; // Uniformly distributed, used for random querying
std::chrono::steady_clock::time_point timestamp{};
@ -78,33 +93,40 @@ namespace bootstrap_ascending
struct blocking_entry
{
nano::account account{ 0 };
nano::block_hash dependency{ 0 };
priority_entry original_entry{ 0, 0 };
priority_entry original_entry;
nano::block_hash dependency;
nano::account dependency_account{ 0 };
float priority () const
id_t id{ generate_id () }; // Uniformly distributed, used for random querying
nano::account account () const
{
return original_entry.account;
}
double priority () const
{
return original_entry.priority;
}
};
// clang-format off
class tag_account {};
class tag_priority {};
class tag_sequenced {};
class tag_account {};
class tag_id {};
class tag_dependency {};
class tag_dependency_account {};
class tag_priority {};
// Tracks the ongoing account priorities
// This only stores account priorities > 1.0f.
using ordered_priorities = boost::multi_index_container<priority_entry,
mi::indexed_by<
mi::sequenced<mi::tag<tag_sequenced>>,
mi::ordered_unique<mi::tag<tag_account>,
mi::member<priority_entry, nano::account, &priority_entry::account>>,
mi::ordered_non_unique<mi::tag<tag_priority>,
mi::member<priority_entry, float, &priority_entry::priority>>,
mi::member<priority_entry, double, &priority_entry::priority>, std::greater<>>, // Descending
mi::ordered_unique<mi::tag<tag_id>,
mi::member<priority_entry, nano::bootstrap_ascending::id_t, &priority_entry::id>>
mi::member<priority_entry, id_t, &priority_entry::id>>
>>;
// A blocked account is an account that has failed to insert a new block because the source block is not currently present in the ledger
@ -113,30 +135,29 @@ namespace bootstrap_ascending
mi::indexed_by<
mi::sequenced<mi::tag<tag_sequenced>>,
mi::ordered_unique<mi::tag<tag_account>,
mi::member<blocking_entry, nano::account, &blocking_entry::account>>,
mi::ordered_non_unique<mi::tag<tag_priority>,
mi::const_mem_fun<blocking_entry, float, &blocking_entry::priority>>
mi::const_mem_fun<blocking_entry, nano::account, &blocking_entry::account>>,
mi::ordered_non_unique<mi::tag<tag_dependency>,
mi::member<blocking_entry, nano::block_hash, &blocking_entry::dependency>>,
mi::ordered_non_unique<mi::tag<tag_dependency_account>,
mi::member<blocking_entry, nano::account, &blocking_entry::dependency_account>>,
mi::ordered_unique<mi::tag<tag_id>,
mi::member<blocking_entry, id_t, &blocking_entry::id>>
>>;
// clang-format on
ordered_priorities priorities;
ordered_blocking blocking;
std::default_random_engine rng;
private:
nano::account_sets_config config;
public: // Consts
static float constexpr priority_initial = 8.0f;
static float constexpr priority_increase = 2.0f;
static float constexpr priority_decrease = 0.5f;
static float constexpr priority_max = 32.0f;
static float constexpr priority_cutoff = 1.0f;
public: // Constants
static double constexpr priority_initial = 2.0;
static double constexpr priority_increase = 2.0;
static double constexpr priority_divide = 2.0;
static double constexpr priority_max = 128.0;
static double constexpr priority_cutoff = 0.15;
public:
using info_t = std::tuple<decltype (blocking), decltype (priorities)>; // <blocking, priorities>
info_t info () const;
};
} // bootstrap_ascending
} // nano
}
}

View file

@ -71,18 +71,25 @@ nano::account nano::bootstrap_ascending::buffered_iterator::operator* () const
return !buffer.empty () ? buffer.front () : nano::account{ 0 };
}
nano::account nano::bootstrap_ascending::buffered_iterator::next ()
nano::account nano::bootstrap_ascending::buffered_iterator::next (std::function<bool (nano::account const &)> const & filter)
{
if (!buffer.empty ())
{
buffer.pop_front ();
}
else
if (buffer.empty ())
{
fill ();
}
return *(*this);
while (!buffer.empty ())
{
auto result = buffer.front ();
buffer.pop_front ();
if (filter (result))
{
return result;
}
}
return { 0 };
}
bool nano::bootstrap_ascending::buffered_iterator::warmup () const

View file

@ -39,8 +39,10 @@ class buffered_iterator
{
public:
explicit buffered_iterator (nano::ledger & ledger);
nano::account operator* () const;
nano::account next ();
nano::account next (std::function<bool (nano::account const &)> const & filter);
// Indicates if a full ledger iteration has taken place e.g. warmed up
bool warmup () const;

View file

@ -6,9 +6,9 @@
* peer_scoring
*/
nano::bootstrap_ascending::peer_scoring::peer_scoring (nano::bootstrap_ascending_config & config, nano::network_constants const & network_constants) :
network_constants{ network_constants },
config{ config }
nano::bootstrap_ascending::peer_scoring::peer_scoring (bootstrap_ascending_config const & config_a, nano::network_constants const & network_constants_a) :
config{ config_a },
network_constants{ network_constants_a }
{
}

View file

@ -24,7 +24,8 @@ namespace bootstrap_ascending
class peer_scoring
{
public:
peer_scoring (nano::bootstrap_ascending_config & config, nano::network_constants const & network_constants);
peer_scoring (bootstrap_ascending_config const &, nano::network_constants const &);
// Returns true if channel limit has been exceeded
bool try_send_message (std::shared_ptr<nano::transport::channel> channel);
void received_message (std::shared_ptr<nano::transport::channel> channel);
@ -35,6 +36,10 @@ namespace bootstrap_ascending
void timeout ();
void sync (std::deque<std::shared_ptr<nano::transport::channel>> const & list);
private:
bootstrap_ascending_config const & config;
nano::network_constants const & network_constants;
private:
class peer_score
{
@ -63,8 +68,6 @@ namespace bootstrap_ascending
uint64_t request_count_total{ 0 };
uint64_t response_count_total{ 0 };
};
nano::network_constants const & network_constants;
nano::bootstrap_ascending_config & config;
// clang-format off
// Indexes scores by their shared channel pointer

View file

@ -1,4 +1,5 @@
#include <nano/lib/blocks.hpp>
#include <nano/lib/enum_util.hpp>
#include <nano/lib/stats_enums.hpp>
#include <nano/lib/thread_roles.hpp>
#include <nano/node/blockprocessor.hpp>
@ -18,18 +19,19 @@ using namespace std::chrono_literals;
* bootstrap_ascending
*/
nano::bootstrap_ascending::service::service (nano::node_config & config_a, nano::block_processor & block_processor_a, nano::ledger & ledger_a, nano::network & network_a, nano::stats & stat_a) :
config{ config_a },
network_consts{ config.network_params.network },
nano::bootstrap_ascending::service::service (nano::node_config const & node_config_a, nano::block_processor & block_processor_a, nano::ledger & ledger_a, nano::network & network_a, nano::stats & stat_a, nano::logger & logger_a) :
config{ node_config_a.bootstrap_ascending },
network_constants{ node_config_a.network_params.network },
block_processor{ block_processor_a },
ledger{ ledger_a },
network{ network_a },
stats{ stat_a },
accounts{ stats },
logger{ logger_a },
accounts{ config.account_sets, stats },
iterator{ ledger },
throttle{ compute_throttle_size () },
scoring{ config.bootstrap_ascending, config.network_params.network },
database_limiter{ config.bootstrap_ascending.database_requests_limit, 1.0 }
scoring{ config, node_config_a.network_params.network },
database_limiter{ config.database_rate_limit, 1.0 }
{
// TODO: This is called from a very congested blockprocessor thread. Offload this work to a dedicated processing thread
block_processor.batch_processed.add ([this] (auto const & batch) {
@ -43,28 +45,55 @@ nano::bootstrap_ascending::service::service (nano::node_config & config_a, nano:
inspect (transaction, result, *context.block);
}
}
condition.notify_all ();
});
accounts.priority_set (node_config_a.network_params.ledger.genesis->account_field ().value ());
}
nano::bootstrap_ascending::service::~service ()
{
// All threads must be stopped before destruction
debug_assert (!thread.joinable ());
debug_assert (!priorities_thread.joinable ());
debug_assert (!database_thread.joinable ());
debug_assert (!dependencies_thread.joinable ());
debug_assert (!timeout_thread.joinable ());
}
void nano::bootstrap_ascending::service::start ()
{
debug_assert (!thread.joinable ());
debug_assert (!priorities_thread.joinable ());
debug_assert (!database_thread.joinable ());
debug_assert (!dependencies_thread.joinable ());
debug_assert (!timeout_thread.joinable ());
thread = std::thread ([this] () {
if (!config.enable)
{
logger.warn (nano::log::type::bootstrap, "Ascending bootstrap is disabled");
return;
}
priorities_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::ascending_bootstrap);
run ();
run_priorities ();
});
if (config.enable_database_scan)
{
database_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::ascending_bootstrap);
run_database ();
});
}
if (config.enable_dependency_walker)
{
dependencies_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::ascending_bootstrap);
run_dependencies ();
});
}
timeout_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::ascending_bootstrap);
run_timeouts ();
@ -78,27 +107,59 @@ void nano::bootstrap_ascending::service::stop ()
stopped = true;
}
condition.notify_all ();
nano::join_or_pass (thread);
nano::join_or_pass (priorities_thread);
nano::join_or_pass (database_thread);
nano::join_or_pass (dependencies_thread);
nano::join_or_pass (timeout_thread);
}
void nano::bootstrap_ascending::service::send (std::shared_ptr<nano::transport::channel> channel, async_tag tag)
void nano::bootstrap_ascending::service::send (std::shared_ptr<nano::transport::channel> const & channel, async_tag tag)
{
debug_assert (tag.type == async_tag::query_type::blocks_by_hash || tag.type == async_tag::query_type::blocks_by_account);
debug_assert (tag.type != query_type::invalid);
debug_assert (tag.source != query_source::invalid);
nano::asc_pull_req request{ network_consts };
{
nano::lock_guard<nano::mutex> lock{ mutex };
debug_assert (tags.get<tag_id> ().count (tag.id) == 0);
tags.get<tag_id> ().insert (tag);
}
nano::asc_pull_req request{ network_constants };
request.id = tag.id;
request.type = nano::asc_pull_type::blocks;
nano::asc_pull_req::blocks_payload request_payload;
request_payload.start = tag.start;
request_payload.count = config.bootstrap_ascending.pull_count;
request_payload.start_type = (tag.type == async_tag::query_type::blocks_by_hash) ? nano::asc_pull_req::hash_type::block : nano::asc_pull_req::hash_type::account;
switch (tag.type)
{
case query_type::blocks_by_hash:
case query_type::blocks_by_account:
{
request.type = nano::asc_pull_type::blocks;
nano::asc_pull_req::blocks_payload pld;
pld.start = tag.start;
pld.count = tag.count;
pld.start_type = tag.type == query_type::blocks_by_hash ? nano::asc_pull_req::hash_type::block : nano::asc_pull_req::hash_type::account;
request.payload = pld;
}
break;
case query_type::account_info_by_hash:
{
request.type = nano::asc_pull_type::account_info;
nano::asc_pull_req::account_info_payload pld;
pld.target_type = nano::asc_pull_req::hash_type::block; // Query account info by block hash
pld.target = tag.start;
request.payload = pld;
}
break;
default:
debug_assert (false);
}
request.payload = request_payload;
request.update_header ();
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::request, nano::stat::dir::out);
stats.inc (nano::stat::type::bootstrap_ascending_request, to_stat_detail (tag.type));
// TODO: There is no feedback mechanism if bandwidth limiter starts dropping our requests
channel->send (
@ -130,6 +191,8 @@ std::size_t nano::bootstrap_ascending::service::score_size () const
*/
void nano::bootstrap_ascending::service::inspect (secure::transaction const & tx, nano::block_status const & result, nano::block const & block)
{
debug_assert (!mutex.try_lock ());
auto const hash = block.hash ();
switch (result)
@ -141,13 +204,12 @@ void nano::bootstrap_ascending::service::inspect (secure::transaction const & tx
// If we've inserted any block in to an account, unmark it as blocked
accounts.unblock (account);
accounts.priority_up (account);
accounts.timestamp_reset (account);
if (block.is_send ())
{
auto destination = block.destination ();
accounts.unblock (destination, hash); // Unblocking automatically inserts account into priority set
accounts.priority_up (destination);
accounts.priority_set (destination);
}
}
break;
@ -158,18 +220,15 @@ void nano::bootstrap_ascending::service::inspect (secure::transaction const & tx
// Mark account as blocked because it is missing the source block
accounts.block (account, source);
// TODO: Track stats
}
break;
case nano::block_status::old:
{
// TODO: Track stats
}
break;
case nano::block_status::gap_previous:
{
// TODO: Track stats
if (block.type () == block_type::state)
{
const auto account = block.account_field ().value ();
accounts.priority_set (account);
}
}
break;
default: // No need to handle other cases
@ -177,140 +236,336 @@ void nano::bootstrap_ascending::service::inspect (secure::transaction const & tx
}
}
void nano::bootstrap_ascending::service::wait_blockprocessor ()
void nano::bootstrap_ascending::service::wait (std::function<bool ()> const & predicate) const
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped && block_processor.size (nano::block_source::bootstrap) > config.bootstrap_ascending.block_wait_count)
std::unique_lock<nano::mutex> lock{ mutex };
std::chrono::milliseconds interval = 5ms;
while (!stopped && !predicate ())
{
condition.wait_for (lock, std::chrono::milliseconds{ config.bootstrap_ascending.throttle_wait }, [this] () { return stopped; }); // Blockprocessor is relatively slow, sleeping here instead of using conditions
condition.wait_for (lock, interval);
interval = std::min (interval * 2, config.throttle_wait);
}
}
std::shared_ptr<nano::transport::channel> nano::bootstrap_ascending::service::wait_available_channel ()
void nano::bootstrap_ascending::service::wait_tags ()
{
wait ([this] () {
debug_assert (!mutex.try_lock ());
return tags.size () < config.max_requests;
});
}
void nano::bootstrap_ascending::service::wait_blockprocessor ()
{
wait ([this] () {
return block_processor.size (nano::block_source::bootstrap) < config.block_wait_count;
});
}
std::shared_ptr<nano::transport::channel> nano::bootstrap_ascending::service::wait_channel ()
{
std::shared_ptr<nano::transport::channel> channel;
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped && !(channel = scoring.channel ()))
{
condition.wait_for (lock, std::chrono::milliseconds{ config.bootstrap_ascending.throttle_wait }, [this] () { return stopped; });
}
wait ([this, &channel] () {
debug_assert (!mutex.try_lock ());
channel = scoring.channel ();
return channel != nullptr; // Wait until a channel is available
});
return channel;
}
nano::account nano::bootstrap_ascending::service::available_account ()
size_t nano::bootstrap_ascending::service::count_tags (nano::account const & account, query_source source) const
{
{
auto account = accounts.next ();
if (!account.is_zero ())
{
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::next_priority);
return account;
}
}
if (database_limiter.should_pass (1))
{
auto account = iterator.next ();
if (!account.is_zero ())
{
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::next_database);
return account;
}
}
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::next_none);
return { 0 };
debug_assert (!mutex.try_lock ());
auto [begin, end] = tags.get<tag_account> ().equal_range (account);
return std::count_if (begin, end, [source] (auto const & tag) { return tag.source == source; });
}
nano::account nano::bootstrap_ascending::service::wait_available_account ()
size_t nano::bootstrap_ascending::service::count_tags (nano::block_hash const & hash, query_source source) const
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
auto account = available_account ();
if (!account.is_zero ())
{
accounts.timestamp_set (account);
return account;
}
else
{
condition.wait_for (lock, 100ms);
}
}
return { 0 };
debug_assert (!mutex.try_lock ());
auto [begin, end] = tags.get<tag_hash> ().equal_range (hash);
return std::count_if (begin, end, [source] (auto const & tag) { return tag.source == source; });
}
bool nano::bootstrap_ascending::service::request (nano::account & account, std::shared_ptr<nano::transport::channel> & channel)
std::pair<nano::account, double> nano::bootstrap_ascending::service::next_priority ()
{
debug_assert (!mutex.try_lock ());
auto account = accounts.next_priority ([this] (nano::account const & account) {
return count_tags (account, query_source::priority) < 4;
});
if (account.is_zero ())
{
return {};
}
stats.inc (nano::stat::type::bootstrap_ascending_next, nano::stat::detail::next_priority);
accounts.timestamp_set (account);
// TODO: Priority could be returned by the accounts.next_priority() call
return { account, accounts.priority (account) };
}
std::pair<nano::account, double> nano::bootstrap_ascending::service::wait_priority ()
{
std::pair<nano::account, double> result{ 0, 0 };
wait ([this, &result] () {
debug_assert (!mutex.try_lock ());
result = next_priority ();
if (!result.first.is_zero ())
{
return true;
}
return false;
});
return result;
}
nano::account nano::bootstrap_ascending::service::next_database (bool should_throttle)
{
debug_assert (!mutex.try_lock ());
// Throttling increases the weight of database requests
// TODO: Make this ratio configurable
if (!database_limiter.should_pass (should_throttle ? 22 : 1))
{
return { 0 };
}
auto account = iterator.next ([this] (nano::account const & account) {
return count_tags (account, query_source::database) == 0;
});
if (account.is_zero ())
{
return { 0 };
}
stats.inc (nano::stat::type::bootstrap_ascending_next, nano::stat::detail::next_database);
return account;
}
nano::account nano::bootstrap_ascending::service::wait_database (bool should_throttle)
{
nano::account result{ 0 };
wait ([this, &result, should_throttle] () {
debug_assert (!mutex.try_lock ());
result = next_database (should_throttle);
if (!result.is_zero ())
{
return true;
}
return false;
});
return result;
}
nano::block_hash nano::bootstrap_ascending::service::next_blocking ()
{
debug_assert (!mutex.try_lock ());
auto blocking = accounts.next_blocking ([this] (nano::block_hash const & hash) {
return count_tags (hash, query_source::blocking) == 0;
});
if (blocking.is_zero ())
{
return { 0 };
}
stats.inc (nano::stat::type::bootstrap_ascending_next, nano::stat::detail::next_blocking);
return blocking;
}
nano::block_hash nano::bootstrap_ascending::service::wait_blocking ()
{
nano::block_hash result{ 0 };
wait ([this, &result] () {
debug_assert (!mutex.try_lock ());
result = next_blocking ();
if (!result.is_zero ())
{
return true;
}
return false;
});
return result;
}
bool nano::bootstrap_ascending::service::request (nano::account account, size_t count, std::shared_ptr<nano::transport::channel> const & channel, query_source source)
{
debug_assert (count > 0);
debug_assert (count <= nano::bootstrap_server::max_blocks);
async_tag tag{};
tag.id = nano::bootstrap_ascending::generate_id ();
tag.source = source;
tag.account = account;
tag.count = count;
// Check if the account picked has blocks, if it does, start the pull from the highest block
auto info = ledger.store.account.get (ledger.store.tx_begin_read (), account);
if (info)
{
tag.type = async_tag::query_type::blocks_by_hash;
tag.type = query_type::blocks_by_hash;
tag.start = info->head;
tag.hash = info->head;
}
else
{
tag.type = async_tag::query_type::blocks_by_account;
tag.type = query_type::blocks_by_account;
tag.start = account;
}
on_request.notify (tag, channel);
track (tag);
send (channel, tag);
return true; // Request sent
}
bool nano::bootstrap_ascending::service::run_one ()
bool nano::bootstrap_ascending::service::request_info (nano::block_hash hash, std::shared_ptr<nano::transport::channel> const & channel, query_source source)
{
// Ensure there is enough space in blockprocessor for queuing new blocks
async_tag tag{};
tag.type = query_type::account_info_by_hash;
tag.source = source;
tag.start = hash;
tag.hash = hash;
on_request.notify (tag, channel);
send (channel, tag);
return true; // Request sent
}
void nano::bootstrap_ascending::service::run_one_priority ()
{
wait_tags ();
wait_blockprocessor ();
// Waits for account either from priority queue or database
auto account = wait_available_account ();
if (account.is_zero ())
{
return false;
}
// Waits for channel that is not full
auto channel = wait_available_channel ();
auto channel = wait_channel ();
if (!channel)
{
return false;
return;
}
bool success = request (account, channel);
return success;
}
void nano::bootstrap_ascending::service::throttle_if_needed (nano::unique_lock<nano::mutex> & lock)
{
debug_assert (lock.owns_lock ());
if (!iterator.warmup () && throttle.throttled ())
auto [account, priority] = wait_priority ();
if (account.is_zero ())
{
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::throttled);
condition.wait_for (lock, std::chrono::milliseconds{ config.bootstrap_ascending.throttle_wait }, [this] () { return stopped; });
return;
}
size_t const min_pull_count = 2;
auto count = std::clamp (static_cast<size_t> (priority), min_pull_count, nano::bootstrap_server::max_blocks);
request (account, count, channel, query_source::priority);
}
void nano::bootstrap_ascending::service::run ()
void nano::bootstrap_ascending::service::run_priorities ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
lock.unlock ();
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::loop);
run_one ();
run_one_priority ();
lock.lock ();
throttle_if_needed (lock);
}
}
void nano::bootstrap_ascending::service::run_one_database (bool should_throttle)
{
wait_tags ();
wait_blockprocessor ();
auto channel = wait_channel ();
if (!channel)
{
return;
}
auto account = wait_database (should_throttle);
if (account.is_zero ())
{
return;
}
request (account, 2, channel, query_source::database);
}
void nano::bootstrap_ascending::service::run_database ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
// Avoid high churn rate of database requests
bool should_throttle = !iterator.warmup () && throttle.throttled ();
lock.unlock ();
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::loop_database);
run_one_database (should_throttle);
lock.lock ();
}
}
void nano::bootstrap_ascending::service::run_one_blocking ()
{
wait_tags ();
wait_blockprocessor ();
auto channel = wait_channel ();
if (!channel)
{
return;
}
auto blocking = wait_blocking ();
if (blocking.is_zero ())
{
return;
}
request_info (blocking, channel, query_source::blocking);
}
void nano::bootstrap_ascending::service::run_dependencies ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
lock.unlock ();
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::loop_dependencies);
run_one_blocking ();
lock.lock ();
}
}
void nano::bootstrap_ascending::service::cleanup_and_sync ()
{
debug_assert (!mutex.try_lock ());
scoring.sync (network.list ());
scoring.timeout ();
throttle.resize (compute_throttle_size ());
auto const cutoff = std::chrono::steady_clock::now () - config.request_timeout;
auto should_timeout = [cutoff] (async_tag const & tag) {
return tag.timestamp < cutoff;
};
auto & tags_by_order = tags.get<tag_sequenced> ();
while (!tags_by_order.empty () && should_timeout (tags_by_order.front ()))
{
auto tag = tags_by_order.front ();
tags_by_order.pop_front ();
on_timeout.notify (tag);
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::timeout);
}
if (sync_dependencies_interval.elapsed (60s))
{
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::sync_dependencies);
accounts.sync_dependencies ();
}
}
@ -319,110 +574,177 @@ void nano::bootstrap_ascending::service::run_timeouts ()
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
scoring.sync (network.list ());
scoring.timeout ();
throttle.resize (compute_throttle_size ());
auto const cutoff = std::chrono::steady_clock::now () - config.bootstrap_ascending.request_timeout;
auto should_timeout = [cutoff] (async_tag const & tag) {
return tag.timestamp < cutoff;
};
auto & tags_by_order = tags.get<tag_sequenced> ();
while (!tags_by_order.empty () && should_timeout (tags_by_order.front ()))
{
auto tag = tags_by_order.front ();
tags_by_order.pop_front ();
on_timeout.notify (tag);
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::timeout);
}
condition.wait_for (lock, 1s, [this] () { return stopped; });
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::loop_cleanup);
cleanup_and_sync ();
condition.wait_for (lock, 5s, [this] () { return stopped; });
}
}
void nano::bootstrap_ascending::service::process (nano::asc_pull_ack const & message, std::shared_ptr<nano::transport::channel> channel)
void nano::bootstrap_ascending::service::process (nano::asc_pull_ack const & message, std::shared_ptr<nano::transport::channel> const & channel)
{
nano::unique_lock<nano::mutex> lock{ mutex };
// Only process messages that have a known tag
auto & tags_by_id = tags.get<tag_id> ();
if (tags_by_id.count (message.id) > 0)
{
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::reply);
auto iterator = tags_by_id.find (message.id);
auto tag = *iterator;
tags_by_id.erase (iterator);
// Track bootstrap request response time
stats.sample (nano::stat::sample::bootstrap_tag_duration, nano::log::milliseconds_delta (tag.timestamp), { 0, config.bootstrap_ascending.request_timeout.count () });
scoring.received_message (channel);
lock.unlock ();
on_reply.notify (tag);
condition.notify_all ();
std::visit ([this, &tag] (auto && request) { return process (request, tag); }, message.payload);
}
else
auto it = tags.get<tag_id> ().find (message.id);
if (it == tags.get<tag_id> ().end ())
{
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::missing_tag);
return;
}
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::reply);
auto tag = *it;
tags.get<tag_id> ().erase (it); // Iterator is invalid after this point
// Verifies that response type corresponds to our query
struct payload_verifier
{
query_type type;
bool operator() (const nano::asc_pull_ack::blocks_payload & response) const
{
return type == query_type::blocks_by_hash || type == query_type::blocks_by_account;
}
bool operator() (const nano::asc_pull_ack::account_info_payload & response) const
{
return type == query_type::account_info_by_hash;
}
bool operator() (const nano::asc_pull_ack::frontiers_payload & response) const
{
return false; // TODO: Handle frontiers
}
bool operator() (const nano::empty_payload & response) const
{
return false; // Should not happen
}
};
bool valid = std::visit (payload_verifier{ tag.type }, message.payload);
if (!valid)
{
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::invalid_response_type);
return;
}
// Track bootstrap request response time
stats.inc (nano::stat::type::bootstrap_ascending_reply, to_stat_detail (tag.type));
stats.sample (nano::stat::sample::bootstrap_tag_duration, nano::log::milliseconds_delta (tag.timestamp), { 0, config.request_timeout.count () });
scoring.received_message (channel);
lock.unlock ();
on_reply.notify (tag);
// Process the response payload
std::visit ([this, &tag] (auto && request) { return process (request, tag); }, message.payload);
condition.notify_all ();
}
void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::blocks_payload & response, const nano::bootstrap_ascending::service::async_tag & tag)
void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::blocks_payload & response, const async_tag & tag)
{
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::process);
debug_assert (tag.type == query_type::blocks_by_hash || tag.type == query_type::blocks_by_account);
stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::blocks);
auto result = verify (response, tag);
switch (result)
{
case verify_result::ok:
{
stats.inc (nano::stat::type::bootstrap_ascending_verify, nano::stat::detail::ok);
stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::blocks, nano::stat::dir::in, response.blocks.size ());
for (auto & block : response.blocks)
auto blocks = response.blocks;
// Avoid re-processing the block we already have
release_assert (blocks.size () >= 1);
if (blocks.front ()->hash () == tag.start.as_block_hash ())
{
block_processor.add (block, nano::block_source::bootstrap);
blocks.pop_front ();
}
for (auto const & block : blocks)
{
if (block == blocks.back ())
{
// It's the last block submitted for this account chanin, reset timestamp to allow more requests
block_processor.add (block, nano::block_source::bootstrap, nullptr, [this, account = tag.account] (auto result) {
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::timestamp_reset);
{
nano::lock_guard<nano::mutex> guard{ mutex };
accounts.timestamp_reset (account);
}
condition.notify_all ();
});
}
else
{
block_processor.add (block, nano::block_source::bootstrap);
}
}
if (tag.source == query_source::database)
{
nano::lock_guard<nano::mutex> lock{ mutex };
throttle.add (true);
}
nano::lock_guard<nano::mutex> lock{ mutex };
throttle.add (true);
}
break;
case verify_result::nothing_new:
{
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::nothing_new);
stats.inc (nano::stat::type::bootstrap_ascending_verify, nano::stat::detail::nothing_new);
nano::lock_guard<nano::mutex> lock{ mutex };
accounts.priority_down (tag.account);
throttle.add (false);
if (tag.source == query_source::database)
{
throttle.add (false);
}
}
break;
case verify_result::invalid:
{
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::invalid);
// TODO: Log
stats.inc (nano::stat::type::bootstrap_ascending_verify, nano::stat::detail::invalid);
}
break;
}
}
void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::account_info_payload & response, const nano::bootstrap_ascending::service::async_tag & tag)
void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::account_info_payload & response, const async_tag & tag)
{
// TODO: Make use of account info
debug_assert (tag.type == query_type::account_info_by_hash);
debug_assert (!tag.hash.is_zero ());
if (response.account.is_zero ())
{
stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::account_info_empty);
}
else
{
stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::account_info);
// Prioritize account containing the dependency
{
nano::lock_guard<nano::mutex> lock{ mutex };
accounts.dependency_update (tag.hash, response.account);
accounts.priority_set (response.account);
}
}
}
void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::frontiers_payload & response, const nano::bootstrap_ascending::service::async_tag & tag)
void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::frontiers_payload & response, const async_tag & tag)
{
// TODO: Make use of frontiers info
stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::frontiers);
}
void nano::bootstrap_ascending::service::process (const nano::empty_payload & response, const nano::bootstrap_ascending::service::async_tag & tag)
void nano::bootstrap_ascending::service::process (const nano::empty_payload & response, const async_tag & tag)
{
// Should not happen
debug_assert (false, "empty payload");
stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::empty);
debug_assert (false, "empty payload"); // Should not happen
}
nano::bootstrap_ascending::service::verify_result nano::bootstrap_ascending::service::verify (const nano::asc_pull_ack::blocks_payload & response, const nano::bootstrap_ascending::service::async_tag & tag) const
@ -437,11 +759,15 @@ nano::bootstrap_ascending::service::verify_result nano::bootstrap_ascending::ser
{
return verify_result::nothing_new;
}
if (blocks.size () > tag.count)
{
return verify_result::invalid;
}
auto const & first = blocks.front ();
switch (tag.type)
{
case async_tag::query_type::blocks_by_hash:
case query_type::blocks_by_hash:
{
if (first->hash () != tag.start.as_block_hash ())
{
@ -450,7 +776,7 @@ nano::bootstrap_ascending::service::verify_result nano::bootstrap_ascending::ser
}
}
break;
case async_tag::query_type::blocks_by_account:
case query_type::blocks_by_account:
{
// Open & state blocks always contain account field
if (first->account_field () != tag.start.as_account ())
@ -480,15 +806,6 @@ nano::bootstrap_ascending::service::verify_result nano::bootstrap_ascending::ser
return verify_result::ok;
}
void nano::bootstrap_ascending::service::track (async_tag const & tag)
{
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::track);
nano::lock_guard<nano::mutex> lock{ mutex };
debug_assert (tags.get<tag_id> ().count (tag.id) == 0);
tags.get<tag_id> ().insert (tag);
}
auto nano::bootstrap_ascending::service::info () const -> nano::bootstrap_ascending::account_sets::info_t
{
nano::lock_guard<nano::mutex> lock{ mutex };
@ -497,10 +814,10 @@ auto nano::bootstrap_ascending::service::info () const -> nano::bootstrap_ascend
std::size_t nano::bootstrap_ascending::service::compute_throttle_size () const
{
// Scales logarithmically with ledger block
// Returns: config.throttle_coefficient * sqrt(block_count)
std::size_t size_new = config.bootstrap_ascending.throttle_coefficient * std::sqrt (ledger.block_count ());
return size_new == 0 ? 16 : size_new;
auto ledger_size = ledger.account_count ();
size_t target = ledger_size > 0 ? config.throttle_coefficient * static_cast<size_t> (std::log (ledger_size)) : 0;
size_t min_size = 16;
return std::max (target, min_size);
}
std::unique_ptr<nano::container_info_component> nano::bootstrap_ascending::service::collect_container_info (std::string const & name)
@ -514,3 +831,12 @@ std::unique_ptr<nano::container_info_component> nano::bootstrap_ascending::servi
composite->add_component (accounts.collect_container_info ("accounts"));
return composite;
}
/*
*
*/
nano::stat::detail nano::bootstrap_ascending::to_stat_detail (nano::bootstrap_ascending::service::query_type type)
{
return nano::enum_util::cast<nano::stat::detail> (type);
}

View file

@ -1,5 +1,6 @@
#pragma once
#include <nano/lib/interval.hpp>
#include <nano/lib/locks.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/lib/observer_set.hpp>
@ -43,7 +44,7 @@ namespace bootstrap_ascending
class service
{
public:
service (nano::node_config &, nano::block_processor &, nano::ledger &, nano::network &, nano::stats &);
service (nano::node_config const &, nano::block_processor &, nano::ledger &, nano::network &, nano::stats &, nano::logger &);
~service ();
void start ();
@ -52,43 +53,56 @@ namespace bootstrap_ascending
/**
* Process `asc_pull_ack` message coming from network
*/
void process (nano::asc_pull_ack const & message, std::shared_ptr<nano::transport::channel> channel);
void process (nano::asc_pull_ack const & message, std::shared_ptr<nano::transport::channel> const &);
public: // Container info
std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name);
std::size_t blocked_size () const;
std::size_t priority_size () const;
std::size_t score_size () const;
nano::bootstrap_ascending::account_sets::info_t info () const;
private: // Dependencies
nano::node_config & config;
nano::network_constants & network_consts;
bootstrap_ascending_config const & config;
nano::network_constants const & network_constants;
nano::block_processor & block_processor;
nano::ledger & ledger;
nano::network & network;
nano::stats & stats;
nano::logger & logger;
public: // Tag
enum class query_type
{
invalid = 0, // Default initialization
blocks_by_hash,
blocks_by_account,
account_info_by_hash,
};
enum class query_source
{
invalid,
priority,
database,
blocking,
};
public: // async_tag
struct async_tag
{
enum class query_type
{
invalid = 0, // Default initialization
blocks_by_hash,
blocks_by_account,
// TODO: account_info,
};
query_type type{ query_type::invalid };
nano::bootstrap_ascending::id_t id{ 0 };
query_source source{ query_source::invalid };
nano::hash_or_account start{ 0 };
nano::account account{ 0 };
nano::block_hash hash{ 0 };
size_t count{ 0 };
id_t id{ generate_id () };
std::chrono::steady_clock::time_point timestamp{ std::chrono::steady_clock::now () };
};
public: // Events
nano::observer_set<async_tag const &, std::shared_ptr<nano::transport::channel> &> on_request;
nano::observer_set<async_tag const &, std::shared_ptr<nano::transport::channel> const &> on_request;
nano::observer_set<async_tag const &> on_reply;
nano::observer_set<async_tag const &> on_timeout;
@ -96,22 +110,37 @@ namespace bootstrap_ascending
/* Inspects a block that has been processed by the block processor */
void inspect (secure::transaction const &, nano::block_status const & result, nano::block const & block);
void throttle_if_needed (nano::unique_lock<nano::mutex> & lock);
void run ();
bool run_one ();
void run_priorities ();
void run_one_priority ();
void run_database ();
void run_one_database (bool should_throttle);
void run_dependencies ();
void run_one_blocking ();
void run_timeouts ();
void cleanup_and_sync ();
/* Throttles requesting new blocks, not to overwhelm blockprocessor */
/* Waits for a condition to be satisfied with incremental backoff */
void wait (std::function<bool ()> const & predicate) const;
/* Avoid too many in-flight requests */
void wait_tags ();
/* Ensure there is enough space in blockprocessor for queuing new blocks */
void wait_blockprocessor ();
/* Waits for channel with free capacity for bootstrap messages */
std::shared_ptr<nano::transport::channel> wait_available_channel ();
/* Waits for a channel that is not full */
std::shared_ptr<nano::transport::channel> wait_channel ();
/* Waits until a suitable account outside of cool down period is available */
nano::account available_account ();
nano::account wait_available_account ();
std::pair<nano::account, double> next_priority ();
std::pair<nano::account, double> wait_priority ();
/* Gets the next account from the database */
nano::account next_database (bool should_throttle);
nano::account wait_database (bool should_throttle);
/* Waits for next available blocking block */
nano::block_hash next_blocking ();
nano::block_hash wait_blocking ();
bool request (nano::account &, std::shared_ptr<nano::transport::channel> &);
void send (std::shared_ptr<nano::transport::channel>, async_tag tag);
void track (async_tag const & tag);
bool request (nano::account, size_t count, std::shared_ptr<nano::transport::channel> const &, query_source);
bool request_info (nano::block_hash, std::shared_ptr<nano::transport::channel> const &, query_source);
void send (std::shared_ptr<nano::transport::channel> const &, async_tag tag);
void process (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag);
void process (nano::asc_pull_ack::account_info_payload const & response, async_tag const & tag);
@ -133,20 +162,23 @@ namespace bootstrap_ascending
*/
verify_result verify (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag) const;
public: // account_sets
nano::bootstrap_ascending::account_sets::info_t info () const;
size_t count_tags (nano::account const & account, query_source source) const;
size_t count_tags (nano::block_hash const & hash, query_source source) const;
// Calculates a lookback size based on the size of the ledger where larger ledgers have a larger sample count
std::size_t compute_throttle_size () const;
private:
nano::bootstrap_ascending::account_sets accounts;
nano::bootstrap_ascending::buffered_iterator iterator;
nano::bootstrap_ascending::throttle throttle;
// Calculates a lookback size based on the size of the ledger where larger ledgers have a larger sample count
std::size_t compute_throttle_size () const;
nano::bootstrap_ascending::peer_scoring scoring;
// clang-format off
class tag_sequenced {};
class tag_id {};
class tag_account {};
class tag_hash {};
using ordered_tags = boost::multi_index_container<async_tag,
mi::indexed_by<
@ -154,21 +186,28 @@ namespace bootstrap_ascending
mi::hashed_unique<mi::tag<tag_id>,
mi::member<async_tag, nano::bootstrap_ascending::id_t, &async_tag::id>>,
mi::hashed_non_unique<mi::tag<tag_account>,
mi::member<async_tag, nano::account , &async_tag::account>>
mi::member<async_tag, nano::account , &async_tag::account>>,
mi::hashed_non_unique<mi::tag<tag_hash>,
mi::member<async_tag, nano::block_hash, &async_tag::hash>>
>>;
// clang-format on
ordered_tags tags;
nano::bootstrap_ascending::peer_scoring scoring;
// Requests for accounts from database have much lower hitrate and could introduce strain on the network
// A separate (lower) limiter ensures that we always reserve resources for querying accounts from priority queue
nano::bandwidth_limiter database_limiter;
nano::interval sync_dependencies_interval;
bool stopped{ false };
mutable nano::mutex mutex;
mutable nano::condition_variable condition;
std::thread thread;
std::thread priorities_thread;
std::thread database_thread;
std::thread dependencies_thread;
std::thread timeout_thread;
};
nano::stat::detail to_stat_detail (service::query_type);
}
}

View file

@ -5279,7 +5279,7 @@ void nano::json_handler::debug_bootstrap_priority_info ()
boost::property_tree::ptree response_blocking;
for (auto const & entry : blocking)
{
const auto account = entry.account;
const auto account = entry.account ();
const auto dependency = entry.dependency;
response_blocking.put (account.to_account (), dependency.to_string ());

View file

@ -681,7 +681,7 @@ public: // Payload definitions
void deserialize (nano::stream &);
public: // Payload
std::vector<std::shared_ptr<nano::block>> blocks;
std::deque<std::shared_ptr<nano::block>> blocks;
public: // Logging
void operator() (nano::object_stream &) const;

View file

@ -217,7 +217,7 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
aggregator{ *aggregator_impl },
wallets (wallets_store.init_error (), *this),
backlog{ nano::backlog_population_config (config), scheduler, ledger, stats },
ascendboot_impl{ std::make_unique<nano::bootstrap_ascending::service> (config, block_processor, ledger, network, stats) },
ascendboot_impl{ std::make_unique<nano::bootstrap_ascending::service> (config, block_processor, ledger, network, stats, logger) },
ascendboot{ *ascendboot_impl },
websocket{ config.websocket_config, observers, wallets, ledger, io_ctx, logger },
epoch_upgrader{ *this, ledger, store, network_params, logger },