From 7baf378420e94c2ac4f086b54b3c2481ed5950f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Tue, 27 Aug 2024 20:19:28 +0200 Subject: [PATCH] Implement frontier scanning --- nano/core_test/toml.cpp | 4 +- nano/lib/rate_limiting.hpp | 4 +- nano/lib/stats_enums.hpp | 29 +- nano/lib/thread_roles.cpp | 5 +- nano/lib/thread_roles.hpp | 1 + nano/node/CMakeLists.txt | 8 +- nano/node/bootstrap/bootstrap_config.cpp | 2 + nano/node/bootstrap/bootstrap_config.hpp | 20 +- .../bootstrap_ascending/frontier_scan.cpp | 187 ++++++++++++ .../bootstrap_ascending/frontier_scan.hpp | 76 +++++ nano/node/bootstrap_ascending/service.cpp | 269 ++++++++++++++++-- nano/node/bootstrap_ascending/service.hpp | 20 +- nano/node/messages.hpp | 2 +- 13 files changed, 588 insertions(+), 39 deletions(-) create mode 100644 nano/node/bootstrap_ascending/frontier_scan.cpp create mode 100644 nano/node/bootstrap_ascending/frontier_scan.hpp diff --git a/nano/core_test/toml.cpp b/nano/core_test/toml.cpp index eb1cb68ae..1ef6deba9 100644 --- a/nano/core_test/toml.cpp +++ b/nano/core_test/toml.cpp @@ -601,7 +601,8 @@ TEST (toml, daemon_config_deserialize_no_defaults) [node.bootstrap_ascending] enable = false - enable_database_scan = false + enable_frontier_scan = false + enable_database_scan = true enable_dependency_walker = false channel_limit = 999 database_rate_limit = 999 @@ -780,6 +781,7 @@ TEST (toml, daemon_config_deserialize_no_defaults) ASSERT_NE (conf.node.vote_processor.batch_size, defaults.node.vote_processor.batch_size); ASSERT_NE (conf.node.bootstrap_ascending.enable, defaults.node.bootstrap_ascending.enable); + ASSERT_NE (conf.node.bootstrap_ascending.enable_frontier_scan, defaults.node.bootstrap_ascending.enable_frontier_scan); ASSERT_NE (conf.node.bootstrap_ascending.enable_database_scan, defaults.node.bootstrap_ascending.enable_database_scan); ASSERT_NE (conf.node.bootstrap_ascending.enable_dependency_walker, defaults.node.bootstrap_ascending.enable_dependency_walker); ASSERT_NE (conf.node.bootstrap_ascending.channel_limit, defaults.node.bootstrap_ascending.channel_limit); diff --git a/nano/lib/rate_limiting.hpp b/nano/lib/rate_limiting.hpp index f67246787..5e209f4d6 100644 --- a/nano/lib/rate_limiting.hpp +++ b/nano/lib/rate_limiting.hpp @@ -64,10 +64,10 @@ class rate_limiter final { public: // initialize with limit 0 = unbounded - rate_limiter (std::size_t limit, double burst_ratio); + rate_limiter (std::size_t limit, double burst_ratio = 1.0); bool should_pass (std::size_t buffer_size); - void reset (std::size_t limit, double burst_ratio); + void reset (std::size_t limit, double burst_ratio = 1.0); private: nano::rate::token_bucket bucket; diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index c8f3e5f81..e00adb8b4 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -60,11 +60,13 @@ enum class type blockprocessor_overfill, bootstrap_ascending, bootstrap_ascending_accounts, - bootstrap_ascending_verify, + bootstrap_ascending_verify_blocks, + bootstrap_ascending_verify_frontiers, bootstrap_ascending_process, bootstrap_ascending_request, bootstrap_ascending_reply, bootstrap_ascending_next, + bootstrap_ascending_frontiers, bootstrap_server, bootstrap_server_request, bootstrap_server_overfill, @@ -117,6 +119,7 @@ enum class detail inserted, erased, request, + request_failed, broadcast, cleanup, top, @@ -423,7 +426,7 @@ enum class detail missing_cookie, invalid_genesis, - // bootstrap ascending + // bootstrap_ascending missing_tag, reply, throttled, @@ -431,13 +434,18 @@ enum class detail timeout, nothing_new, account_info_empty, + frontiers_empty, loop_database, loop_dependencies, + loop_frontiers, + loop_frontiers_processing, duplicate_request, invalid_response_type, timestamp_reset, + process_frontiers, + dropped_frontiers, - // bootstrap ascending accounts + // bootstrap_ascending_accounts prioritize, prioritize_failed, block, @@ -446,11 +454,21 @@ enum class detail dependency_update, dependency_update_failed, + // bootstrap_ascending_frontiers + done, + done_range, + done_empty, + next_by_requests, + next_by_timestamp, + advance, + advance_failed, + next_none, next_priority, next_database, next_blocking, next_dependency, + next_frontier, blocking_insert, blocking_erase_overflow, @@ -461,6 +479,10 @@ enum class detail deprioritize, deprioritize_failed, sync_dependencies, + frontiers_processed, + frontiers_prioritized, + frontiers_outdated, + frontiers_pending, request_blocks, request_account_info, @@ -468,6 +490,7 @@ enum class detail // active started_hinted, started_optimistic, + // rep_crawler channel_dead, query_target_failed, diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 76f467cd0..913c4aaa7 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -101,7 +101,10 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) thread_role_name_string = "Voting que"; break; case nano::thread_role::name::ascending_bootstrap: - thread_role_name_string = "Bootstrap asc"; + thread_role_name_string = "Ascboot"; + break; + case nano::thread_role::name::ascending_bootstrap_worker: + thread_role_name_string = "Ascboot work"; break; case nano::thread_role::name::bootstrap_server: thread_role_name_string = "Bootstrap serv"; diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index 5896318c4..9092392a7 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -41,6 +41,7 @@ enum class name bootstrap_server, telemetry, ascending_bootstrap, + ascending_bootstrap_worker, bootstrap_server_requests, bootstrap_server_responses, scheduler_hinted, diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 419e00a72..548d16c0d 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -44,13 +44,15 @@ add_library( bootstrap/bootstrap.cpp bootstrap/bootstrap_server.hpp bootstrap/bootstrap_server.cpp - bootstrap_ascending/common.hpp - bootstrap_ascending/throttle.hpp - bootstrap_ascending/throttle.cpp bootstrap_ascending/account_sets.hpp bootstrap_ascending/account_sets.cpp bootstrap_ascending/database_scan.hpp bootstrap_ascending/database_scan.cpp + bootstrap_ascending/common.hpp + bootstrap_ascending/frontier_scan.hpp + bootstrap_ascending/frontier_scan.cpp + bootstrap_ascending/throttle.hpp + bootstrap_ascending/throttle.cpp bootstrap_ascending/peer_scoring.hpp bootstrap_ascending/peer_scoring.cpp bootstrap_ascending/service.hpp diff --git a/nano/node/bootstrap/bootstrap_config.cpp b/nano/node/bootstrap/bootstrap_config.cpp index ff1e847da..2264776e0 100644 --- a/nano/node/bootstrap/bootstrap_config.cpp +++ b/nano/node/bootstrap/bootstrap_config.cpp @@ -34,6 +34,7 @@ nano::error nano::bootstrap_ascending_config::deserialize (nano::tomlconfig & to toml.get ("enable", enable); toml.get ("enable_database_scan", enable_database_scan); toml.get ("enable_dependency_walker", enable_dependency_walker); + toml.get ("enable_frontier_scan", enable_frontier_scan); toml.get ("channel_limit", channel_limit); toml.get ("database_rate_limit", database_rate_limit); @@ -59,6 +60,7 @@ nano::error nano::bootstrap_ascending_config::serialize (nano::tomlconfig & toml toml.put ("enable", enable, "Enable or disable the ascending bootstrap. Disabling it is not recommended and will prevent the node from syncing.\ntype:bool"); toml.put ("enable_database_scan", enable_database_scan, "Enable or disable the 'database scan` strategy for the ascending bootstrap.\ntype:bool"); toml.put ("enable_dependency_walker", enable_dependency_walker, "Enable or disable the 'dependency walker` strategy for the ascending bootstrap.\ntype:bool"); + toml.put ("enable_frontier_scan", enable_frontier_scan, "Enable or disable the 'frontier scan` strategy for the ascending bootstrap.\ntype:bool"); toml.put ("channel_limit", channel_limit, "Maximum number of un-responded requests per channel.\nNote: changing to unlimited (0) is not recommended.\ntype:uint64"); toml.put ("database_rate_limit", database_rate_limit, "Rate limit on scanning accounts and pending entries from database.\nNote: changing to unlimited (0) is not recommended as this operation competes for resources on querying the database.\ntype:uint64"); diff --git a/nano/node/bootstrap/bootstrap_config.hpp b/nano/node/bootstrap/bootstrap_config.hpp index af7b98bcb..cd2c9b8d7 100644 --- a/nano/node/bootstrap/bootstrap_config.hpp +++ b/nano/node/bootstrap/bootstrap_config.hpp @@ -22,6 +22,19 @@ public: std::chrono::milliseconds cooldown{ 1000 * 3 }; }; +// TODO: This should be moved next to `frontier_scan` class +class frontier_scan_config final +{ +public: + // TODO: Serialize & deserialize + + unsigned head_parallelistm{ 128 }; + unsigned consideration_count{ 4 }; + std::size_t candidates{ 1000 }; + std::chrono::milliseconds cooldown{ 1000 * 5 }; + std::size_t max_pending{ 16 }; +}; + // TODO: This should be moved next to `bootstrap_ascending` class class bootstrap_ascending_config final { @@ -31,12 +44,14 @@ public: public: bool enable{ true }; - bool enable_database_scan{ true }; + bool enable_database_scan{ false }; bool enable_dependency_walker{ true }; + bool enable_frontier_scan{ true }; // Maximum number of un-responded requests per channel, should be lower or equal to bootstrap server max queue size std::size_t channel_limit{ 16 }; std::size_t database_rate_limit{ 256 }; + std::size_t frontier_rate_limit{ 100 }; std::size_t database_warmup_ratio{ 10 }; std::size_t max_pull_count{ nano::bootstrap_server::max_blocks }; std::chrono::milliseconds request_timeout{ 1000 * 5 }; @@ -45,6 +60,7 @@ public: std::size_t block_processor_threshold{ 1000 }; std::size_t max_requests{ 1024 }; - nano::account_sets_config account_sets; + account_sets_config account_sets; + frontier_scan_config frontier_scan; }; } diff --git a/nano/node/bootstrap_ascending/frontier_scan.cpp b/nano/node/bootstrap_ascending/frontier_scan.cpp new file mode 100644 index 000000000..28fab99de --- /dev/null +++ b/nano/node/bootstrap_ascending/frontier_scan.cpp @@ -0,0 +1,187 @@ +#include + +#include +#include + +nano::bootstrap_ascending::frontier_scan::frontier_scan (frontier_scan_config const & config_a, nano::stats & stats_a) : + config{ config_a }, + stats{ stats_a } +{ + // Divide nano::account numeric range into consecutive and equal ranges + nano::uint256_t max_account = std::numeric_limits::max (); + nano::uint256_t range_size = max_account / config.head_parallelistm; + + for (unsigned i = 0; i < config.head_parallelistm; ++i) + { + // Start at 1 to avoid the burn account + nano::uint256_t start = (i == 0) ? 1 : i * range_size; + nano::uint256_t end = (i == config.head_parallelistm - 1) ? max_account : start + range_size; + + heads.emplace_back (frontier_head{ nano::account{ start }, nano::account{ end } }); + } + + release_assert (!heads.empty ()); +} + +nano::account nano::bootstrap_ascending::frontier_scan::next () +{ + auto const cutoff = std::chrono::steady_clock::now () - config.cooldown; + + auto & heads_by_timestamp = heads.get (); + for (auto it = heads_by_timestamp.begin (); it != heads_by_timestamp.end (); ++it) + { + auto const & head = *it; + + if (head.requests < config.consideration_count || head.timestamp < cutoff) + { + stats.inc (nano::stat::type::bootstrap_ascending_frontiers, (head.requests < config.consideration_count) ? nano::stat::detail::next_by_requests : nano::stat::detail::next_by_timestamp); + + debug_assert (head.next.number () >= head.start.number ()); + debug_assert (head.next.number () < head.end.number ()); + + auto result = head.next; + + heads_by_timestamp.modify (it, [this] (auto & entry) { + entry.requests += 1; + entry.timestamp = std::chrono::steady_clock::now (); + }); + + return result; + } + } + + stats.inc (nano::stat::type::bootstrap_ascending_frontiers, nano::stat::detail::next_none); + return { 0 }; +} + +bool nano::bootstrap_ascending::frontier_scan::process (nano::account start, std::deque> const & response) +{ + debug_assert (std::all_of (response.begin (), response.end (), [&] (auto const & pair) { return pair.first.number () >= start.number (); })); + + stats.inc (nano::stat::type::bootstrap_ascending_frontiers, nano::stat::detail::process); + + // Find the first head with head.start <= start + auto & heads_by_start = heads.get (); + auto it = heads_by_start.upper_bound (start); + release_assert (it != heads_by_start.begin ()); + it = std::prev (it); + + bool done = false; + heads_by_start.modify (it, [this, &response, &done] (frontier_head & entry) { + entry.completed += 1; + + for (auto const & [account, _] : response) + { + // Only consider candidates that actually advance the current frontier + if (account.number () > entry.next.number ()) + { + entry.candidates.insert (account); + } + } + + // Trim the candidates + while (entry.candidates.size () > config.candidates) + { + release_assert (!entry.candidates.empty ()); + entry.candidates.erase (std::prev (entry.candidates.end ())); + } + + // Special case for the last frontier head that won't receive larger than max frontier + if (entry.completed >= config.consideration_count * 2 && entry.candidates.empty ()) + { + stats.inc (nano::stat::type::bootstrap_ascending_frontiers, nano::stat::detail::done_empty); + entry.candidates.insert (entry.end); + } + + // Check if done + if (entry.completed >= config.consideration_count && !entry.candidates.empty ()) + { + stats.inc (nano::stat::type::bootstrap_ascending_frontiers, nano::stat::detail::done); + + // Take the last candidate as the next frontier + release_assert (!entry.candidates.empty ()); + auto it = std::prev (entry.candidates.end ()); + + debug_assert (entry.next.number () < it->number ()); + entry.next = *it; + entry.processed += entry.candidates.size (); + entry.candidates.clear (); + entry.requests = 0; + entry.completed = 0; + entry.timestamp = {}; + + // Bound the search range + if (entry.next.number () >= entry.end.number ()) + { + stats.inc (nano::stat::type::bootstrap_ascending_frontiers, nano::stat::detail::done_range); + entry.next = entry.start; + } + + done = true; + } + }); + + return done; +} + +std::unique_ptr nano::bootstrap_ascending::frontier_scan::collect_container_info (std::string const & name) +{ + auto collect_progress = [&] () { + auto composite = std::make_unique ("progress"); + for (int n = 0; n < heads.size (); ++n) + { + auto const & head = heads[n]; + + boost::multiprecision::cpp_dec_float_50 start{ head.start.number ().str () }; + boost::multiprecision::cpp_dec_float_50 next{ head.next.number ().str () }; + boost::multiprecision::cpp_dec_float_50 end{ head.end.number ().str () }; + + boost::multiprecision::cpp_dec_float_50 progress = (next - start) * boost::multiprecision::cpp_dec_float_50 (1000000) / (end - start); + + composite->add_component (std::make_unique (container_info{ std::to_string (n), progress.convert_to (), 6 })); + } + return composite; + }; + + auto collect_candidates = [&] () { + auto composite = std::make_unique ("candidates"); + for (int n = 0; n < heads.size (); ++n) + { + auto const & head = heads[n]; + composite->add_component (std::make_unique (container_info{ std::to_string (n), head.candidates.size (), 0 })); + } + return composite; + }; + + auto collect_responses = [&] () { + auto composite = std::make_unique ("responses"); + for (int n = 0; n < heads.size (); ++n) + { + auto const & head = heads[n]; + composite->add_component (std::make_unique (container_info{ std::to_string (n), head.completed, 0 })); + } + return composite; + }; + + auto collect_processed = [&] () { + auto composite = std::make_unique ("processed"); + for (int n = 0; n < heads.size (); ++n) + { + auto const & head = heads[n]; + composite->add_component (std::make_unique (container_info{ std::to_string (n), head.processed, 0 })); + } + return composite; + }; + + auto total_processed = std::accumulate (heads.begin (), heads.end (), std::size_t{ 0 }, [] (auto total, auto const & head) { + return total + head.processed; + }); + + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "total_processed", total_processed, 0 })); + composite->add_component (collect_progress ()); + composite->add_component (collect_candidates ()); + composite->add_component (collect_responses ()); + composite->add_component (collect_processed ()); + return composite; +} diff --git a/nano/node/bootstrap_ascending/frontier_scan.hpp b/nano/node/bootstrap_ascending/frontier_scan.hpp new file mode 100644 index 000000000..382c59b47 --- /dev/null +++ b/nano/node/bootstrap_ascending/frontier_scan.hpp @@ -0,0 +1,76 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace mi = boost::multi_index; + +namespace nano::bootstrap_ascending +{ +class frontier_scan +{ +public: + frontier_scan (frontier_scan_config const &, nano::stats &); + + nano::account next (); + bool process (nano::account start, std::deque> const & response); + + std::unique_ptr collect_container_info (std::string const & name); + +private: // Dependencies + frontier_scan_config const & config; + nano::stats & stats; + +private: + struct frontier_head + { + frontier_head (nano::account start_a, nano::account end_a) : + start{ start_a }, + end{ end_a }, + next{ start_a } + { + } + + // The range of accounts to scan is [start, end) + nano::account start; + nano::account end; + + nano::account next; + std::set candidates; + + unsigned requests{ 0 }; + unsigned completed{ 0 }; + std::chrono::steady_clock::time_point timestamp{}; + size_t processed{ 0 }; // Total number of accounts processed + }; + + // clang-format off + class tag_sequenced {}; + class tag_start {}; + class tag_timestamp {}; + + using ordered_heads = boost::multi_index_container>, + mi::ordered_unique, + mi::member>, + mi::ordered_non_unique, + mi::member> + >>; + // clang-format on + + ordered_heads heads; +}; +} \ No newline at end of file diff --git a/nano/node/bootstrap_ascending/service.cpp b/nano/node/bootstrap_ascending/service.cpp index 450767f03..1de0f696c 100644 --- a/nano/node/bootstrap_ascending/service.cpp +++ b/nano/node/bootstrap_ascending/service.cpp @@ -29,9 +29,12 @@ nano::bootstrap_ascending::service::service (nano::node_config const & node_conf logger{ logger_a }, accounts{ config.account_sets, stats }, database_scan{ ledger }, + frontiers{ config.frontier_scan, stats }, throttle{ compute_throttle_size () }, scoring{ config, node_config_a.network_params.network }, - database_limiter{ config.database_rate_limit, 1.0 } + database_limiter{ config.database_rate_limit }, + frontiers_limiter{ config.frontier_rate_limit }, + workers{ 1, nano::thread_role::name::ascending_bootstrap_worker } { // TODO: This is called from a very congested blockprocessor thread. Offload this work to a dedicated processing thread block_processor.batch_processed.add ([this] (auto const & batch) { @@ -57,6 +60,7 @@ nano::bootstrap_ascending::service::~service () debug_assert (!priorities_thread.joinable ()); debug_assert (!database_thread.joinable ()); debug_assert (!dependencies_thread.joinable ()); + debug_assert (!frontiers_thread.joinable ()); debug_assert (!timeout_thread.joinable ()); } @@ -65,6 +69,7 @@ void nano::bootstrap_ascending::service::start () debug_assert (!priorities_thread.joinable ()); debug_assert (!database_thread.joinable ()); debug_assert (!dependencies_thread.joinable ()); + debug_assert (!frontiers_thread.joinable ()); debug_assert (!timeout_thread.joinable ()); if (!config.enable) @@ -94,6 +99,14 @@ void nano::bootstrap_ascending::service::start () }); } + if (config.enable_frontier_scan) + { + frontiers_thread = std::thread ([this] () { + nano::thread_role::set (nano::thread_role::name::ascending_bootstrap); + run_frontiers (); + }); + } + timeout_thread = std::thread ([this] () { nano::thread_role::set (nano::thread_role::name::ascending_bootstrap); run_timeouts (); @@ -111,10 +124,11 @@ void nano::bootstrap_ascending::service::stop () nano::join_or_pass (priorities_thread); nano::join_or_pass (database_thread); nano::join_or_pass (dependencies_thread); + nano::join_or_pass (frontiers_thread); nano::join_or_pass (timeout_thread); } -void nano::bootstrap_ascending::service::send (std::shared_ptr const & channel, async_tag tag) +bool nano::bootstrap_ascending::service::send (std::shared_ptr const & channel, async_tag tag) { debug_assert (tag.type != query_type::invalid); debug_assert (tag.source != query_source::invalid); @@ -125,6 +139,8 @@ void nano::bootstrap_ascending::service::send (std::shared_ptr ().insert (tag); } + on_request.notify (tag, channel); + nano::asc_pull_req request{ network_constants }; request.id = tag.id; @@ -152,6 +168,16 @@ void nano::bootstrap_ascending::service::send (std::shared_ptrsend ( request, nullptr, nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap); + + return true; // TODO: Return channel send result } std::size_t nano::bootstrap_ascending::service::priority_size () const @@ -409,6 +437,24 @@ nano::block_hash nano::bootstrap_ascending::service::wait_blocking () return result; } +nano::account nano::bootstrap_ascending::service::wait_frontier () +{ + nano::account result{ 0 }; + + wait ([this, &result] () { + debug_assert (!mutex.try_lock ()); + result = frontiers.next (); + if (!result.is_zero ()) + { + stats.inc (nano::stat::type::bootstrap_ascending_next, nano::stat::detail::next_frontier); + return true; + } + return false; + }); + + return result; +} + bool nano::bootstrap_ascending::service::request (nano::account account, size_t count, std::shared_ptr const & channel, query_source source) { debug_assert (count > 0); @@ -436,11 +482,7 @@ bool nano::bootstrap_ascending::service::request (nano::account account, size_t tag.start = account; } - on_request.notify (tag, channel); - - send (channel, tag); - - return true; // Request sent + return send (channel, tag); } bool nano::bootstrap_ascending::service::request_info (nano::block_hash hash, std::shared_ptr const & channel, query_source source) @@ -451,11 +493,17 @@ bool nano::bootstrap_ascending::service::request_info (nano::block_hash hash, st tag.start = hash; tag.hash = hash; - on_request.notify (tag, channel); + return send (channel, tag); +} - send (channel, tag); +bool nano::bootstrap_ascending::service::request_frontiers (nano::account start, std::shared_ptr const & channel, query_source source) +{ + async_tag tag{}; + tag.type = query_type::frontiers; + tag.source = source; + tag.start = start; - return true; // Request sent + return send (channel, tag); } void nano::bootstrap_ascending::service::run_one_priority () @@ -549,6 +597,103 @@ void nano::bootstrap_ascending::service::run_dependencies () } } +void nano::bootstrap_ascending::service::run_one_frontier () +{ + wait ([this] () { + return !accounts.priority_half_full (); + }); + wait ([this] () { + return frontiers_limiter.should_pass (1); + }); + wait ([this] () { + return workers.num_queued_tasks () < config.frontier_scan.max_pending; + }); + wait_tags (); + auto channel = wait_channel (); + if (!channel) + { + return; + } + auto frontier = wait_frontier (); + if (frontier.is_zero ()) + { + return; + } + request_frontiers (frontier, channel, query_source::frontiers); +} + +void nano::bootstrap_ascending::service::run_frontiers () +{ + nano::unique_lock lock{ mutex }; + while (!stopped) + { + lock.unlock (); + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::loop_frontiers); + run_one_frontier (); + lock.lock (); + } +} + +void nano::bootstrap_ascending::service::process_frontiers (std::deque> const & frontiers) +{ + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::process_frontiers); + + size_t outdated = 0; + size_t pending = 0; + + // Accounts with outdated frontiers to sync + std::deque result; + { + auto transaction = ledger.tx_begin_read (); + + auto should_prioritize = [&] (nano::account const & account, nano::block_hash const & frontier) { + if (ledger.any.block_exists_or_pruned (transaction, frontier)) + { + return false; + } + if (auto info = ledger.any.account_get (transaction, account)) + { + if (info->head != frontier) + { + outdated++; + return true; // Frontier is outdated + } + return false; + } + if (auto receivable = ledger.any.receivable_lower_bound (transaction, account, { 0 })) + { + if (receivable->first.account == account) + { + pending++; + return true; // Account doesn't exist but has pending blocks in the ledger + } + return false; + } + return false; + }; + + for (auto const & [account, frontier] : frontiers) + { + if (should_prioritize (account, frontier)) + { + result.push_back (account); + } + } + } + + stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::frontiers_processed, frontiers.size ()); + stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::frontiers_prioritized, result.size ()); + stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::frontiers_outdated, outdated); + stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::frontiers_pending, pending); + + nano::lock_guard guard{ mutex }; + + for (auto const & account : result) + { + accounts.priority_set (account); + } +} + void nano::bootstrap_ascending::service::cleanup_and_sync () { debug_assert (!mutex.try_lock ()); @@ -622,7 +767,7 @@ void nano::bootstrap_ascending::service::process (nano::asc_pull_ack const & mes } bool operator() (const nano::asc_pull_ack::frontiers_payload & response) const { - return false; // TODO: Handle frontiers + return type == query_type::frontiers; } bool operator() (const nano::empty_payload & response) const { @@ -664,7 +809,7 @@ void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::bloc { case verify_result::ok: { - stats.inc (nano::stat::type::bootstrap_ascending_verify, nano::stat::detail::ok); + stats.inc (nano::stat::type::bootstrap_ascending_verify_blocks, nano::stat::detail::ok); stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::blocks, nano::stat::dir::in, response.blocks.size ()); auto blocks = response.blocks; @@ -705,7 +850,7 @@ void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::bloc break; case verify_result::nothing_new: { - stats.inc (nano::stat::type::bootstrap_ascending_verify, nano::stat::detail::nothing_new); + stats.inc (nano::stat::type::bootstrap_ascending_verify_blocks, nano::stat::detail::nothing_new); nano::lock_guard lock{ mutex }; accounts.priority_down (tag.account); @@ -717,7 +862,7 @@ void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::bloc break; case verify_result::invalid: { - stats.inc (nano::stat::type::bootstrap_ascending_verify, nano::stat::detail::invalid); + stats.inc (nano::stat::type::bootstrap_ascending_verify_blocks, nano::stat::detail::invalid); } break; } @@ -731,24 +876,69 @@ void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::acco if (response.account.is_zero ()) { stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::account_info_empty); + return; } - else - { - stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::account_info); - // Prioritize account containing the dependency - { - nano::lock_guard lock{ mutex }; - accounts.dependency_update (tag.hash, response.account); - accounts.priority_set (response.account); - } + stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::account_info); + + // Prioritize account containing the dependency + { + nano::lock_guard lock{ mutex }; + accounts.dependency_update (tag.hash, response.account); + accounts.priority_set (response.account); } } void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::frontiers_payload & response, const async_tag & tag) { - // TODO: Make use of frontiers info + debug_assert (tag.type == query_type::frontiers); + debug_assert (!tag.start.is_zero ()); + + if (response.frontiers.empty ()) + { + stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::frontiers_empty); + return; + } + stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::frontiers); + + auto result = verify (response, tag); + switch (result) + { + case verify_result::ok: + { + stats.inc (nano::stat::type::bootstrap_ascending_verify_frontiers, nano::stat::detail::ok); + stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::frontiers, nano::stat::dir::in, response.frontiers.size ()); + + { + nano::lock_guard lock{ mutex }; + frontiers.process (tag.start.as_account (), response.frontiers); + } + + // Allow some overfill to avoid unnecessarily dropping responses + if (workers.num_queued_tasks () < config.frontier_scan.max_pending * 4) + { + workers.push_task ([this, frontiers = response.frontiers] { + process_frontiers (frontiers); + }); + } + else + { + stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::dropped_frontiers); + } + } + break; + case verify_result::nothing_new: + { + stats.inc (nano::stat::type::bootstrap_ascending_verify_frontiers, nano::stat::detail::nothing_new); + } + break; + case verify_result::invalid: + { + stats.inc (nano::stat::type::bootstrap_ascending_verify_frontiers, nano::stat::detail::invalid); + } + break; + } } void nano::bootstrap_ascending::service::process (const nano::empty_payload & response, const async_tag & tag) @@ -816,6 +1006,35 @@ nano::bootstrap_ascending::service::verify_result nano::bootstrap_ascending::ser return verify_result::ok; } +nano::bootstrap_ascending::service::verify_result nano::bootstrap_ascending::service::verify (nano::asc_pull_ack::frontiers_payload const & response, async_tag const & tag) const +{ + auto const & frontiers = response.frontiers; + + if (frontiers.empty ()) + { + return verify_result::nothing_new; + } + + // Ensure frontiers accounts are in ascending order + nano::account previous{ 0 }; + for (auto const & [account, _] : frontiers) + { + if (account.number () <= previous.number ()) + { + return verify_result::invalid; + } + previous = account; + } + + // Ensure the frontiers are larger or equal to the requested frontier + if (frontiers.front ().first.number () < tag.start.as_account ().number ()) + { + return verify_result::invalid; + } + + return verify_result::ok; +} + auto nano::bootstrap_ascending::service::info () const -> nano::bootstrap_ascending::account_sets::info_t { nano::lock_guard lock{ mutex }; @@ -840,6 +1059,8 @@ std::unique_ptr nano::bootstrap_ascending::servi composite->add_component (std::make_unique (container_info{ "throttle_successes", throttle.successes (), 0 })); composite->add_component (accounts.collect_container_info ("accounts")); composite->add_component (database_scan.collect_container_info ("database_scan")); + composite->add_component (frontiers.collect_container_info ("frontiers")); + composite->add_component (workers.collect_container_info ("workers")); return composite; } diff --git a/nano/node/bootstrap_ascending/service.hpp b/nano/node/bootstrap_ascending/service.hpp index 4f3f3668d..652c7b862 100644 --- a/nano/node/bootstrap_ascending/service.hpp +++ b/nano/node/bootstrap_ascending/service.hpp @@ -5,13 +5,15 @@ #include #include #include +#include +#include #include #include #include #include +#include #include #include -#include #include #include @@ -63,6 +65,7 @@ namespace bootstrap_ascending blocks_by_hash, blocks_by_account, account_info_by_hash, + frontiers, }; enum class query_source @@ -71,6 +74,7 @@ namespace bootstrap_ascending priority, database, blocking, + frontiers, }; struct async_tag @@ -101,8 +105,11 @@ namespace bootstrap_ascending void run_one_database (bool should_throttle); void run_dependencies (); void run_one_blocking (); + void run_one_frontier (); + void run_frontiers (); void run_timeouts (); void cleanup_and_sync (); + void process_frontiers (std::deque> const & frontiers); /* Waits for a condition to be satisfied with incremental backoff */ void wait (std::function const & predicate) const; @@ -122,10 +129,13 @@ namespace bootstrap_ascending /* Waits for next available blocking block */ nano::block_hash next_blocking (); nano::block_hash wait_blocking (); + /* Waits for next available frontier scan range */ + nano::account wait_frontier (); bool request (nano::account, size_t count, std::shared_ptr const &, query_source); bool request_info (nano::block_hash, std::shared_ptr const &, query_source); - void send (std::shared_ptr const &, async_tag tag); + bool request_frontiers (nano::account, std::shared_ptr const &, query_source); + bool send (std::shared_ptr const &, async_tag tag); void process (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag); void process (nano::asc_pull_ack::account_info_payload const & response, async_tag const & tag); @@ -146,6 +156,7 @@ namespace bootstrap_ascending * - ok: otherwise, if all checks pass */ verify_result verify (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag) const; + verify_result verify (nano::asc_pull_ack::frontiers_payload const & response, async_tag const & tag) const; size_t count_tags (nano::account const & account, query_source source) const; size_t count_tags (nano::block_hash const & hash, query_source source) const; @@ -158,6 +169,7 @@ namespace bootstrap_ascending nano::bootstrap_ascending::database_scan database_scan; nano::bootstrap_ascending::throttle throttle; nano::bootstrap_ascending::peer_scoring scoring; + nano::bootstrap_ascending::frontier_scan frontiers; // clang-format off class tag_sequenced {}; @@ -181,6 +193,7 @@ namespace bootstrap_ascending // Requests for accounts from database have much lower hitrate and could introduce strain on the network // A separate (lower) limiter ensures that we always reserve resources for querying accounts from priority queue nano::rate_limiter database_limiter; + nano::rate_limiter frontiers_limiter; nano::interval sync_dependencies_interval; @@ -190,7 +203,10 @@ namespace bootstrap_ascending std::thread priorities_thread; std::thread database_thread; std::thread dependencies_thread; + std::thread frontiers_thread; std::thread timeout_thread; + + nano::thread_pool workers; }; nano::stat::detail to_stat_detail (service::query_type); diff --git a/nano/node/messages.hpp b/nano/node/messages.hpp index 00f1070b0..cba54744d 100644 --- a/nano/node/messages.hpp +++ b/nano/node/messages.hpp @@ -724,7 +724,7 @@ public: // Payload definitions static frontier deserialize_frontier (nano::stream &); public: // Payload - std::vector frontiers; + std::deque frontiers; public: // Logging void operator() (nano::object_stream &) const;