Merge pull request #4784 from pwojcikdev/bootstrap-tuning-3

Bootstrap tuning
This commit is contained in:
Piotr Wójcik 2024-11-23 00:35:16 +01:00 committed by GitHub
commit 22abd717a4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 221 additions and 169 deletions

View file

@ -27,11 +27,13 @@ nano::block_hash random_hash ()
}
}
/*
* account_sets
*/
TEST (account_sets, construction)
{
nano::test::system system;
auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants);
ASSERT_FALSE (store->init_error ());
nano::account_sets_config config;
nano::bootstrap::account_sets sets{ config, system.stats };
}
@ -41,8 +43,6 @@ TEST (account_sets, empty_blocked)
nano::test::system system;
nano::account account{ 1 };
auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants);
ASSERT_FALSE (store->init_error ());
nano::account_sets_config config;
nano::bootstrap::account_sets sets{ config, system.stats };
ASSERT_FALSE (sets.blocked (account));
@ -53,10 +53,9 @@ TEST (account_sets, block)
nano::test::system system;
nano::account account{ 1 };
auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants);
ASSERT_FALSE (store->init_error ());
nano::account_sets_config config;
nano::bootstrap::account_sets sets{ config, system.stats };
sets.priority_up (account);
sets.block (account, random_hash ());
ASSERT_TRUE (sets.blocked (account));
}
@ -66,12 +65,12 @@ TEST (account_sets, unblock)
nano::test::system system;
nano::account account{ 1 };
auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants);
ASSERT_FALSE (store->init_error ());
nano::account_sets_config config;
nano::bootstrap::account_sets sets{ config, system.stats };
auto hash = random_hash ();
sets.priority_up (account);
sets.block (account, hash);
ASSERT_TRUE (sets.blocked (account));
sets.unblock (account, hash);
ASSERT_FALSE (sets.blocked (account));
}
@ -81,8 +80,6 @@ TEST (account_sets, priority_base)
nano::test::system system;
nano::account account{ 1 };
auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants);
ASSERT_FALSE (store->init_error ());
nano::account_sets_config config;
nano::bootstrap::account_sets sets{ config, system.stats };
ASSERT_EQ (0.0, sets.priority (account));
@ -93,32 +90,26 @@ TEST (account_sets, priority_blocked)
nano::test::system system;
nano::account account{ 1 };
auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants);
ASSERT_FALSE (store->init_error ());
nano::account_sets_config config;
nano::bootstrap::account_sets sets{ config, system.stats };
sets.block (account, random_hash ());
ASSERT_EQ (0.0, sets.priority (account));
}
// When account is unblocked, check that it retains it former priority
TEST (account_sets, priority_unblock_keep)
TEST (account_sets, priority_unblock)
{
nano::test::system system;
nano::account account{ 1 };
auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants);
ASSERT_FALSE (store->init_error ());
nano::account_sets_config config;
nano::bootstrap::account_sets sets{ config, system.stats };
sets.priority_up (account);
sets.priority_up (account);
ASSERT_EQ (sets.priority (account), nano::bootstrap::account_sets::priority_initial + nano::bootstrap::account_sets::priority_increase);
ASSERT_EQ (sets.priority (account), nano::bootstrap::account_sets::priority_initial);
auto hash = random_hash ();
sets.block (account, hash);
ASSERT_EQ (0.0, sets.priority (account));
sets.unblock (account, hash);
ASSERT_EQ (sets.priority (account), nano::bootstrap::account_sets::priority_initial + nano::bootstrap::account_sets::priority_increase);
ASSERT_EQ (sets.priority (account), nano::bootstrap::account_sets::priority_initial);
}
TEST (account_sets, priority_up_down)
@ -126,37 +117,58 @@ TEST (account_sets, priority_up_down)
nano::test::system system;
nano::account account{ 1 };
auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants);
ASSERT_FALSE (store->init_error ());
nano::account_sets_config config;
nano::bootstrap::account_sets sets{ config, system.stats };
sets.priority_up (account);
ASSERT_EQ (sets.priority (account), nano::bootstrap::account_sets::priority_initial);
sets.priority_down (account);
ASSERT_EQ (sets.priority (account), nano::bootstrap::account_sets::priority_initial / nano::bootstrap::account_sets::priority_divide);
ASSERT_EQ (sets.priority (account), nano::bootstrap::account_sets::priority_initial);
}
TEST (account_sets, priority_down_sat)
TEST (account_sets, priority_down_empty)
{
nano::test::system system;
nano::account account{ 1 };
auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants);
ASSERT_FALSE (store->init_error ());
nano::account_sets_config config;
nano::bootstrap::account_sets sets{ config, system.stats };
sets.priority_down (account);
ASSERT_EQ (0.0, sets.priority (account));
}
TEST (account_sets, priority_down_saturate)
{
nano::test::system system;
nano::account account{ 1 };
nano::account_sets_config config;
nano::bootstrap::account_sets sets{ config, system.stats };
sets.priority_up (account);
ASSERT_EQ (sets.priority (account), nano::bootstrap::account_sets::priority_initial);
for (int n = 0; n < 1000; ++n)
{
sets.priority_down (account);
}
ASSERT_FALSE (sets.prioritized (account));
}
TEST (account_sets, priority_set)
{
nano::test::system system;
nano::account account{ 1 };
nano::account_sets_config config;
nano::bootstrap::account_sets sets{ config, system.stats };
sets.priority_set (account, 10.0);
ASSERT_EQ (sets.priority (account), 10.0);
}
// Ensure priority value is bounded
TEST (account_sets, saturate_priority)
{
nano::test::system system;
nano::account account{ 1 };
auto store = nano::make_store (system.logger, nano::unique_path (), nano::dev::constants);
ASSERT_FALSE (store->init_error ());
nano::account_sets_config config;
nano::bootstrap::account_sets sets{ config, system.stats };
for (int n = 0; n < 1000; ++n)
@ -166,6 +178,10 @@ TEST (account_sets, saturate_priority)
ASSERT_EQ (sets.priority (account), nano::bootstrap::account_sets::priority_max);
}
/*
* bootstrap
*/
/**
* Tests the base case for returning
*/

View file

@ -70,6 +70,7 @@ enum class type
bootstrap_frontiers,
bootstrap_account_sets,
bootstrap_frontier_scan,
bootstrap_timeout,
bootstrap_server,
bootstrap_server_request,
bootstrap_server_overfill,
@ -145,6 +146,7 @@ enum class detail
retry,
prioritized,
pending,
sync,
// processing queue
queue,
@ -456,13 +458,16 @@ enum class detail
loop_frontiers_processing,
duplicate_request,
invalid_response_type,
invalid_response,
timestamp_reset,
processing_frontiers,
frontiers_dropped,
sync_accounts,
prioritize,
prioritize_failed,
block,
block_failed,
unblock,
unblock_failed,
dependency_update,
@ -483,14 +488,17 @@ enum class detail
next_frontier,
blocking_insert,
blocking_erase_overflow,
blocking_overflow,
priority_insert,
priority_erase_by_threshold,
priority_erase_by_blocking,
priority_erase_overflow,
priority_set,
priority_unblocked,
erase_by_threshold,
erase_by_blocking,
priority_overflow,
deprioritize,
deprioritize_failed,
sync_dependencies,
dependency_synced,
request_blocks,
request_account_info,

View file

@ -106,7 +106,7 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::bootstrap_database_scan:
thread_role_name_string = "Bootstrap db";
break;
case nano::thread_role::name::bootstrap_dependendy_walker:
case nano::thread_role::name::bootstrap_dependency_walker:
thread_role_name_string = "Bootstrap walkr";
break;
case nano::thread_role::name::bootstrap_frontier_scan:

View file

@ -41,7 +41,7 @@ enum class name
telemetry,
bootstrap,
bootstrap_database_scan,
bootstrap_dependendy_walker,
bootstrap_dependency_walker,
bootstrap_frontier_scan,
bootstrap_cleanup,
bootstrap_worker,

View file

@ -30,11 +30,11 @@ void nano::bootstrap::account_sets::priority_up (nano::account const & account)
{
stats.inc (nano::stat::type::bootstrap_account_sets, nano::stat::detail::prioritize);
auto iter = priorities.get<tag_account> ().find (account);
if (iter != priorities.get<tag_account> ().end ())
if (auto it = priorities.get<tag_account> ().find (account); it != priorities.get<tag_account> ().end ())
{
priorities.get<tag_account> ().modify (iter, [] (auto & val) {
priorities.get<tag_account> ().modify (it, [] (auto & val) {
val.priority = std::min ((val.priority + account_sets::priority_increase), account_sets::priority_max);
val.fails = 0;
});
}
else
@ -57,21 +57,19 @@ void nano::bootstrap::account_sets::priority_down (nano::account const & account
return;
}
auto iter = priorities.get<tag_account> ().find (account);
if (iter != priorities.get<tag_account> ().end ())
if (auto it = priorities.get<tag_account> ().find (account); it != priorities.get<tag_account> ().end ())
{
stats.inc (nano::stat::type::bootstrap_account_sets, nano::stat::detail::deprioritize);
auto priority_new = iter->priority / account_sets::priority_divide;
if (priority_new <= account_sets::priority_cutoff)
if (it->fails >= account_sets::max_fails || it->fails >= it->priority)
{
stats.inc (nano::stat::type::bootstrap_account_sets, nano::stat::detail::priority_erase_by_threshold);
priorities.get<tag_account> ().erase (iter);
stats.inc (nano::stat::type::bootstrap_account_sets, nano::stat::detail::erase_by_threshold);
priorities.get<tag_account> ().erase (it);
}
else
{
priorities.get<tag_account> ().modify (iter, [priority_new] (auto & val) {
val.priority = priority_new;
priorities.get<tag_account> ().modify (it, [] (auto & val) {
val.fails += 1;
});
}
}
@ -81,7 +79,7 @@ void nano::bootstrap::account_sets::priority_down (nano::account const & account
}
}
void nano::bootstrap::account_sets::priority_set (nano::account const & account)
void nano::bootstrap::account_sets::priority_set (nano::account const & account, double priority)
{
if (account.is_zero ())
{
@ -90,11 +88,10 @@ void nano::bootstrap::account_sets::priority_set (nano::account const & account)
if (!blocked (account))
{
auto iter = priorities.get<tag_account> ().find (account);
if (iter == priorities.get<tag_account> ().end ())
if (!priorities.get<tag_account> ().contains (account))
{
stats.inc (nano::stat::type::bootstrap_account_sets, nano::stat::detail::priority_insert);
priorities.get<tag_account> ().insert ({ account, account_sets::priority_initial });
stats.inc (nano::stat::type::bootstrap_account_sets, nano::stat::detail::priority_set);
priorities.get<tag_account> ().insert ({ account, priority });
trim_overflow ();
}
}
@ -108,19 +105,21 @@ void nano::bootstrap::account_sets::block (nano::account const & account, nano::
{
debug_assert (!account.is_zero ());
auto erased = priorities.get<tag_account> ().erase (account);
if (erased > 0)
{
stats.inc (nano::stat::type::bootstrap_account_sets, nano::stat::detail::erase_by_blocking);
stats.inc (nano::stat::type::bootstrap_account_sets, nano::stat::detail::block);
auto existing = priorities.get<tag_account> ().find (account);
auto entry = (existing == priorities.get<tag_account> ().end ()) ? priority_entry{ account, 0 } : *existing;
priorities.get<tag_account> ().erase (account);
stats.inc (nano::stat::type::bootstrap_account_sets, nano::stat::detail::priority_erase_by_blocking);
blocking.get<tag_account> ().insert ({ entry, dependency });
stats.inc (nano::stat::type::bootstrap_account_sets, nano::stat::detail::blocking_insert);
debug_assert (blocking.get<tag_account> ().count (account) == 0);
blocking.get<tag_account> ().insert ({ account, dependency });
trim_overflow ();
}
else
{
stats.inc (nano::stat::type::bootstrap_account_sets, nano::stat::detail::block_failed);
}
}
void nano::bootstrap::account_sets::unblock (nano::account const & account, std::optional<nano::block_hash> const & hash)
{
@ -134,19 +133,11 @@ void nano::bootstrap::account_sets::unblock (nano::account const & account, std:
if (existing != blocking.get<tag_account> ().end () && (!hash || existing->dependency == *hash))
{
stats.inc (nano::stat::type::bootstrap_account_sets, nano::stat::detail::unblock);
stats.inc (nano::stat::type::bootstrap_account_sets, nano::stat::detail::priority_unblocked);
debug_assert (priorities.get<tag_account> ().count (account) == 0);
if (!existing->original_entry.account.is_zero ())
{
debug_assert (existing->original_entry.account == account);
priorities.get<tag_account> ().insert (existing->original_entry);
}
else
{
priorities.get<tag_account> ().insert ({ account, account_sets::priority_initial });
}
blocking.get<tag_account> ().erase (account);
trim_overflow ();
}
else
@ -212,17 +203,17 @@ void nano::bootstrap::account_sets::dependency_update (nano::block_hash const &
void nano::bootstrap::account_sets::trim_overflow ()
{
while (priorities.size () > config.priorities_max)
while (!priorities.empty () && priorities.size () > config.priorities_max)
{
// Erase the oldest entry
priorities.pop_front ();
stats.inc (nano::stat::type::bootstrap_account_sets, nano::stat::detail::priority_erase_overflow);
// Erase the lowest priority entry
stats.inc (nano::stat::type::bootstrap_account_sets, nano::stat::detail::priority_overflow);
priorities.get<tag_priority> ().erase (std::prev (priorities.get<tag_priority> ().end ()));
}
while (blocking.size () > config.blocking_max)
while (!blocking.empty () && blocking.size () > config.blocking_max)
{
// Erase the oldest entry
// Erase the lowest priority entry
stats.inc (nano::stat::type::bootstrap_account_sets, nano::stat::detail::blocking_overflow);
blocking.pop_front ();
stats.inc (nano::stat::type::bootstrap_account_sets, nano::stat::detail::blocking_erase_overflow);
}
}
@ -275,6 +266,8 @@ nano::block_hash nano::bootstrap::account_sets::next_blocking (std::function<boo
void nano::bootstrap::account_sets::sync_dependencies ()
{
stats.inc (nano::stat::type::bootstrap_account_sets, nano::stat::detail::sync_dependencies);
// Sample all accounts with a known dependency account (> account 0)
auto begin = blocking.get<tag_dependency_account> ().upper_bound (nano::account{ 0 });
auto end = blocking.get<tag_dependency_account> ().end ();
@ -290,7 +283,7 @@ void nano::bootstrap::account_sets::sync_dependencies ()
if (!blocked (entry.dependency_account) && !prioritized (entry.dependency_account))
{
stats.inc (nano::stat::type::bootstrap_account_sets, nano::stat::detail::sync_dependencies);
stats.inc (nano::stat::type::bootstrap_account_sets, nano::stat::detail::dependency_synced);
priority_set (entry.dependency_account);
}
}
@ -332,8 +325,7 @@ double nano::bootstrap::account_sets::priority (nano::account const & account) c
{
if (!blocked (account))
{
auto existing = priorities.get<tag_account> ().find (account);
if (existing != priorities.get<tag_account> ().end ())
if (auto existing = priorities.get<tag_account> ().find (account); existing != priorities.get<tag_account> ().end ())
{
return existing->priority;
}

View file

@ -24,6 +24,14 @@ namespace bootstrap
/** This class tracks accounts various account sets which are shared among the multiple bootstrap threads */
class account_sets
{
public: // Constants
static double constexpr priority_initial = 2.0;
static double constexpr priority_increase = 2.0;
static double constexpr priority_divide = 2.0;
static double constexpr priority_max = 128.0;
static double constexpr priority_cutoff = 0.15;
static unsigned constexpr max_fails = 3;
public:
account_sets (account_sets_config const &, nano::stats &);
@ -38,7 +46,7 @@ namespace bootstrap
* Current implementation divides priority by 2.0f and saturates down to 1.0f.
*/
void priority_down (nano::account const & account);
void priority_set (nano::account const & account);
void priority_set (nano::account const & account, double priority = priority_initial);
void block (nano::account const & account, nano::block_hash const & dependency);
void unblock (nano::account const & account, std::optional<nano::block_hash> const & hash = std::nullopt);
@ -86,27 +94,17 @@ namespace bootstrap
{
nano::account account;
double priority;
id_t id{ generate_id () }; // Uniformly distributed, used for random querying
unsigned fails{ 0 };
std::chrono::steady_clock::time_point timestamp{};
id_t id{ generate_id () }; // Uniformly distributed, used for random querying
};
struct blocking_entry
{
priority_entry original_entry;
nano::account account;
nano::block_hash dependency;
nano::account dependency_account{ 0 };
id_t id{ generate_id () }; // Uniformly distributed, used for random querying
nano::account account () const
{
return original_entry.account;
}
double priority () const
{
return original_entry.priority;
}
};
// clang-format off
@ -135,7 +133,7 @@ namespace bootstrap
mi::indexed_by<
mi::sequenced<mi::tag<tag_sequenced>>,
mi::ordered_unique<mi::tag<tag_account>,
mi::const_mem_fun<blocking_entry, nano::account, &blocking_entry::account>>,
mi::member<blocking_entry, nano::account, &blocking_entry::account>>,
mi::ordered_non_unique<mi::tag<tag_dependency>,
mi::member<blocking_entry, nano::block_hash, &blocking_entry::dependency>>,
mi::ordered_non_unique<mi::tag<tag_dependency_account>,
@ -148,13 +146,6 @@ namespace bootstrap
ordered_priorities priorities;
ordered_blocking blocking;
public: // Constants
static double constexpr priority_initial = 2.0;
static double constexpr priority_increase = 2.0;
static double constexpr priority_divide = 2.0;
static double constexpr priority_max = 128.0;
static double constexpr priority_cutoff = 0.15;
public:
using info_t = std::tuple<decltype (blocking), decltype (priorities)>; // <blocking, priorities>
info_t info () const;

View file

@ -53,7 +53,7 @@ public:
std::size_t frontier_rate_limit{ 8 };
std::size_t database_warmup_ratio{ 10 };
std::size_t max_pull_count{ nano::bootstrap_server::max_blocks };
std::chrono::milliseconds request_timeout{ 1000 * 5 };
std::chrono::milliseconds request_timeout{ 1000 * 15 };
std::size_t throttle_coefficient{ 8 * 1024 };
std::chrono::milliseconds throttle_wait{ 100 };
std::size_t block_processor_threshold{ 1000 };

View file

@ -60,7 +60,7 @@ nano::bootstrap_service::~bootstrap_service ()
debug_assert (!database_thread.joinable ());
debug_assert (!dependencies_thread.joinable ());
debug_assert (!frontiers_thread.joinable ());
debug_assert (!timeout_thread.joinable ());
debug_assert (!cleanup_thread.joinable ());
debug_assert (!workers.alive ());
}
@ -70,7 +70,7 @@ void nano::bootstrap_service::start ()
debug_assert (!database_thread.joinable ());
debug_assert (!dependencies_thread.joinable ());
debug_assert (!frontiers_thread.joinable ());
debug_assert (!timeout_thread.joinable ());
debug_assert (!cleanup_thread.joinable ());
if (!config.enable)
{
@ -99,7 +99,7 @@ void nano::bootstrap_service::start ()
if (config.enable_dependency_walker)
{
dependencies_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::bootstrap_dependendy_walker);
nano::thread_role::set (nano::thread_role::name::bootstrap_dependency_walker);
run_dependencies ();
});
}
@ -112,7 +112,7 @@ void nano::bootstrap_service::start ()
});
}
timeout_thread = std::thread ([this] () {
cleanup_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::bootstrap_cleanup);
run_timeouts ();
});
@ -130,7 +130,7 @@ void nano::bootstrap_service::stop ()
nano::join_or_pass (database_thread);
nano::join_or_pass (dependencies_thread);
nano::join_or_pass (frontiers_thread);
nano::join_or_pass (timeout_thread);
nano::join_or_pass (cleanup_thread);
workers.stop ();
}
@ -418,7 +418,7 @@ nano::block_hash nano::bootstrap_service::next_blocking ()
debug_assert (!mutex.try_lock ());
auto blocking = accounts.next_blocking ([this] (nano::block_hash const & hash) {
return count_tags (hash, query_source::blocking) == 0;
return count_tags (hash, query_source::dependencies) == 0;
});
if (blocking.is_zero ())
{
@ -590,7 +590,7 @@ void nano::bootstrap_service::run_database ()
}
}
void nano::bootstrap_service::run_one_blocking ()
void nano::bootstrap_service::run_one_dependency ()
{
// No need to wait for blockprocessor, as we are not processing blocks
auto channel = wait_channel ();
@ -603,7 +603,7 @@ void nano::bootstrap_service::run_one_blocking ()
{
return;
}
request_info (blocking, channel, query_source::blocking);
request_info (blocking, channel, query_source::dependencies);
}
void nano::bootstrap_service::run_dependencies ()
@ -613,7 +613,7 @@ void nano::bootstrap_service::run_dependencies ()
{
lock.unlock ();
stats.inc (nano::stat::type::bootstrap, nano::stat::detail::loop_dependencies);
run_one_blocking ();
run_one_dependency ();
lock.lock ();
}
}
@ -659,7 +659,7 @@ void nano::bootstrap_service::cleanup_and_sync ()
{
debug_assert (!mutex.try_lock ());
scoring.sync (network.list ());
scoring.sync (network.list (/* all */ 0, network_constants.bootstrap_protocol_version_min));
scoring.timeout ();
throttle.resize (compute_throttle_size ());
@ -673,8 +673,9 @@ void nano::bootstrap_service::cleanup_and_sync ()
while (!tags_by_order.empty () && should_timeout (tags_by_order.front ()))
{
auto tag = tags_by_order.front ();
tags_by_order.pop_front ();
stats.inc (nano::stat::type::bootstrap, nano::stat::detail::timeout);
stats.inc (nano::stat::type::bootstrap_timeout, to_stat_detail (tag.type));
tags_by_order.pop_front ();
}
if (sync_dependencies_interval.elapsed (60s))
@ -746,17 +747,25 @@ void nano::bootstrap_service::process (nano::asc_pull_ack const & message, std::
stats.inc (nano::stat::type::bootstrap_reply, to_stat_detail (tag.type));
stats.sample (nano::stat::sample::bootstrap_tag_duration, nano::log::milliseconds_delta (tag.timestamp), { 0, config.request_timeout.count () });
scoring.received_message (channel);
lock.unlock ();
// Process the response payload
std::visit ([this, &tag] (auto && request) { return process (request, tag); }, message.payload);
bool ok = std::visit ([this, &tag] (auto && request) { return process (request, tag); }, message.payload);
if (ok)
{
lock.lock ();
scoring.received_message (channel);
lock.unlock ();
}
else
{
stats.inc (nano::stat::type::bootstrap, nano::stat::detail::invalid_response);
}
condition.notify_all ();
}
void nano::bootstrap_service::process (const nano::asc_pull_ack::blocks_payload & response, const async_tag & tag)
bool nano::bootstrap_service::process (const nano::asc_pull_ack::blocks_payload & response, const async_tag & tag)
{
debug_assert (tag.type == query_type::blocks_by_hash || tag.type == query_type::blocks_by_account);
@ -824,9 +833,11 @@ void nano::bootstrap_service::process (const nano::asc_pull_ack::blocks_payload
}
break;
}
return result != verify_result::invalid;
}
void nano::bootstrap_service::process (const nano::asc_pull_ack::account_info_payload & response, const async_tag & tag)
bool nano::bootstrap_service::process (const nano::asc_pull_ack::account_info_payload & response, const async_tag & tag)
{
debug_assert (tag.type == query_type::account_info_by_hash);
debug_assert (!tag.hash.is_zero ());
@ -834,7 +845,7 @@ void nano::bootstrap_service::process (const nano::asc_pull_ack::account_info_pa
if (response.account.is_zero ())
{
stats.inc (nano::stat::type::bootstrap_process, nano::stat::detail::account_info_empty);
return;
return true; // OK, but nothing to do
}
stats.inc (nano::stat::type::bootstrap_process, nano::stat::detail::account_info);
@ -843,11 +854,13 @@ void nano::bootstrap_service::process (const nano::asc_pull_ack::account_info_pa
{
nano::lock_guard<nano::mutex> lock{ mutex };
accounts.dependency_update (tag.hash, response.account);
accounts.priority_set (response.account);
}
accounts.priority_set (response.account, nano::bootstrap::account_sets::priority_cutoff); // Use the lowest possible priority here
}
void nano::bootstrap_service::process (const nano::asc_pull_ack::frontiers_payload & response, const async_tag & tag)
return true; // OK, no way to verify the response
}
bool nano::bootstrap_service::process (const nano::asc_pull_ack::frontiers_payload & response, const async_tag & tag)
{
debug_assert (tag.type == query_type::frontiers);
debug_assert (!tag.start.is_zero ());
@ -855,7 +868,7 @@ void nano::bootstrap_service::process (const nano::asc_pull_ack::frontiers_paylo
if (response.frontiers.empty ())
{
stats.inc (nano::stat::type::bootstrap_process, nano::stat::detail::frontiers_empty);
return;
return true; // OK, but nothing to do
}
stats.inc (nano::stat::type::bootstrap_process, nano::stat::detail::frontiers);
@ -897,12 +910,15 @@ void nano::bootstrap_service::process (const nano::asc_pull_ack::frontiers_paylo
}
break;
}
return result != verify_result::invalid;
}
void nano::bootstrap_service::process (const nano::empty_payload & response, const async_tag & tag)
bool nano::bootstrap_service::process (const nano::empty_payload & response, const async_tag & tag)
{
stats.inc (nano::stat::type::bootstrap_process, nano::stat::detail::empty);
debug_assert (false, "empty payload"); // Should not happen
return false; // Invalid
}
void nano::bootstrap_service::process_frontiers (std::deque<std::pair<nano::account, nano::block_hash>> const & frontiers)
@ -981,7 +997,8 @@ void nano::bootstrap_service::process_frontiers (std::deque<std::pair<nano::acco
for (auto const & account : result)
{
accounts.priority_set (account);
// Use the lowest possible priority here
accounts.priority_set (account, nano::bootstrap::account_sets::priority_cutoff);
}
}
@ -1091,6 +1108,14 @@ nano::container_info nano::bootstrap_service::container_info () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
auto collect_limiters = [this] () {
nano::container_info info;
info.put ("total", limiter.size ());
info.put ("database", database_limiter.size ());
info.put ("frontiers", frontiers_limiter.size ());
return info;
};
nano::container_info info;
info.put ("tags", tags);
info.put ("throttle", throttle.size ());
@ -1099,6 +1124,8 @@ nano::container_info nano::bootstrap_service::container_info () const
info.add ("database_scan", database_scan.container_info ());
info.add ("frontiers", frontiers.container_info ());
info.add ("workers", workers.container_info ());
info.add ("peers", scoring.container_info ());
info.add ("limiters", collect_limiters ());
return info;
}

View file

@ -76,7 +76,7 @@ public: // Tag
invalid,
priority,
database,
blocking,
dependencies,
frontiers,
};
@ -104,9 +104,9 @@ private:
void run_database ();
void run_one_database (bool should_throttle);
void run_dependencies ();
void run_one_blocking ();
void run_one_frontier ();
void run_one_dependency ();
void run_frontiers ();
void run_one_frontier ();
void run_timeouts ();
void cleanup_and_sync ();
@ -134,10 +134,10 @@ private:
bool request_frontiers (nano::account, std::shared_ptr<nano::transport::channel> const &, query_source);
bool send (std::shared_ptr<nano::transport::channel> const &, async_tag tag);
void process (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag);
void process (nano::asc_pull_ack::account_info_payload const & response, async_tag const & tag);
void process (nano::asc_pull_ack::frontiers_payload const & response, async_tag const & tag);
void process (nano::empty_payload const & response, async_tag const & tag);
bool process (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag);
bool process (nano::asc_pull_ack::account_info_payload const & response, async_tag const & tag);
bool process (nano::asc_pull_ack::frontiers_payload const & response, async_tag const & tag);
bool process (nano::empty_payload const & response, async_tag const & tag);
void process_frontiers (std::deque<std::pair<nano::account, nano::block_hash>> const & frontiers);
@ -194,6 +194,7 @@ private:
// Requests for accounts from database have much lower hitrate and could introduce strain on the network
// A separate (lower) limiter ensures that we always reserve resources for querying accounts from priority queue
nano::rate_limiter database_limiter;
// Rate limiter for frontier requests
nano::rate_limiter frontiers_limiter;
nano::interval sync_dependencies_interval;
@ -205,7 +206,7 @@ private:
std::thread database_thread;
std::thread dependencies_thread;
std::thread frontiers_thread;
std::thread timeout_thread;
std::thread cleanup_thread;
nano::thread_pool workers;
nano::random_generator_mt rng;

View file

@ -12,7 +12,17 @@ nano::bootstrap::peer_scoring::peer_scoring (bootstrap_config const & config_a,
{
}
bool nano::bootstrap::peer_scoring::try_send_message (std::shared_ptr<nano::transport::channel> channel)
bool nano::bootstrap::peer_scoring::limit_exceeded (std::shared_ptr<nano::transport::channel> const & channel) const
{
auto & index = scoring.get<tag_channel> ();
if (auto existing = index.find (channel.get ()); existing != index.end ())
{
return existing->outstanding >= config.channel_limit;
}
return false;
}
bool nano::bootstrap::peer_scoring::try_send_message (std::shared_ptr<nano::transport::channel> const & channel)
{
auto & index = scoring.get<tag_channel> ();
auto existing = index.find (channel.get ());
@ -38,11 +48,10 @@ bool nano::bootstrap::peer_scoring::try_send_message (std::shared_ptr<nano::tran
return false;
}
void nano::bootstrap::peer_scoring::received_message (std::shared_ptr<nano::transport::channel> channel)
void nano::bootstrap::peer_scoring::received_message (std::shared_ptr<nano::transport::channel> const & channel)
{
auto & index = scoring.get<tag_channel> ();
auto existing = index.find (channel.get ());
if (existing != index.end ())
if (auto existing = index.find (channel.get ()); existing != index.end ())
{
if (existing->outstanding > 1)
{
@ -57,12 +66,9 @@ void nano::bootstrap::peer_scoring::received_message (std::shared_ptr<nano::tran
std::shared_ptr<nano::transport::channel> nano::bootstrap::peer_scoring::channel ()
{
auto & index = scoring.get<tag_outstanding> ();
for (auto const & score : index)
for (auto const & channel : channels)
{
if (auto channel = score.shared ())
{
if (!channel->max ())
if (!channel->max (nano::transport::traffic_type::bootstrap))
{
if (!try_send_message (channel))
{
@ -70,7 +76,6 @@ std::shared_ptr<nano::transport::channel> nano::bootstrap::peer_scoring::channel
}
}
}
}
return nullptr;
}
@ -79,11 +84,16 @@ std::size_t nano::bootstrap::peer_scoring::size () const
return scoring.size ();
}
std::size_t nano::bootstrap::peer_scoring::available () const
{
return std::count_if (channels.begin (), channels.end (), [this] (auto const & channel) {
return !limit_exceeded (channel);
});
}
void nano::bootstrap::peer_scoring::timeout ()
{
auto & index = scoring.get<tag_channel> ();
erase_if (index, [] (auto const & score) {
erase_if (scoring, [] (auto const & score) {
if (auto channel = score.shared ())
{
if (channel->alive ())
@ -104,20 +114,16 @@ void nano::bootstrap::peer_scoring::timeout ()
void nano::bootstrap::peer_scoring::sync (std::deque<std::shared_ptr<nano::transport::channel>> const & list)
{
auto & index = scoring.get<tag_channel> ();
for (auto const & channel : list)
{
if (channel->get_network_version () >= network_constants.bootstrap_protocol_version_min)
{
if (index.find (channel.get ()) == index.end ())
{
if (!channel->max (nano::transport::traffic_type::bootstrap))
{
index.emplace (channel, 1, 1, 0);
}
}
}
channels = list;
}
nano::container_info nano::bootstrap::peer_scoring::container_info () const
{
nano::container_info info;
info.put ("scores", size ());
info.put ("available", available ());
info.put ("channels", channels.size ());
return info;
}
/*

View file

@ -23,14 +23,23 @@ namespace bootstrap
peer_scoring (bootstrap_config const &, nano::network_constants const &);
// Returns true if channel limit has been exceeded
bool try_send_message (std::shared_ptr<nano::transport::channel> channel);
void received_message (std::shared_ptr<nano::transport::channel> channel);
bool limit_exceeded (std::shared_ptr<nano::transport::channel> const & channel) const;
bool try_send_message (std::shared_ptr<nano::transport::channel> const & channel);
void received_message (std::shared_ptr<nano::transport::channel> const & channel);
std::shared_ptr<nano::transport::channel> channel ();
[[nodiscard]] std::size_t size () const;
// Synchronize channels with the network, passed channels should be shuffled
void sync (std::deque<std::shared_ptr<nano::transport::channel>> const & list);
// Cleans up scores for closed channels
// Decays scores which become inaccurate over time due to message drops
void timeout ();
void sync (std::deque<std::shared_ptr<nano::transport::channel>> const & list);
std::size_t size () const;
std::size_t available () const;
nano::container_info container_info () const;
private:
bootstrap_config const & config;
@ -71,14 +80,16 @@ namespace bootstrap
// Indexes scores by the number of outstanding requests in ascending order
class tag_outstanding {};
using scoring_t = boost::multi_index_container<peer_score,
using ordered_scoring = boost::multi_index_container<peer_score,
mi::indexed_by<
mi::hashed_unique<mi::tag<tag_channel>,
mi::member<peer_score, nano::transport::channel *, &peer_score::channel_ptr>>,
mi::ordered_non_unique<mi::tag<tag_outstanding>,
mi::member<peer_score, uint64_t, &peer_score::outstanding>>>>;
// clang-format on
scoring_t scoring;
ordered_scoring scoring;
std::deque<std::shared_ptr<nano::transport::channel>> channels;
};
}
}

View file

@ -5182,7 +5182,7 @@ void nano::json_handler::debug_bootstrap_priority_info ()
boost::property_tree::ptree response_blocking;
for (auto const & entry : blocking)
{
const auto account = entry.account ();
const auto account = entry.account;
const auto dependency = entry.dependency;
response_blocking.put (account.to_account (), dependency.to_string ());