Implement frontier scanning

This commit is contained in:
Piotr Wójcik 2024-08-27 20:19:28 +02:00
commit 7baf378420
13 changed files with 588 additions and 39 deletions

View file

@ -601,7 +601,8 @@ TEST (toml, daemon_config_deserialize_no_defaults)
[node.bootstrap_ascending]
enable = false
enable_database_scan = false
enable_frontier_scan = false
enable_database_scan = true
enable_dependency_walker = false
channel_limit = 999
database_rate_limit = 999
@ -780,6 +781,7 @@ TEST (toml, daemon_config_deserialize_no_defaults)
ASSERT_NE (conf.node.vote_processor.batch_size, defaults.node.vote_processor.batch_size);
ASSERT_NE (conf.node.bootstrap_ascending.enable, defaults.node.bootstrap_ascending.enable);
ASSERT_NE (conf.node.bootstrap_ascending.enable_frontier_scan, defaults.node.bootstrap_ascending.enable_frontier_scan);
ASSERT_NE (conf.node.bootstrap_ascending.enable_database_scan, defaults.node.bootstrap_ascending.enable_database_scan);
ASSERT_NE (conf.node.bootstrap_ascending.enable_dependency_walker, defaults.node.bootstrap_ascending.enable_dependency_walker);
ASSERT_NE (conf.node.bootstrap_ascending.channel_limit, defaults.node.bootstrap_ascending.channel_limit);

View file

@ -64,10 +64,10 @@ class rate_limiter final
{
public:
// initialize with limit 0 = unbounded
rate_limiter (std::size_t limit, double burst_ratio);
rate_limiter (std::size_t limit, double burst_ratio = 1.0);
bool should_pass (std::size_t buffer_size);
void reset (std::size_t limit, double burst_ratio);
void reset (std::size_t limit, double burst_ratio = 1.0);
private:
nano::rate::token_bucket bucket;

View file

@ -60,11 +60,13 @@ enum class type
blockprocessor_overfill,
bootstrap_ascending,
bootstrap_ascending_accounts,
bootstrap_ascending_verify,
bootstrap_ascending_verify_blocks,
bootstrap_ascending_verify_frontiers,
bootstrap_ascending_process,
bootstrap_ascending_request,
bootstrap_ascending_reply,
bootstrap_ascending_next,
bootstrap_ascending_frontiers,
bootstrap_server,
bootstrap_server_request,
bootstrap_server_overfill,
@ -117,6 +119,7 @@ enum class detail
inserted,
erased,
request,
request_failed,
broadcast,
cleanup,
top,
@ -423,7 +426,7 @@ enum class detail
missing_cookie,
invalid_genesis,
// bootstrap ascending
// bootstrap_ascending
missing_tag,
reply,
throttled,
@ -431,13 +434,18 @@ enum class detail
timeout,
nothing_new,
account_info_empty,
frontiers_empty,
loop_database,
loop_dependencies,
loop_frontiers,
loop_frontiers_processing,
duplicate_request,
invalid_response_type,
timestamp_reset,
process_frontiers,
dropped_frontiers,
// bootstrap ascending accounts
// bootstrap_ascending_accounts
prioritize,
prioritize_failed,
block,
@ -446,11 +454,21 @@ enum class detail
dependency_update,
dependency_update_failed,
// bootstrap_ascending_frontiers
done,
done_range,
done_empty,
next_by_requests,
next_by_timestamp,
advance,
advance_failed,
next_none,
next_priority,
next_database,
next_blocking,
next_dependency,
next_frontier,
blocking_insert,
blocking_erase_overflow,
@ -461,6 +479,10 @@ enum class detail
deprioritize,
deprioritize_failed,
sync_dependencies,
frontiers_processed,
frontiers_prioritized,
frontiers_outdated,
frontiers_pending,
request_blocks,
request_account_info,
@ -468,6 +490,7 @@ enum class detail
// active
started_hinted,
started_optimistic,
// rep_crawler
channel_dead,
query_target_failed,

View file

@ -101,7 +101,10 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
thread_role_name_string = "Voting que";
break;
case nano::thread_role::name::ascending_bootstrap:
thread_role_name_string = "Bootstrap asc";
thread_role_name_string = "Ascboot";
break;
case nano::thread_role::name::ascending_bootstrap_worker:
thread_role_name_string = "Ascboot work";
break;
case nano::thread_role::name::bootstrap_server:
thread_role_name_string = "Bootstrap serv";

View file

@ -41,6 +41,7 @@ enum class name
bootstrap_server,
telemetry,
ascending_bootstrap,
ascending_bootstrap_worker,
bootstrap_server_requests,
bootstrap_server_responses,
scheduler_hinted,

View file

@ -44,13 +44,15 @@ add_library(
bootstrap/bootstrap.cpp
bootstrap/bootstrap_server.hpp
bootstrap/bootstrap_server.cpp
bootstrap_ascending/common.hpp
bootstrap_ascending/throttle.hpp
bootstrap_ascending/throttle.cpp
bootstrap_ascending/account_sets.hpp
bootstrap_ascending/account_sets.cpp
bootstrap_ascending/database_scan.hpp
bootstrap_ascending/database_scan.cpp
bootstrap_ascending/common.hpp
bootstrap_ascending/frontier_scan.hpp
bootstrap_ascending/frontier_scan.cpp
bootstrap_ascending/throttle.hpp
bootstrap_ascending/throttle.cpp
bootstrap_ascending/peer_scoring.hpp
bootstrap_ascending/peer_scoring.cpp
bootstrap_ascending/service.hpp

View file

@ -34,6 +34,7 @@ nano::error nano::bootstrap_ascending_config::deserialize (nano::tomlconfig & to
toml.get ("enable", enable);
toml.get ("enable_database_scan", enable_database_scan);
toml.get ("enable_dependency_walker", enable_dependency_walker);
toml.get ("enable_frontier_scan", enable_frontier_scan);
toml.get ("channel_limit", channel_limit);
toml.get ("database_rate_limit", database_rate_limit);
@ -59,6 +60,7 @@ nano::error nano::bootstrap_ascending_config::serialize (nano::tomlconfig & toml
toml.put ("enable", enable, "Enable or disable the ascending bootstrap. Disabling it is not recommended and will prevent the node from syncing.\ntype:bool");
toml.put ("enable_database_scan", enable_database_scan, "Enable or disable the 'database scan` strategy for the ascending bootstrap.\ntype:bool");
toml.put ("enable_dependency_walker", enable_dependency_walker, "Enable or disable the 'dependency walker` strategy for the ascending bootstrap.\ntype:bool");
toml.put ("enable_frontier_scan", enable_frontier_scan, "Enable or disable the 'frontier scan` strategy for the ascending bootstrap.\ntype:bool");
toml.put ("channel_limit", channel_limit, "Maximum number of un-responded requests per channel.\nNote: changing to unlimited (0) is not recommended.\ntype:uint64");
toml.put ("database_rate_limit", database_rate_limit, "Rate limit on scanning accounts and pending entries from database.\nNote: changing to unlimited (0) is not recommended as this operation competes for resources on querying the database.\ntype:uint64");

View file

@ -22,6 +22,19 @@ public:
std::chrono::milliseconds cooldown{ 1000 * 3 };
};
// TODO: This should be moved next to `frontier_scan` class
class frontier_scan_config final
{
public:
// TODO: Serialize & deserialize
unsigned head_parallelistm{ 128 };
unsigned consideration_count{ 4 };
std::size_t candidates{ 1000 };
std::chrono::milliseconds cooldown{ 1000 * 5 };
std::size_t max_pending{ 16 };
};
// TODO: This should be moved next to `bootstrap_ascending` class
class bootstrap_ascending_config final
{
@ -31,12 +44,14 @@ public:
public:
bool enable{ true };
bool enable_database_scan{ true };
bool enable_database_scan{ false };
bool enable_dependency_walker{ true };
bool enable_frontier_scan{ true };
// Maximum number of un-responded requests per channel, should be lower or equal to bootstrap server max queue size
std::size_t channel_limit{ 16 };
std::size_t database_rate_limit{ 256 };
std::size_t frontier_rate_limit{ 100 };
std::size_t database_warmup_ratio{ 10 };
std::size_t max_pull_count{ nano::bootstrap_server::max_blocks };
std::chrono::milliseconds request_timeout{ 1000 * 5 };
@ -45,6 +60,7 @@ public:
std::size_t block_processor_threshold{ 1000 };
std::size_t max_requests{ 1024 };
nano::account_sets_config account_sets;
account_sets_config account_sets;
frontier_scan_config frontier_scan;
};
}

View file

@ -0,0 +1,187 @@
#include <nano/node/bootstrap_ascending/frontier_scan.hpp>
#include <boost/multiprecision/cpp_dec_float.hpp>
#include <boost/multiprecision/cpp_int.hpp>
nano::bootstrap_ascending::frontier_scan::frontier_scan (frontier_scan_config const & config_a, nano::stats & stats_a) :
config{ config_a },
stats{ stats_a }
{
// Divide nano::account numeric range into consecutive and equal ranges
nano::uint256_t max_account = std::numeric_limits<nano::uint256_t>::max ();
nano::uint256_t range_size = max_account / config.head_parallelistm;
for (unsigned i = 0; i < config.head_parallelistm; ++i)
{
// Start at 1 to avoid the burn account
nano::uint256_t start = (i == 0) ? 1 : i * range_size;
nano::uint256_t end = (i == config.head_parallelistm - 1) ? max_account : start + range_size;
heads.emplace_back (frontier_head{ nano::account{ start }, nano::account{ end } });
}
release_assert (!heads.empty ());
}
nano::account nano::bootstrap_ascending::frontier_scan::next ()
{
auto const cutoff = std::chrono::steady_clock::now () - config.cooldown;
auto & heads_by_timestamp = heads.get<tag_timestamp> ();
for (auto it = heads_by_timestamp.begin (); it != heads_by_timestamp.end (); ++it)
{
auto const & head = *it;
if (head.requests < config.consideration_count || head.timestamp < cutoff)
{
stats.inc (nano::stat::type::bootstrap_ascending_frontiers, (head.requests < config.consideration_count) ? nano::stat::detail::next_by_requests : nano::stat::detail::next_by_timestamp);
debug_assert (head.next.number () >= head.start.number ());
debug_assert (head.next.number () < head.end.number ());
auto result = head.next;
heads_by_timestamp.modify (it, [this] (auto & entry) {
entry.requests += 1;
entry.timestamp = std::chrono::steady_clock::now ();
});
return result;
}
}
stats.inc (nano::stat::type::bootstrap_ascending_frontiers, nano::stat::detail::next_none);
return { 0 };
}
bool nano::bootstrap_ascending::frontier_scan::process (nano::account start, std::deque<std::pair<nano::account, nano::block_hash>> const & response)
{
debug_assert (std::all_of (response.begin (), response.end (), [&] (auto const & pair) { return pair.first.number () >= start.number (); }));
stats.inc (nano::stat::type::bootstrap_ascending_frontiers, nano::stat::detail::process);
// Find the first head with head.start <= start
auto & heads_by_start = heads.get<tag_start> ();
auto it = heads_by_start.upper_bound (start);
release_assert (it != heads_by_start.begin ());
it = std::prev (it);
bool done = false;
heads_by_start.modify (it, [this, &response, &done] (frontier_head & entry) {
entry.completed += 1;
for (auto const & [account, _] : response)
{
// Only consider candidates that actually advance the current frontier
if (account.number () > entry.next.number ())
{
entry.candidates.insert (account);
}
}
// Trim the candidates
while (entry.candidates.size () > config.candidates)
{
release_assert (!entry.candidates.empty ());
entry.candidates.erase (std::prev (entry.candidates.end ()));
}
// Special case for the last frontier head that won't receive larger than max frontier
if (entry.completed >= config.consideration_count * 2 && entry.candidates.empty ())
{
stats.inc (nano::stat::type::bootstrap_ascending_frontiers, nano::stat::detail::done_empty);
entry.candidates.insert (entry.end);
}
// Check if done
if (entry.completed >= config.consideration_count && !entry.candidates.empty ())
{
stats.inc (nano::stat::type::bootstrap_ascending_frontiers, nano::stat::detail::done);
// Take the last candidate as the next frontier
release_assert (!entry.candidates.empty ());
auto it = std::prev (entry.candidates.end ());
debug_assert (entry.next.number () < it->number ());
entry.next = *it;
entry.processed += entry.candidates.size ();
entry.candidates.clear ();
entry.requests = 0;
entry.completed = 0;
entry.timestamp = {};
// Bound the search range
if (entry.next.number () >= entry.end.number ())
{
stats.inc (nano::stat::type::bootstrap_ascending_frontiers, nano::stat::detail::done_range);
entry.next = entry.start;
}
done = true;
}
});
return done;
}
std::unique_ptr<nano::container_info_component> nano::bootstrap_ascending::frontier_scan::collect_container_info (std::string const & name)
{
auto collect_progress = [&] () {
auto composite = std::make_unique<container_info_composite> ("progress");
for (int n = 0; n < heads.size (); ++n)
{
auto const & head = heads[n];
boost::multiprecision::cpp_dec_float_50 start{ head.start.number ().str () };
boost::multiprecision::cpp_dec_float_50 next{ head.next.number ().str () };
boost::multiprecision::cpp_dec_float_50 end{ head.end.number ().str () };
boost::multiprecision::cpp_dec_float_50 progress = (next - start) * boost::multiprecision::cpp_dec_float_50 (1000000) / (end - start);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ std::to_string (n), progress.convert_to<std::uint64_t> (), 6 }));
}
return composite;
};
auto collect_candidates = [&] () {
auto composite = std::make_unique<container_info_composite> ("candidates");
for (int n = 0; n < heads.size (); ++n)
{
auto const & head = heads[n];
composite->add_component (std::make_unique<container_info_leaf> (container_info{ std::to_string (n), head.candidates.size (), 0 }));
}
return composite;
};
auto collect_responses = [&] () {
auto composite = std::make_unique<container_info_composite> ("responses");
for (int n = 0; n < heads.size (); ++n)
{
auto const & head = heads[n];
composite->add_component (std::make_unique<container_info_leaf> (container_info{ std::to_string (n), head.completed, 0 }));
}
return composite;
};
auto collect_processed = [&] () {
auto composite = std::make_unique<container_info_composite> ("processed");
for (int n = 0; n < heads.size (); ++n)
{
auto const & head = heads[n];
composite->add_component (std::make_unique<container_info_leaf> (container_info{ std::to_string (n), head.processed, 0 }));
}
return composite;
};
auto total_processed = std::accumulate (heads.begin (), heads.end (), std::size_t{ 0 }, [] (auto total, auto const & head) {
return total + head.processed;
});
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "total_processed", total_processed, 0 }));
composite->add_component (collect_progress ());
composite->add_component (collect_candidates ());
composite->add_component (collect_responses ());
composite->add_component (collect_processed ());
return composite;
}

View file

@ -0,0 +1,76 @@
#pragma once
#include <nano/node/bootstrap/bootstrap_config.hpp>
#include <nano/node/bootstrap_ascending/common.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/mem_fun.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/random_access_index.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <boost/multi_index_container.hpp>
#include <chrono>
#include <map>
#include <set>
namespace mi = boost::multi_index;
namespace nano::bootstrap_ascending
{
class frontier_scan
{
public:
frontier_scan (frontier_scan_config const &, nano::stats &);
nano::account next ();
bool process (nano::account start, std::deque<std::pair<nano::account, nano::block_hash>> const & response);
std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name);
private: // Dependencies
frontier_scan_config const & config;
nano::stats & stats;
private:
struct frontier_head
{
frontier_head (nano::account start_a, nano::account end_a) :
start{ start_a },
end{ end_a },
next{ start_a }
{
}
// The range of accounts to scan is [start, end)
nano::account start;
nano::account end;
nano::account next;
std::set<nano::account> candidates;
unsigned requests{ 0 };
unsigned completed{ 0 };
std::chrono::steady_clock::time_point timestamp{};
size_t processed{ 0 }; // Total number of accounts processed
};
// clang-format off
class tag_sequenced {};
class tag_start {};
class tag_timestamp {};
using ordered_heads = boost::multi_index_container<frontier_head,
mi::indexed_by<
mi::random_access<mi::tag<tag_sequenced>>,
mi::ordered_unique<mi::tag<tag_start>,
mi::member<frontier_head, nano::account, &frontier_head::start>>,
mi::ordered_non_unique<mi::tag<tag_timestamp>,
mi::member<frontier_head, std::chrono::steady_clock::time_point, &frontier_head::timestamp>>
>>;
// clang-format on
ordered_heads heads;
};
}

View file

@ -29,9 +29,12 @@ nano::bootstrap_ascending::service::service (nano::node_config const & node_conf
logger{ logger_a },
accounts{ config.account_sets, stats },
database_scan{ ledger },
frontiers{ config.frontier_scan, stats },
throttle{ compute_throttle_size () },
scoring{ config, node_config_a.network_params.network },
database_limiter{ config.database_rate_limit, 1.0 }
database_limiter{ config.database_rate_limit },
frontiers_limiter{ config.frontier_rate_limit },
workers{ 1, nano::thread_role::name::ascending_bootstrap_worker }
{
// TODO: This is called from a very congested blockprocessor thread. Offload this work to a dedicated processing thread
block_processor.batch_processed.add ([this] (auto const & batch) {
@ -57,6 +60,7 @@ nano::bootstrap_ascending::service::~service ()
debug_assert (!priorities_thread.joinable ());
debug_assert (!database_thread.joinable ());
debug_assert (!dependencies_thread.joinable ());
debug_assert (!frontiers_thread.joinable ());
debug_assert (!timeout_thread.joinable ());
}
@ -65,6 +69,7 @@ void nano::bootstrap_ascending::service::start ()
debug_assert (!priorities_thread.joinable ());
debug_assert (!database_thread.joinable ());
debug_assert (!dependencies_thread.joinable ());
debug_assert (!frontiers_thread.joinable ());
debug_assert (!timeout_thread.joinable ());
if (!config.enable)
@ -94,6 +99,14 @@ void nano::bootstrap_ascending::service::start ()
});
}
if (config.enable_frontier_scan)
{
frontiers_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::ascending_bootstrap);
run_frontiers ();
});
}
timeout_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::ascending_bootstrap);
run_timeouts ();
@ -111,10 +124,11 @@ void nano::bootstrap_ascending::service::stop ()
nano::join_or_pass (priorities_thread);
nano::join_or_pass (database_thread);
nano::join_or_pass (dependencies_thread);
nano::join_or_pass (frontiers_thread);
nano::join_or_pass (timeout_thread);
}
void nano::bootstrap_ascending::service::send (std::shared_ptr<nano::transport::channel> const & channel, async_tag tag)
bool nano::bootstrap_ascending::service::send (std::shared_ptr<nano::transport::channel> const & channel, async_tag tag)
{
debug_assert (tag.type != query_type::invalid);
debug_assert (tag.source != query_source::invalid);
@ -125,6 +139,8 @@ void nano::bootstrap_ascending::service::send (std::shared_ptr<nano::transport::
tags.get<tag_id> ().insert (tag);
}
on_request.notify (tag, channel);
nano::asc_pull_req request{ network_constants };
request.id = tag.id;
@ -152,6 +168,16 @@ void nano::bootstrap_ascending::service::send (std::shared_ptr<nano::transport::
request.payload = pld;
}
break;
case query_type::frontiers:
{
request.type = nano::asc_pull_type::frontiers;
nano::asc_pull_req::frontiers_payload pld;
pld.start = tag.start.as_account ();
pld.count = nano::asc_pull_ack::frontiers_payload::max_frontiers;
request.payload = pld;
}
break;
default:
debug_assert (false);
}
@ -165,6 +191,8 @@ void nano::bootstrap_ascending::service::send (std::shared_ptr<nano::transport::
channel->send (
request, nullptr,
nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap);
return true; // TODO: Return channel send result
}
std::size_t nano::bootstrap_ascending::service::priority_size () const
@ -409,6 +437,24 @@ nano::block_hash nano::bootstrap_ascending::service::wait_blocking ()
return result;
}
nano::account nano::bootstrap_ascending::service::wait_frontier ()
{
nano::account result{ 0 };
wait ([this, &result] () {
debug_assert (!mutex.try_lock ());
result = frontiers.next ();
if (!result.is_zero ())
{
stats.inc (nano::stat::type::bootstrap_ascending_next, nano::stat::detail::next_frontier);
return true;
}
return false;
});
return result;
}
bool nano::bootstrap_ascending::service::request (nano::account account, size_t count, std::shared_ptr<nano::transport::channel> const & channel, query_source source)
{
debug_assert (count > 0);
@ -436,11 +482,7 @@ bool nano::bootstrap_ascending::service::request (nano::account account, size_t
tag.start = account;
}
on_request.notify (tag, channel);
send (channel, tag);
return true; // Request sent
return send (channel, tag);
}
bool nano::bootstrap_ascending::service::request_info (nano::block_hash hash, std::shared_ptr<nano::transport::channel> const & channel, query_source source)
@ -451,11 +493,17 @@ bool nano::bootstrap_ascending::service::request_info (nano::block_hash hash, st
tag.start = hash;
tag.hash = hash;
on_request.notify (tag, channel);
return send (channel, tag);
}
send (channel, tag);
bool nano::bootstrap_ascending::service::request_frontiers (nano::account start, std::shared_ptr<nano::transport::channel> const & channel, query_source source)
{
async_tag tag{};
tag.type = query_type::frontiers;
tag.source = source;
tag.start = start;
return true; // Request sent
return send (channel, tag);
}
void nano::bootstrap_ascending::service::run_one_priority ()
@ -549,6 +597,103 @@ void nano::bootstrap_ascending::service::run_dependencies ()
}
}
void nano::bootstrap_ascending::service::run_one_frontier ()
{
wait ([this] () {
return !accounts.priority_half_full ();
});
wait ([this] () {
return frontiers_limiter.should_pass (1);
});
wait ([this] () {
return workers.num_queued_tasks () < config.frontier_scan.max_pending;
});
wait_tags ();
auto channel = wait_channel ();
if (!channel)
{
return;
}
auto frontier = wait_frontier ();
if (frontier.is_zero ())
{
return;
}
request_frontiers (frontier, channel, query_source::frontiers);
}
void nano::bootstrap_ascending::service::run_frontiers ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
lock.unlock ();
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::loop_frontiers);
run_one_frontier ();
lock.lock ();
}
}
void nano::bootstrap_ascending::service::process_frontiers (std::deque<std::pair<nano::account, nano::block_hash>> const & frontiers)
{
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::process_frontiers);
size_t outdated = 0;
size_t pending = 0;
// Accounts with outdated frontiers to sync
std::deque<nano::account> result;
{
auto transaction = ledger.tx_begin_read ();
auto should_prioritize = [&] (nano::account const & account, nano::block_hash const & frontier) {
if (ledger.any.block_exists_or_pruned (transaction, frontier))
{
return false;
}
if (auto info = ledger.any.account_get (transaction, account))
{
if (info->head != frontier)
{
outdated++;
return true; // Frontier is outdated
}
return false;
}
if (auto receivable = ledger.any.receivable_lower_bound (transaction, account, { 0 }))
{
if (receivable->first.account == account)
{
pending++;
return true; // Account doesn't exist but has pending blocks in the ledger
}
return false;
}
return false;
};
for (auto const & [account, frontier] : frontiers)
{
if (should_prioritize (account, frontier))
{
result.push_back (account);
}
}
}
stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::frontiers_processed, frontiers.size ());
stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::frontiers_prioritized, result.size ());
stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::frontiers_outdated, outdated);
stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::frontiers_pending, pending);
nano::lock_guard<nano::mutex> guard{ mutex };
for (auto const & account : result)
{
accounts.priority_set (account);
}
}
void nano::bootstrap_ascending::service::cleanup_and_sync ()
{
debug_assert (!mutex.try_lock ());
@ -622,7 +767,7 @@ void nano::bootstrap_ascending::service::process (nano::asc_pull_ack const & mes
}
bool operator() (const nano::asc_pull_ack::frontiers_payload & response) const
{
return false; // TODO: Handle frontiers
return type == query_type::frontiers;
}
bool operator() (const nano::empty_payload & response) const
{
@ -664,7 +809,7 @@ void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::bloc
{
case verify_result::ok:
{
stats.inc (nano::stat::type::bootstrap_ascending_verify, nano::stat::detail::ok);
stats.inc (nano::stat::type::bootstrap_ascending_verify_blocks, nano::stat::detail::ok);
stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::blocks, nano::stat::dir::in, response.blocks.size ());
auto blocks = response.blocks;
@ -705,7 +850,7 @@ void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::bloc
break;
case verify_result::nothing_new:
{
stats.inc (nano::stat::type::bootstrap_ascending_verify, nano::stat::detail::nothing_new);
stats.inc (nano::stat::type::bootstrap_ascending_verify_blocks, nano::stat::detail::nothing_new);
nano::lock_guard<nano::mutex> lock{ mutex };
accounts.priority_down (tag.account);
@ -717,7 +862,7 @@ void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::bloc
break;
case verify_result::invalid:
{
stats.inc (nano::stat::type::bootstrap_ascending_verify, nano::stat::detail::invalid);
stats.inc (nano::stat::type::bootstrap_ascending_verify_blocks, nano::stat::detail::invalid);
}
break;
}
@ -731,24 +876,69 @@ void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::acco
if (response.account.is_zero ())
{
stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::account_info_empty);
return;
}
else
{
stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::account_info);
// Prioritize account containing the dependency
{
nano::lock_guard<nano::mutex> lock{ mutex };
accounts.dependency_update (tag.hash, response.account);
accounts.priority_set (response.account);
}
stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::account_info);
// Prioritize account containing the dependency
{
nano::lock_guard<nano::mutex> lock{ mutex };
accounts.dependency_update (tag.hash, response.account);
accounts.priority_set (response.account);
}
}
void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::frontiers_payload & response, const async_tag & tag)
{
// TODO: Make use of frontiers info
debug_assert (tag.type == query_type::frontiers);
debug_assert (!tag.start.is_zero ());
if (response.frontiers.empty ())
{
stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::frontiers_empty);
return;
}
stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::frontiers);
auto result = verify (response, tag);
switch (result)
{
case verify_result::ok:
{
stats.inc (nano::stat::type::bootstrap_ascending_verify_frontiers, nano::stat::detail::ok);
stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::frontiers, nano::stat::dir::in, response.frontiers.size ());
{
nano::lock_guard<nano::mutex> lock{ mutex };
frontiers.process (tag.start.as_account (), response.frontiers);
}
// Allow some overfill to avoid unnecessarily dropping responses
if (workers.num_queued_tasks () < config.frontier_scan.max_pending * 4)
{
workers.push_task ([this, frontiers = response.frontiers] {
process_frontiers (frontiers);
});
}
else
{
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::dropped_frontiers);
}
}
break;
case verify_result::nothing_new:
{
stats.inc (nano::stat::type::bootstrap_ascending_verify_frontiers, nano::stat::detail::nothing_new);
}
break;
case verify_result::invalid:
{
stats.inc (nano::stat::type::bootstrap_ascending_verify_frontiers, nano::stat::detail::invalid);
}
break;
}
}
void nano::bootstrap_ascending::service::process (const nano::empty_payload & response, const async_tag & tag)
@ -816,6 +1006,35 @@ nano::bootstrap_ascending::service::verify_result nano::bootstrap_ascending::ser
return verify_result::ok;
}
nano::bootstrap_ascending::service::verify_result nano::bootstrap_ascending::service::verify (nano::asc_pull_ack::frontiers_payload const & response, async_tag const & tag) const
{
auto const & frontiers = response.frontiers;
if (frontiers.empty ())
{
return verify_result::nothing_new;
}
// Ensure frontiers accounts are in ascending order
nano::account previous{ 0 };
for (auto const & [account, _] : frontiers)
{
if (account.number () <= previous.number ())
{
return verify_result::invalid;
}
previous = account;
}
// Ensure the frontiers are larger or equal to the requested frontier
if (frontiers.front ().first.number () < tag.start.as_account ().number ())
{
return verify_result::invalid;
}
return verify_result::ok;
}
auto nano::bootstrap_ascending::service::info () const -> nano::bootstrap_ascending::account_sets::info_t
{
nano::lock_guard<nano::mutex> lock{ mutex };
@ -840,6 +1059,8 @@ std::unique_ptr<nano::container_info_component> nano::bootstrap_ascending::servi
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "throttle_successes", throttle.successes (), 0 }));
composite->add_component (accounts.collect_container_info ("accounts"));
composite->add_component (database_scan.collect_container_info ("database_scan"));
composite->add_component (frontiers.collect_container_info ("frontiers"));
composite->add_component (workers.collect_container_info ("workers"));
return composite;
}

View file

@ -5,13 +5,15 @@
#include <nano/lib/numbers.hpp>
#include <nano/lib/observer_set.hpp>
#include <nano/lib/rate_limiting.hpp>
#include <nano/lib/thread_pool.hpp>
#include <nano/lib/timer.hpp>
#include <nano/node/bootstrap/bootstrap_config.hpp>
#include <nano/node/bootstrap_ascending/account_sets.hpp>
#include <nano/node/bootstrap_ascending/common.hpp>
#include <nano/node/bootstrap_ascending/database_scan.hpp>
#include <nano/node/bootstrap_ascending/frontier_scan.hpp>
#include <nano/node/bootstrap_ascending/peer_scoring.hpp>
#include <nano/node/bootstrap_ascending/throttle.hpp>
#include <nano/node/fwd.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>
@ -63,6 +65,7 @@ namespace bootstrap_ascending
blocks_by_hash,
blocks_by_account,
account_info_by_hash,
frontiers,
};
enum class query_source
@ -71,6 +74,7 @@ namespace bootstrap_ascending
priority,
database,
blocking,
frontiers,
};
struct async_tag
@ -101,8 +105,11 @@ namespace bootstrap_ascending
void run_one_database (bool should_throttle);
void run_dependencies ();
void run_one_blocking ();
void run_one_frontier ();
void run_frontiers ();
void run_timeouts ();
void cleanup_and_sync ();
void process_frontiers (std::deque<std::pair<nano::account, nano::block_hash>> const & frontiers);
/* Waits for a condition to be satisfied with incremental backoff */
void wait (std::function<bool ()> const & predicate) const;
@ -122,10 +129,13 @@ namespace bootstrap_ascending
/* Waits for next available blocking block */
nano::block_hash next_blocking ();
nano::block_hash wait_blocking ();
/* Waits for next available frontier scan range */
nano::account wait_frontier ();
bool request (nano::account, size_t count, std::shared_ptr<nano::transport::channel> const &, query_source);
bool request_info (nano::block_hash, std::shared_ptr<nano::transport::channel> const &, query_source);
void send (std::shared_ptr<nano::transport::channel> const &, async_tag tag);
bool request_frontiers (nano::account, std::shared_ptr<nano::transport::channel> const &, query_source);
bool send (std::shared_ptr<nano::transport::channel> const &, async_tag tag);
void process (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag);
void process (nano::asc_pull_ack::account_info_payload const & response, async_tag const & tag);
@ -146,6 +156,7 @@ namespace bootstrap_ascending
* - ok: otherwise, if all checks pass
*/
verify_result verify (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag) const;
verify_result verify (nano::asc_pull_ack::frontiers_payload const & response, async_tag const & tag) const;
size_t count_tags (nano::account const & account, query_source source) const;
size_t count_tags (nano::block_hash const & hash, query_source source) const;
@ -158,6 +169,7 @@ namespace bootstrap_ascending
nano::bootstrap_ascending::database_scan database_scan;
nano::bootstrap_ascending::throttle throttle;
nano::bootstrap_ascending::peer_scoring scoring;
nano::bootstrap_ascending::frontier_scan frontiers;
// clang-format off
class tag_sequenced {};
@ -181,6 +193,7 @@ namespace bootstrap_ascending
// Requests for accounts from database have much lower hitrate and could introduce strain on the network
// A separate (lower) limiter ensures that we always reserve resources for querying accounts from priority queue
nano::rate_limiter database_limiter;
nano::rate_limiter frontiers_limiter;
nano::interval sync_dependencies_interval;
@ -190,7 +203,10 @@ namespace bootstrap_ascending
std::thread priorities_thread;
std::thread database_thread;
std::thread dependencies_thread;
std::thread frontiers_thread;
std::thread timeout_thread;
nano::thread_pool workers;
};
nano::stat::detail to_stat_detail (service::query_type);

View file

@ -724,7 +724,7 @@ public: // Payload definitions
static frontier deserialize_frontier (nano::stream &);
public: // Payload
std::vector<frontier> frontiers;
std::deque<frontier> frontiers;
public: // Logging
void operator() (nano::object_stream &) const;