Merge pull request #4739 from pwojcikdev/frontier-scan-5

Frontier scan client
Piotr Wójcik 2024-10-31 11:32:03 +01:00 committed by GitHub
commit e72a999e4c
28 changed files with 1144 additions and 333 deletions

View file

@ -436,8 +436,8 @@ TEST (inactive_votes_cache, election_start)
nano::test::system system;
nano::node_config node_config = system.default_config ();
node_config.backlog_population.enable = false;
node_config.priority_scheduler.enabled = false;
node_config.optimistic_scheduler.enabled = false;
node_config.priority_scheduler.enable = false;
node_config.optimistic_scheduler.enable = false;
auto & node = *system.add_node (node_config);
nano::block_hash latest (node.latest (nano::dev::genesis_key.pub));
nano::keypair key1, key2;
@ -1336,7 +1336,7 @@ TEST (active_elections, limit_vote_hinted_elections)
nano::node_config config = system.default_config ();
const int aec_limit = 10;
config.backlog_population.enable = false;
config.optimistic_scheduler.enabled = false;
config.optimistic_scheduler.enable = false;
config.active_elections.size = aec_limit;
config.active_elections.hinted_limit_percentage = 10; // Should give us a limit of 1 hinted election
auto & node = *system.add_node (config);

View file

@ -2,12 +2,11 @@
#include <nano/lib/logging.hpp>
#include <nano/lib/stats.hpp>
#include <nano/lib/tomlconfig.hpp>
#include <nano/node/bootstrap_ascending/database_scan.hpp>
#include <nano/node/bootstrap_ascending/service.hpp>
#include <nano/node/make_store.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/secure/ledger_set_any.hpp>
#include <nano/test_common/ledger_context.hpp>
#include <nano/test_common/chains.hpp>
#include <nano/test_common/system.hpp>
#include <nano/test_common/testutil.hpp>
@ -269,218 +268,290 @@ TEST (bootstrap_ascending, trace_base)
ASSERT_TIMELY (10s, node1.block (receive1->hash ()) != nullptr);
}
TEST (bootstrap_ascending, pending_database_scanner)
/*
* Tests that bootstrap will prioritize existing accounts with outdated frontiers
*/
TEST (bootstrap_ascending, frontier_scan)
{
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::test::system system;
// Prepare pending sends from genesis
// 1 account with 1 pending
// 1 account with 21 pendings
// 2 accounts with 1 pending each
std::deque<std::shared_ptr<nano::block>> blocks;
nano::keypair key1, key2, key3, key4;
nano::node_flags flags;
flags.disable_legacy_bootstrap = true;
nano::node_config config;
// Disable other bootstrap strategies
config.bootstrap_ascending.enable_scan = false;
config.bootstrap_ascending.enable_dependency_walker = false;
// Disable election activation
config.backlog_population.enable = false;
config.priority_scheduler.enable = false;
config.optimistic_scheduler.enable = false;
config.hinted_scheduler.enable = false;
// Prepare blocks for frontier scan (genesis 10 sends -> 10 opens -> 10 updates)
std::vector<std::shared_ptr<nano::block>> sends;
std::vector<std::shared_ptr<nano::block>> opens;
std::vector<std::shared_ptr<nano::block>> updates;
{
nano::state_block_builder builder;
auto source = nano::dev::genesis_key;
auto latest = nano::dev::genesis->hash ();
auto balance = nano::dev::genesis->balance ().number ();
// 1 account with 1 pending
{
auto send = builder.make_block ()
.account (source.pub)
.previous (latest)
.representative (source.pub)
.link (key1.pub)
.balance (balance - 1)
.sign (source.prv, source.pub)
.work (*pool.generate (latest))
.build ();
latest = send->hash ();
balance = send->balance_field ().value ().number ();
blocks.push_back (send);
}
// 1 account with 21 pendings
for (int i = 0; i < 21; ++i)
{
auto send = builder.make_block ()
.account (source.pub)
.previous (latest)
.representative (source.pub)
.link (key2.pub)
.balance (balance - 1)
.sign (source.prv, source.pub)
.work (*pool.generate (latest))
.build ();
latest = send->hash ();
balance = send->balance_field ().value ().number ();
blocks.push_back (send);
}
// 2 accounts with 1 pending each
{
auto send = builder.make_block ()
.account (source.pub)
.previous (latest)
.representative (source.pub)
.link (key3.pub)
.balance (balance - 1)
.sign (source.prv, source.pub)
.work (*pool.generate (latest))
.build ();
latest = send->hash ();
balance = send->balance_field ().value ().number ();
blocks.push_back (send);
}
{
auto send = builder.make_block ()
.account (source.pub)
.previous (latest)
.representative (source.pub)
.link (key4.pub)
.balance (balance - 1)
.sign (source.prv, source.pub)
.work (*pool.generate (latest))
.build ();
latest = send->hash ();
balance = send->balance_field ().value ().number ();
blocks.push_back (send);
}
}
size_t const count = 10;
nano::test::ledger_context ctx{ std::move (blocks) };
// Single batch
{
nano::bootstrap_ascending::pending_database_iterator scanner{ ctx.ledger () };
auto transaction = ctx.store ().tx_begin_read ();
auto accounts = scanner.next_batch (transaction, 256);
// Check that account set contains all keys
ASSERT_EQ (accounts.size (), 4);
ASSERT_TRUE (std::find (accounts.begin (), accounts.end (), key1.pub) != accounts.end ());
ASSERT_TRUE (std::find (accounts.begin (), accounts.end (), key2.pub) != accounts.end ());
ASSERT_TRUE (std::find (accounts.begin (), accounts.end (), key3.pub) != accounts.end ());
ASSERT_TRUE (std::find (accounts.begin (), accounts.end (), key4.pub) != accounts.end ());
ASSERT_EQ (scanner.completed, 1);
}
// Multi batch
{
nano::bootstrap_ascending::pending_database_iterator scanner{ ctx.ledger () };
auto transaction = ctx.store ().tx_begin_read ();
// Request accounts in multiple batches
auto accounts1 = scanner.next_batch (transaction, 2);
auto accounts2 = scanner.next_batch (transaction, 1);
auto accounts3 = scanner.next_batch (transaction, 1);
ASSERT_EQ (accounts1.size (), 2);
ASSERT_EQ (accounts2.size (), 1);
ASSERT_EQ (accounts3.size (), 1);
std::deque<nano::account> accounts;
accounts.insert (accounts.end (), accounts1.begin (), accounts1.end ());
accounts.insert (accounts.end (), accounts2.begin (), accounts2.end ());
accounts.insert (accounts.end (), accounts3.begin (), accounts3.end ());
// Check that account set contains all keys
ASSERT_EQ (accounts.size (), 4);
ASSERT_TRUE (std::find (accounts.begin (), accounts.end (), key1.pub) != accounts.end ());
ASSERT_TRUE (std::find (accounts.begin (), accounts.end (), key2.pub) != accounts.end ());
ASSERT_TRUE (std::find (accounts.begin (), accounts.end (), key3.pub) != accounts.end ());
ASSERT_TRUE (std::find (accounts.begin (), accounts.end (), key4.pub) != accounts.end ());
ASSERT_EQ (scanner.completed, 1);
}
}
TEST (bootstrap_ascending, account_database_scanner)
{
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
size_t const count = 4;
// Prepare some accounts
std::deque<std::shared_ptr<nano::block>> blocks;
std::deque<nano::keypair> keys;
{
nano::state_block_builder builder;
auto source = nano::dev::genesis_key;
auto latest = nano::dev::genesis->hash ();
auto balance = nano::dev::genesis->balance ().number ();
for (int i = 0; i < count; ++i)
for (int n = 0; n < count; ++n)
{
nano::keypair key;
auto send = builder.make_block ()
nano::block_builder builder;
balance -= 1;
auto send = builder
.state ()
.account (source.pub)
.previous (latest)
.representative (source.pub)
.balance (balance)
.link (key.pub)
.balance (balance - 1)
.sign (source.prv, source.pub)
.work (*pool.generate (latest))
.work (*system.work.generate (latest))
.build ();
auto open = builder.make_block ()
latest = send->hash ();
auto open = builder
.state ()
.account (key.pub)
.previous (0)
.representative (key.pub)
.link (send->hash ())
.balance (1)
.link (send->hash ())
.sign (key.prv, key.pub)
.work (*pool.generate (key.pub))
.work (*system.work.generate (key.pub))
.build ();
auto update = builder
.state ()
.account (key.pub)
.previous (open->hash ())
.representative (0)
.balance (1)
.link (0)
.sign (key.prv, key.pub)
.work (*system.work.generate (open->hash ()))
.build ();
sends.push_back (send);
opens.push_back (open);
updates.push_back (update);
}
}
// Initialize nodes with blocks without the `updates` frontiers
std::vector<std::shared_ptr<nano::block>> blocks;
blocks.insert (blocks.end (), sends.begin (), sends.end ());
blocks.insert (blocks.end (), opens.begin (), opens.end ());
system.set_initialization_blocks ({ blocks.begin (), blocks.end () });
auto & node0 = *system.add_node (config, flags);
ASSERT_TRUE (nano::test::process (node0, updates));
// No blocks should be broadcast to the other node
auto & node1 = *system.add_node (config, flags);
ASSERT_ALWAYS_EQ (100ms, node1.ledger.block_count (), blocks.size () + 1);
// Frontier scan should detect all the accounts with missing blocks
ASSERT_TIMELY (10s, std::all_of (updates.begin (), updates.end (), [&node1] (auto const & block) {
return node1.ascendboot.prioritized (block->account ());
}));
}
/*
* Tests that bootstrap will prioritize accounts that don't yet exist locally but have pending blocks
*/
TEST (bootstrap_ascending, frontier_scan_pending)
{
nano::test::system system;
nano::node_flags flags;
flags.disable_legacy_bootstrap = true;
nano::node_config config;
// Disable other bootstrap strategies
config.bootstrap_ascending.enable_scan = false;
config.bootstrap_ascending.enable_dependency_walker = false;
// Disable election activation
config.backlog_population.enable = false;
config.priority_scheduler.enable = false;
config.optimistic_scheduler.enable = false;
config.hinted_scheduler.enable = false;
// Prepare blocks for frontier scan (genesis 10 sends -> 10 opens)
std::vector<std::shared_ptr<nano::block>> sends;
std::vector<std::shared_ptr<nano::block>> opens;
{
auto source = nano::dev::genesis_key;
auto latest = nano::dev::genesis->hash ();
auto balance = nano::dev::genesis->balance ().number ();
size_t const count = 10;
for (int n = 0; n < count; ++n)
{
nano::keypair key;
nano::block_builder builder;
balance -= 1;
auto send = builder
.state ()
.account (source.pub)
.previous (latest)
.representative (source.pub)
.balance (balance)
.link (key.pub)
.sign (source.prv, source.pub)
.work (*system.work.generate (latest))
.build ();
latest = send->hash ();
balance = send->balance_field ().value ().number ();
blocks.push_back (send);
blocks.push_back (open);
keys.push_back (key);
auto open = builder
.state ()
.account (key.pub)
.previous (0)
.representative (key.pub)
.balance (1)
.link (send->hash ())
.sign (key.prv, key.pub)
.work (*system.work.generate (key.pub))
.build ();
sends.push_back (send);
opens.push_back (open);
}
}
nano::test::ledger_context ctx{ std::move (blocks) };
// Initialize nodes with the send blocks only, without the `opens`
std::vector<std::shared_ptr<nano::block>> blocks;
blocks.insert (blocks.end (), sends.begin (), sends.end ());
system.set_initialization_blocks ({ blocks.begin (), blocks.end () });
// Single batch
auto & node0 = *system.add_node (config, flags);
ASSERT_TRUE (nano::test::process (node0, opens));
// No blocks should be broadcast to the other node
auto & node1 = *system.add_node (config, flags);
ASSERT_ALWAYS_EQ (100ms, node1.ledger.block_count (), blocks.size () + 1);
// Frontier scan should detect all the accounts with missing blocks
ASSERT_TIMELY (10s, std::all_of (opens.begin (), opens.end (), [&node1] (auto const & block) {
return node1.ascendboot.prioritized (block->account ());
}));
}
/*
* Bootstrap should not attempt to prioritize accounts that can't be immediately connected to the ledger (no pending blocks, no existing frontier)
*/
TEST (bootstrap_ascending, frontier_scan_cannot_prioritize)
{
nano::test::system system;
nano::node_flags flags;
flags.disable_legacy_bootstrap = true;
nano::node_config config;
// Disable other bootstrap strategies
config.bootstrap_ascending.enable_scan = false;
config.bootstrap_ascending.enable_dependency_walker = false;
// Disable election activation
config.backlog_population.enable = false;
config.priority_scheduler.enable = false;
config.optimistic_scheduler.enable = false;
config.hinted_scheduler.enable = false;
// Prepare blocks for frontier scan (genesis 10 sends -> 10 opens -> 10 sends -> 10 opens)
std::vector<std::shared_ptr<nano::block>> sends;
std::vector<std::shared_ptr<nano::block>> opens;
std::vector<std::shared_ptr<nano::block>> sends2;
std::vector<std::shared_ptr<nano::block>> opens2;
{
nano::bootstrap_ascending::account_database_iterator scanner{ ctx.ledger () };
auto transaction = ctx.store ().tx_begin_read ();
auto accounts = scanner.next_batch (transaction, 256);
auto source = nano::dev::genesis_key;
auto latest = nano::dev::genesis->hash ();
auto balance = nano::dev::genesis->balance ().number ();
// Check that account set contains all keys
ASSERT_EQ (accounts.size (), keys.size () + 1); // +1 for genesis
for (auto const & key : keys)
size_t const count = 10;
for (int n = 0; n < count; ++n)
{
ASSERT_TRUE (std::find (accounts.begin (), accounts.end (), key.pub) != accounts.end ());
nano::keypair key, key2;
nano::block_builder builder;
balance -= 1;
auto send = builder
.state ()
.account (source.pub)
.previous (latest)
.representative (source.pub)
.balance (balance)
.link (key.pub)
.sign (source.prv, source.pub)
.work (*system.work.generate (latest))
.build ();
latest = send->hash ();
auto open = builder
.state ()
.account (key.pub)
.previous (0)
.representative (key.pub)
.balance (1)
.link (send->hash ())
.sign (key.prv, key.pub)
.work (*system.work.generate (key.pub))
.build ();
auto send2 = builder
.state ()
.account (key.pub)
.previous (open->hash ())
.representative (key.pub)
.balance (0)
.link (key2.pub)
.sign (key.prv, key.pub)
.work (*system.work.generate (open->hash ()))
.build ();
auto open2 = builder
.state ()
.account (key2.pub)
.previous (0)
.representative (key2.pub)
.balance (1)
.link (send2->hash ())
.sign (key2.prv, key2.pub)
.work (*system.work.generate (key2.pub))
.build ();
sends.push_back (send);
opens.push_back (open);
sends2.push_back (send2);
opens2.push_back (open2);
}
ASSERT_EQ (scanner.completed, 1);
}
// Multi batch
{
nano::bootstrap_ascending::account_database_iterator scanner{ ctx.ledger () };
auto transaction = ctx.store ().tx_begin_read ();
// Request accounts in multiple batches
auto accounts1 = scanner.next_batch (transaction, 2);
auto accounts2 = scanner.next_batch (transaction, 2);
auto accounts3 = scanner.next_batch (transaction, 1);
// Initialize nodes with the first-generation blocks only, without the `sends2` and `opens2`
std::vector<std::shared_ptr<nano::block>> blocks;
blocks.insert (blocks.end (), sends.begin (), sends.end ());
blocks.insert (blocks.end (), opens.begin (), opens.end ());
system.set_initialization_blocks ({ blocks.begin (), blocks.end () });
ASSERT_EQ (accounts1.size (), 2);
ASSERT_EQ (accounts2.size (), 2);
ASSERT_EQ (accounts3.size (), 1);
auto & node0 = *system.add_node (config, flags);
ASSERT_TRUE (nano::test::process (node0, sends2));
ASSERT_TRUE (nano::test::process (node0, opens2));
std::deque<nano::account> accounts;
accounts.insert (accounts.end (), accounts1.begin (), accounts1.end ());
accounts.insert (accounts.end (), accounts2.begin (), accounts2.end ());
accounts.insert (accounts.end (), accounts3.begin (), accounts3.end ());
// No blocks should be broadcast to the other node
auto & node1 = *system.add_node (config, flags);
ASSERT_ALWAYS_EQ (100ms, node1.ledger.block_count (), blocks.size () + 1);
// Check that account set contains all keys
ASSERT_EQ (accounts.size (), keys.size () + 1); // +1 for genesis
for (auto const & key : keys)
{
ASSERT_TRUE (std::find (accounts.begin (), accounts.end (), key.pub) != accounts.end ());
}
ASSERT_EQ (scanner.completed, 1);
}
// Frontier scan should not detect the accounts
ASSERT_ALWAYS (1s, std::none_of (opens2.begin (), opens2.end (), [&node1] (auto const & block) {
return node1.ascendboot.prioritized (block->account ());
}));
}

View file

@ -3762,9 +3762,9 @@ TEST (node, local_block_broadcast)
// Disable active elections to prevent the block from being broadcasted by the election
auto node_config = system.default_config ();
node_config.priority_scheduler.enabled = false;
node_config.hinted_scheduler.enabled = false;
node_config.optimistic_scheduler.enabled = false;
node_config.priority_scheduler.enable = false;
node_config.hinted_scheduler.enable = false;
node_config.optimistic_scheduler.enable = false;
node_config.local_block_broadcaster.rebroadcast_interval = 1s;
auto & node1 = *system.add_node (node_config);
auto & node2 = *system.make_disconnected_node ();

View file

@ -244,11 +244,11 @@ TEST (toml, daemon_config_deserialize_defaults)
ASSERT_EQ (conf.node.rocksdb_config.read_cache, defaults.node.rocksdb_config.read_cache);
ASSERT_EQ (conf.node.rocksdb_config.write_cache, defaults.node.rocksdb_config.write_cache);
ASSERT_EQ (conf.node.optimistic_scheduler.enabled, defaults.node.optimistic_scheduler.enabled);
ASSERT_EQ (conf.node.optimistic_scheduler.enable, defaults.node.optimistic_scheduler.enable);
ASSERT_EQ (conf.node.optimistic_scheduler.gap_threshold, defaults.node.optimistic_scheduler.gap_threshold);
ASSERT_EQ (conf.node.optimistic_scheduler.max_size, defaults.node.optimistic_scheduler.max_size);
ASSERT_EQ (conf.node.hinted_scheduler.enabled, defaults.node.hinted_scheduler.enabled);
ASSERT_EQ (conf.node.hinted_scheduler.enable, defaults.node.hinted_scheduler.enable);
ASSERT_EQ (conf.node.hinted_scheduler.hinting_threshold_percent, defaults.node.hinted_scheduler.hinting_threshold_percent);
ASSERT_EQ (conf.node.hinted_scheduler.check_interval.count (), defaults.node.hinted_scheduler.check_interval.count ());
ASSERT_EQ (conf.node.hinted_scheduler.block_cooldown.count (), defaults.node.hinted_scheduler.block_cooldown.count ());
@ -599,7 +599,8 @@ TEST (toml, daemon_config_deserialize_no_defaults)
[node.bootstrap_ascending]
enable = false
enable_database_scan = false
enable_frontier_scan = false
enable_database_scan = true
enable_dependency_walker = false
channel_limit = 999
database_rate_limit = 999
@ -751,11 +752,11 @@ TEST (toml, daemon_config_deserialize_no_defaults)
ASSERT_NE (conf.node.rocksdb_config.read_cache, defaults.node.rocksdb_config.read_cache);
ASSERT_NE (conf.node.rocksdb_config.write_cache, defaults.node.rocksdb_config.write_cache);
ASSERT_NE (conf.node.optimistic_scheduler.enabled, defaults.node.optimistic_scheduler.enabled);
ASSERT_NE (conf.node.optimistic_scheduler.enable, defaults.node.optimistic_scheduler.enable);
ASSERT_NE (conf.node.optimistic_scheduler.gap_threshold, defaults.node.optimistic_scheduler.gap_threshold);
ASSERT_NE (conf.node.optimistic_scheduler.max_size, defaults.node.optimistic_scheduler.max_size);
ASSERT_NE (conf.node.hinted_scheduler.enabled, defaults.node.hinted_scheduler.enabled);
ASSERT_NE (conf.node.hinted_scheduler.enable, defaults.node.hinted_scheduler.enable);
ASSERT_NE (conf.node.hinted_scheduler.hinting_threshold_percent, defaults.node.hinted_scheduler.hinting_threshold_percent);
ASSERT_NE (conf.node.hinted_scheduler.check_interval.count (), defaults.node.hinted_scheduler.check_interval.count ());
ASSERT_NE (conf.node.hinted_scheduler.block_cooldown.count (), defaults.node.hinted_scheduler.block_cooldown.count ());
@ -777,6 +778,7 @@ TEST (toml, daemon_config_deserialize_no_defaults)
ASSERT_NE (conf.node.vote_processor.batch_size, defaults.node.vote_processor.batch_size);
ASSERT_NE (conf.node.bootstrap_ascending.enable, defaults.node.bootstrap_ascending.enable);
ASSERT_NE (conf.node.bootstrap_ascending.enable_frontier_scan, defaults.node.bootstrap_ascending.enable_frontier_scan);
ASSERT_NE (conf.node.bootstrap_ascending.enable_database_scan, defaults.node.bootstrap_ascending.enable_database_scan);
ASSERT_NE (conf.node.bootstrap_ascending.enable_dependency_walker, defaults.node.bootstrap_ascending.enable_dependency_walker);
ASSERT_NE (conf.node.bootstrap_ascending.channel_limit, defaults.node.bootstrap_ascending.channel_limit);

View file

@ -22,8 +22,8 @@ TEST (vote_processor, codes)
auto node_config = system.default_config ();
// Disable all election schedulers
node_config.backlog_population.enable = false;
node_config.hinted_scheduler.enabled = false;
node_config.optimistic_scheduler.enabled = false;
node_config.hinted_scheduler.enable = false;
node_config.optimistic_scheduler.enable = false;
auto & node = *system.add_node (node_config);
auto blocks = nano::test::setup_chain (system, node, 1, nano::dev::genesis_key, false);

View file

@ -66,10 +66,10 @@ class rate_limiter final
{
public:
// initialize with limit 0 = unbounded
rate_limiter (std::size_t limit, double burst_ratio);
rate_limiter (std::size_t limit, double burst_ratio = 1.0);
bool should_pass (std::size_t buffer_size);
void reset (std::size_t limit, double burst_ratio);
void reset (std::size_t limit, double burst_ratio = 1.0);
private:
nano::rate::token_bucket bucket;
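With burst_ratio defaulted above, callers can construct and reset the limiter from a limit alone. A minimal usage sketch, assuming the class lives in the nano namespace and an illustrative limit of 8:
nano::rate_limiter limiter{ 8 }; // burst_ratio defaults to 1.0
if (limiter.should_pass (1))
{
	// allowance available, proceed with one item
}
limiter.reset (16); // new limit, burst_ratio again defaults to 1.0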

View file

@ -60,11 +60,13 @@ enum class type
blockprocessor_overfill,
bootstrap_ascending,
bootstrap_ascending_accounts,
bootstrap_ascending_verify,
bootstrap_ascending_verify_blocks,
bootstrap_ascending_verify_frontiers,
bootstrap_ascending_process,
bootstrap_ascending_request,
bootstrap_ascending_reply,
bootstrap_ascending_next,
bootstrap_ascending_frontiers,
bootstrap_server,
bootstrap_server_request,
bootstrap_server_overfill,
@ -118,6 +120,7 @@ enum class detail
inserted,
erased,
request,
request_failed,
broadcast,
cleanup,
top,
@ -137,6 +140,8 @@ enum class detail
empty,
done,
retry,
prioritized,
pending,
// processing queue
queue,
@ -430,7 +435,7 @@ enum class detail
missing_cookie,
invalid_genesis,
// bootstrap ascending
// bootstrap_ascending
missing_tag,
reply,
throttled,
@ -438,13 +443,18 @@ enum class detail
timeout,
nothing_new,
account_info_empty,
frontiers_empty,
loop_database,
loop_dependencies,
loop_frontiers,
loop_frontiers_processing,
duplicate_request,
invalid_response_type,
timestamp_reset,
process_frontiers,
dropped_frontiers,
// bootstrap ascending accounts
// bootstrap_ascending_accounts
prioritize,
prioritize_failed,
block,
@ -453,11 +463,20 @@ enum class detail
dependency_update,
dependency_update_failed,
// bootstrap_ascending_frontiers
done_range,
done_empty,
next_by_requests,
next_by_timestamp,
advance,
advance_failed,
next_none,
next_priority,
next_database,
next_blocking,
next_dependency,
next_frontier,
blocking_insert,
blocking_erase_overflow,
@ -475,6 +494,7 @@ enum class detail
// active
started_hinted,
started_optimistic,
// rep_crawler
channel_dead,
query_target_failed,

View file

@ -104,7 +104,10 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
thread_role_name_string = "Voting que";
break;
case nano::thread_role::name::ascending_bootstrap:
thread_role_name_string = "Bootstrap asc";
thread_role_name_string = "Ascboot";
break;
case nano::thread_role::name::ascending_bootstrap_worker:
thread_role_name_string = "Ascboot work";
break;
case nano::thread_role::name::bootstrap_server:
thread_role_name_string = "Bootstrap serv";

View file

@ -42,6 +42,7 @@ enum class name
bootstrap_server,
telemetry,
ascending_bootstrap,
ascending_bootstrap_worker,
bootstrap_server_requests,
bootstrap_server_responses,
scheduler_hinted,

View file

@ -46,13 +46,15 @@ add_library(
bootstrap/bootstrap.cpp
bootstrap/bootstrap_server.hpp
bootstrap/bootstrap_server.cpp
bootstrap_ascending/common.hpp
bootstrap_ascending/throttle.hpp
bootstrap_ascending/throttle.cpp
bootstrap_ascending/account_sets.hpp
bootstrap_ascending/account_sets.cpp
bootstrap_ascending/database_scan.hpp
bootstrap_ascending/database_scan.cpp
bootstrap_ascending/common.hpp
bootstrap_ascending/frontier_scan.hpp
bootstrap_ascending/frontier_scan.cpp
bootstrap_ascending/throttle.hpp
bootstrap_ascending/throttle.cpp
bootstrap_ascending/peer_scoring.hpp
bootstrap_ascending/peer_scoring.cpp
bootstrap_ascending/service.hpp

View file

@ -34,6 +34,7 @@ nano::error nano::bootstrap_ascending_config::deserialize (nano::tomlconfig & to
toml.get ("enable", enable);
toml.get ("enable_database_scan", enable_database_scan);
toml.get ("enable_dependency_walker", enable_dependency_walker);
toml.get ("enable_frontier_scan", enable_frontier_scan);
toml.get ("channel_limit", channel_limit);
toml.get ("database_rate_limit", database_rate_limit);
@ -59,6 +60,7 @@ nano::error nano::bootstrap_ascending_config::serialize (nano::tomlconfig & toml
toml.put ("enable", enable, "Enable or disable the ascending bootstrap. Disabling it is not recommended and will prevent the node from syncing.\ntype:bool");
toml.put ("enable_database_scan", enable_database_scan, "Enable or disable the 'database scan` strategy for the ascending bootstrap.\ntype:bool");
toml.put ("enable_dependency_walker", enable_dependency_walker, "Enable or disable the 'dependency walker` strategy for the ascending bootstrap.\ntype:bool");
toml.put ("enable_frontier_scan", enable_frontier_scan, "Enable or disable the 'frontier scan` strategy for the ascending bootstrap.\ntype:bool");
toml.put ("channel_limit", channel_limit, "Maximum number of un-responded requests per channel.\nNote: changing to unlimited (0) is not recommended.\ntype:uint64");
toml.put ("database_rate_limit", database_rate_limit, "Rate limit on scanning accounts and pending entries from database.\nNote: changing to unlimited (0) is not recommended as this operation competes for resources on querying the database.\ntype:uint64");

View file

@ -22,6 +22,19 @@ public:
std::chrono::milliseconds cooldown{ 1000 * 3 };
};
// TODO: This should be moved next to `frontier_scan` class
class frontier_scan_config final
{
public:
// TODO: Serialize & deserialize
unsigned head_parallelism{ 128 };
unsigned consideration_count{ 4 };
std::size_t candidates{ 1000 };
std::chrono::milliseconds cooldown{ 1000 * 5 };
std::size_t max_pending{ 16 };
};
// TODO: This should be moved next to `bootstrap_ascending` class
class bootstrap_ascending_config final
{
@ -31,12 +44,15 @@ public:
public:
bool enable{ true };
bool enable_database_scan{ true };
bool enable_scan{ true };
bool enable_database_scan{ false };
bool enable_dependency_walker{ true };
bool enable_frontier_scan{ true };
// Maximum number of un-responded requests per channel; should be lower than or equal to the bootstrap server max queue size
std::size_t channel_limit{ 16 };
std::size_t database_rate_limit{ 256 };
std::size_t frontier_rate_limit{ 8 };
std::size_t database_warmup_ratio{ 10 };
std::size_t max_pull_count{ nano::bootstrap_server::max_blocks };
std::chrono::milliseconds request_timeout{ 1000 * 5 };
@ -45,6 +61,7 @@ public:
std::size_t block_processor_threshold{ 1000 };
std::size_t max_requests{ 1024 };
nano::account_sets_config account_sets;
account_sets_config account_sets;
frontier_scan_config frontier_scan;
};
}

View file

@ -0,0 +1,187 @@
#pragma once
#include <nano/secure/account_info.hpp>
#include <nano/secure/pending_info.hpp>
#include <nano/store/account.hpp>
#include <nano/store/component.hpp>
#include <nano/store/pending.hpp>
#include <optional>
namespace nano::bootstrap_ascending
{
struct account_database_crawler
{
using value_type = std::pair<nano::account, nano::account_info>;
static constexpr size_t sequential_attempts = 10;
account_database_crawler (nano::store::component & store, nano::store::transaction const & transaction, nano::account const & start) :
store{ store },
transaction{ transaction },
it{ store.account.end (transaction) },
end{ store.account.end (transaction) }
{
seek (start);
}
void seek (nano::account const & account)
{
it = store.account.begin (transaction, account);
update_current ();
}
void advance ()
{
if (it == end)
{
debug_assert (!current);
return;
}
++it;
update_current ();
}
void advance_to (nano::account const & account)
{
if (it == end)
{
debug_assert (!current);
return;
}
// First try advancing sequentially
for (size_t count = 0; count < sequential_attempts && it != end; ++count, ++it)
{
// Break if we've reached or overshot the target account
if (it->first.number () >= account.number ())
{
update_current ();
return;
}
}
// If that fails, perform a fresh lookup
seek (account);
}
std::optional<value_type> current{};
private:
void update_current ()
{
if (it != end)
{
current = *it;
}
else
{
current = std::nullopt;
}
}
nano::store::component & store;
nano::store::transaction const & transaction;
nano::store::account::iterator it;
nano::store::account::iterator const end;
};
struct pending_database_crawler
{
using value_type = std::pair<nano::pending_key, nano::pending_info>;
static constexpr size_t sequential_attempts = 10;
pending_database_crawler (nano::store::component & store, nano::store::transaction const & transaction, nano::account const & start) :
store{ store },
transaction{ transaction },
it{ store.pending.end (transaction) },
end{ store.pending.end (transaction) }
{
seek (start);
}
void seek (nano::account const & account)
{
it = store.pending.begin (transaction, { account, 0 });
update_current ();
}
// Advance to the next account
void advance ()
{
if (it == end)
{
debug_assert (!current);
return;
}
auto const starting_account = it->first.account;
// First try advancing sequentially
for (size_t count = 0; count < sequential_attempts && it != end; ++count, ++it)
{
// Break if we've reached the next account
if (it->first.account != starting_account)
{
update_current ();
return;
}
}
if (it != end)
{
// If that fails, perform a fresh lookup
seek (starting_account.number () + 1);
}
update_current ();
}
void advance_to (nano::account const & account)
{
if (it == end)
{
debug_assert (!current);
return;
}
// First try advancing sequentially
for (size_t count = 0; count < sequential_attempts && it != end; ++count, ++it)
{
// Break if we've reached or overshot the target account
if (it->first.account.number () >= account.number ())
{
update_current ();
return;
}
}
// If that fails, perform a fresh lookup
seek (account);
}
std::optional<value_type> current{};
private:
void update_current ()
{
if (it != end)
{
current = *it;
}
else
{
current = std::nullopt;
}
}
nano::store::component & store;
nano::store::transaction const & transaction;
nano::store::pending::iterator it;
nano::store::pending::iterator const end;
};
}
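A minimal sketch of how these crawlers are driven (mirroring their use in process_frontiers further below); the ledger reference and the start account are placeholders:
auto transaction = ledger.store.tx_begin_read ();
nano::bootstrap_ascending::account_database_crawler crawler{ ledger.store, transaction, nano::account{ 1 } };
while (crawler.current)
{
	auto const & [account, info] = crawler.current.value ();
	// ... inspect the account and its info ...
	crawler.advance (); // or crawler.advance_to (target) to skip ahead, sequentially first, then by fresh lookup
}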

View file

@ -1,4 +1,5 @@
#include <nano/lib/utility.hpp>
#include <nano/node/bootstrap_ascending/crawlers.hpp>
#include <nano/node/bootstrap_ascending/database_scan.hpp>
#include <nano/secure/common.hpp>
#include <nano/secure/ledger.hpp>
@ -13,8 +14,8 @@
nano::bootstrap_ascending::database_scan::database_scan (nano::ledger & ledger_a) :
ledger{ ledger_a },
accounts_iterator{ ledger },
pending_iterator{ ledger }
account_scanner{ ledger },
pending_scanner{ ledger }
{
}
@ -43,8 +44,8 @@ void nano::bootstrap_ascending::database_scan::fill ()
{
auto transaction = ledger.store.tx_begin_read ();
auto set1 = accounts_iterator.next_batch (transaction, batch_size);
auto set2 = pending_iterator.next_batch (transaction, batch_size);
auto set1 = account_scanner.next_batch (transaction, batch_size);
auto set2 = pending_scanner.next_batch (transaction, batch_size);
queue.insert (queue.end (), set1.begin (), set1.end ());
queue.insert (queue.end (), set2.begin (), set2.end ());
@ -52,41 +53,36 @@ void nano::bootstrap_ascending::database_scan::fill ()
bool nano::bootstrap_ascending::database_scan::warmed_up () const
{
return accounts_iterator.warmed_up () && pending_iterator.warmed_up ();
return account_scanner.completed > 0 && pending_scanner.completed > 0;
}
nano::container_info nano::bootstrap_ascending::database_scan::container_info () const
{
nano::container_info info;
info.put ("accounts_iterator", accounts_iterator.completed);
info.put ("pending_iterator", pending_iterator.completed);
info.put ("accounts_iterator", account_scanner.completed);
info.put ("pending_iterator", pending_scanner.completed);
return info;
}
/*
* account_database_iterator
* account_database_scanner
*/
nano::bootstrap_ascending::account_database_iterator::account_database_iterator (nano::ledger & ledger_a) :
ledger{ ledger_a }
{
}
std::deque<nano::account> nano::bootstrap_ascending::account_database_iterator::next_batch (nano::store::transaction & transaction, size_t batch_size)
std::deque<nano::account> nano::bootstrap_ascending::account_database_scanner::next_batch (nano::store::transaction & transaction, size_t batch_size)
{
std::deque<nano::account> result;
auto it = ledger.store.account.begin (transaction, next);
auto const end = ledger.store.account.end (transaction);
account_database_crawler crawler{ ledger.store, transaction, next };
for (size_t count = 0; it != end && count < batch_size; ++it, ++count)
for (size_t count = 0; crawler.current && count < batch_size; crawler.advance (), ++count)
{
auto const & account = it->first;
auto const & [account, info] = crawler.current.value ();
result.push_back (account);
next = account.number () + 1;
next = account.number () + 1; // TODO: Handle account number overflow
}
if (it == end)
// Empty current value indicates the end of the table
if (!crawler.current)
{
// Reset for the next ledger iteration
next = { 0 };
@ -96,72 +92,30 @@ std::deque<nano::account> nano::bootstrap_ascending::account_database_iterator::
return result;
}
bool nano::bootstrap_ascending::account_database_iterator::warmed_up () const
{
return completed > 0;
}
/*
* pending_database_iterator
* pending_database_scanner
*/
nano::bootstrap_ascending::pending_database_iterator::pending_database_iterator (nano::ledger & ledger_a) :
ledger{ ledger_a }
{
}
std::deque<nano::account> nano::bootstrap_ascending::pending_database_iterator::next_batch (nano::store::transaction & transaction, size_t batch_size)
std::deque<nano::account> nano::bootstrap_ascending::pending_database_scanner::next_batch (nano::store::transaction & transaction, size_t batch_size)
{
std::deque<nano::account> result;
auto it = ledger.store.pending.begin (transaction, next);
auto const end = ledger.store.pending.end (transaction);
pending_database_crawler crawler{ ledger.store, transaction, next };
// TODO: This pending iteration heuristic should be encapsulated in a pending_iterator class and reused across other components
// The heuristic is to advance the iterator sequentially until we reach a new account or perform a fresh lookup if the account has too many pending blocks
// This is to avoid the overhead of performing a fresh lookup for every pending account as majority of accounts have only a few pending blocks
auto advance_iterator = [&] () {
auto const starting_account = it->first.account;
// For RocksDB, sequential access is ~10x faster than performing a fresh lookup (tested on my machine)
const size_t sequential_attempts = 10;
// First try advancing sequentially
for (size_t count = 0; count < sequential_attempts && it != end; ++count, ++it)
{
if (it->first.account != starting_account)
{
break;
}
}
// If we didn't advance to the next account, perform a fresh lookup
if (it != end && it->first.account == starting_account)
{
it = ledger.store.pending.begin (transaction, { starting_account.number () + 1, 0 });
}
debug_assert (it == end || it->first.account != starting_account);
};
for (size_t count = 0; it != end && count < batch_size; advance_iterator (), ++count)
for (size_t count = 0; crawler.current && count < batch_size; crawler.advance (), ++count)
{
auto const & account = it->first.account;
result.push_back (account);
next = { account.number () + 1, 0 };
auto const & [key, info] = crawler.current.value ();
result.push_back (key.account);
next = key.account.number () + 1; // TODO: Handle account number overflow
}
if (it == end)
// Empty current value indicates the end of the table
if (!crawler.current)
{
// Reset for the next ledger iteration
next = { 0, 0 };
next = { 0 };
++completed;
}
return result;
}
bool nano::bootstrap_ascending::pending_database_iterator::warmed_up () const
{
return completed > 0;
}
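The scanners are stateful: each next_batch call resumes from `next`, and `completed` is incremented once the whole table has been walked, as the removed iterator tests exercised. A minimal sketch, assuming a ledger whose account table is smaller than the requested batches:
nano::bootstrap_ascending::account_database_scanner scanner{ ledger };
auto transaction = ledger.store.tx_begin_read ();
auto batch1 = scanner.next_batch (transaction, 2); // first two accounts
auto batch2 = scanner.next_batch (transaction, 256); // remainder of the table
// scanner.completed is now 1, since the end of the account table was reached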

View file

@ -9,27 +9,23 @@
namespace nano::bootstrap_ascending
{
struct account_database_iterator
struct account_database_scanner
{
explicit account_database_iterator (nano::ledger &);
nano::ledger & ledger;
std::deque<nano::account> next_batch (nano::store::transaction &, size_t batch_size);
bool warmed_up () const;
nano::ledger & ledger;
nano::account next{ 0 };
size_t completed{ 0 };
};
struct pending_database_iterator
struct pending_database_scanner
{
explicit pending_database_iterator (nano::ledger &);
nano::ledger & ledger;
std::deque<nano::account> next_batch (nano::store::transaction &, size_t batch_size);
bool warmed_up () const;
nano::ledger & ledger;
nano::pending_key next{ 0, 0 };
nano::account next{ 0 };
size_t completed{ 0 };
};
@ -52,8 +48,8 @@ private:
void fill ();
private:
account_database_iterator accounts_iterator;
pending_database_iterator pending_iterator;
account_database_scanner account_scanner;
pending_database_scanner pending_scanner;
std::deque<nano::account> queue;

View file

@ -0,0 +1,188 @@
#include <nano/node/bootstrap_ascending/frontier_scan.hpp>
#include <boost/multiprecision/cpp_dec_float.hpp>
#include <boost/multiprecision/cpp_int.hpp>
nano::bootstrap_ascending::frontier_scan::frontier_scan (frontier_scan_config const & config_a, nano::stats & stats_a) :
config{ config_a },
stats{ stats_a }
{
// Divide the nano::account numeric range into consecutive, equal-sized ranges
nano::uint256_t max_account = std::numeric_limits<nano::uint256_t>::max ();
nano::uint256_t range_size = max_account / config.head_parallelism;
for (unsigned i = 0; i < config.head_parallelism; ++i)
{
// Start at 1 to avoid the burn account
nano::uint256_t start = (i == 0) ? 1 : i * range_size;
nano::uint256_t end = (i == config.head_parallelism - 1) ? max_account : start + range_size;
heads.emplace_back (frontier_head{ nano::account{ start }, nano::account{ end } });
}
release_assert (!heads.empty ());
}
nano::account nano::bootstrap_ascending::frontier_scan::next ()
{
auto const cutoff = std::chrono::steady_clock::now () - config.cooldown;
auto & heads_by_timestamp = heads.get<tag_timestamp> ();
for (auto it = heads_by_timestamp.begin (); it != heads_by_timestamp.end (); ++it)
{
auto const & head = *it;
if (head.requests < config.consideration_count || head.timestamp < cutoff)
{
stats.inc (nano::stat::type::bootstrap_ascending_frontiers, (head.requests < config.consideration_count) ? nano::stat::detail::next_by_requests : nano::stat::detail::next_by_timestamp);
debug_assert (head.next.number () >= head.start.number ());
debug_assert (head.next.number () < head.end.number ());
auto result = head.next;
heads_by_timestamp.modify (it, [this] (auto & entry) {
entry.requests += 1;
entry.timestamp = std::chrono::steady_clock::now ();
});
return result;
}
}
stats.inc (nano::stat::type::bootstrap_ascending_frontiers, nano::stat::detail::next_none);
return { 0 };
}
bool nano::bootstrap_ascending::frontier_scan::process (nano::account start, std::deque<std::pair<nano::account, nano::block_hash>> const & response)
{
debug_assert (std::all_of (response.begin (), response.end (), [&] (auto const & pair) { return pair.first.number () >= start.number (); }));
stats.inc (nano::stat::type::bootstrap_ascending_frontiers, nano::stat::detail::process);
// Find the first head with head.start <= start
auto & heads_by_start = heads.get<tag_start> ();
auto it = heads_by_start.upper_bound (start);
release_assert (it != heads_by_start.begin ());
it = std::prev (it);
bool done = false;
heads_by_start.modify (it, [this, &response, &done] (frontier_head & entry) {
entry.completed += 1;
for (auto const & [account, _] : response)
{
// Only consider candidates that actually advance the current frontier
if (account.number () > entry.next.number ())
{
entry.candidates.insert (account);
}
}
// Trim the candidates
while (entry.candidates.size () > config.candidates)
{
release_assert (!entry.candidates.empty ());
entry.candidates.erase (std::prev (entry.candidates.end ()));
}
// Special case for the last frontier head, which can never receive frontiers larger than the max account
if (entry.completed >= config.consideration_count * 2 && entry.candidates.empty ())
{
stats.inc (nano::stat::type::bootstrap_ascending_frontiers, nano::stat::detail::done_empty);
entry.candidates.insert (entry.end);
}
// Check if done
if (entry.completed >= config.consideration_count && !entry.candidates.empty ())
{
stats.inc (nano::stat::type::bootstrap_ascending_frontiers, nano::stat::detail::done);
// Take the last candidate as the next frontier
release_assert (!entry.candidates.empty ());
auto it = std::prev (entry.candidates.end ());
debug_assert (entry.next.number () < it->number ());
entry.next = *it;
entry.processed += entry.candidates.size ();
entry.candidates.clear ();
entry.requests = 0;
entry.completed = 0;
entry.timestamp = {};
// Bound the search range
if (entry.next.number () >= entry.end.number ())
{
stats.inc (nano::stat::type::bootstrap_ascending_frontiers, nano::stat::detail::done_range);
entry.next = entry.start;
}
done = true;
}
});
return done;
}
nano::container_info nano::bootstrap_ascending::frontier_scan::container_info () const
{
auto collect_progress = [&] () {
nano::container_info info;
for (int n = 0; n < heads.size (); ++n)
{
auto const & head = heads[n];
boost::multiprecision::cpp_dec_float_50 start{ head.start.number ().str () };
boost::multiprecision::cpp_dec_float_50 next{ head.next.number ().str () };
boost::multiprecision::cpp_dec_float_50 end{ head.end.number ().str () };
// Progress in the range [0, 1000000] since we can only represent `size_t` integers in the container_info data
boost::multiprecision::cpp_dec_float_50 progress = (next - start) * boost::multiprecision::cpp_dec_float_50 (1000000) / (end - start);
info.put (std::to_string (n), progress.convert_to<std::uint64_t> ());
}
return info;
};
auto collect_candidates = [&] () {
nano::container_info info;
for (int n = 0; n < heads.size (); ++n)
{
auto const & head = heads[n];
info.put (std::to_string (n), head.candidates.size ());
}
return info;
};
auto collect_responses = [&] () {
nano::container_info info;
for (int n = 0; n < heads.size (); ++n)
{
auto const & head = heads[n];
info.put (std::to_string (n), head.completed);
}
return info;
};
auto collect_processed = [&] () {
nano::container_info info;
for (int n = 0; n < heads.size (); ++n)
{
auto const & head = heads[n];
info.put (std::to_string (n), head.processed);
}
return info;
};
auto total_processed = std::accumulate (heads.begin (), heads.end (), std::size_t{ 0 }, [] (auto total, auto const & head) {
return total + head.processed;
});
nano::container_info info;
info.put ("total_processed", total_processed);
info.add ("progress", collect_progress ());
info.add ("candidates", collect_candidates ());
info.add ("responses", collect_responses ());
info.add ("processed", collect_processed ());
return info;
}
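With the default head_parallelism of 128, the partition above works out to a range size of roughly 2^249 per head; a minimal sketch of the resulting layout:
nano::uint256_t const range_size = std::numeric_limits<nano::uint256_t>::max () / 128; // ~2^249
// head 0   covers [1, range_size + 1)                    -- start shifted to 1 to skip the burn account
// head i   covers [i * range_size, (i + 1) * range_size) -- for 0 < i < 127
// head 127 covers [127 * range_size, max_account]        -- end clamped to the maximum account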

View file

@ -0,0 +1,87 @@
#pragma once
#include <nano/node/bootstrap/bootstrap_config.hpp>
#include <nano/node/bootstrap_ascending/common.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/mem_fun.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/random_access_index.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <boost/multi_index_container.hpp>
#include <chrono>
#include <map>
#include <set>
namespace mi = boost::multi_index;
namespace nano::bootstrap_ascending
{
/*
* Frontier scan divides the account space into ranges and scans each range for outdated frontiers in parallel.
* This class is used to track the progress of each range.
*/
class frontier_scan
{
public:
frontier_scan (frontier_scan_config const &, nano::stats &);
nano::account next ();
bool process (nano::account start, std::deque<std::pair<nano::account, nano::block_hash>> const & response);
nano::container_info container_info () const;
private: // Dependencies
frontier_scan_config const & config;
nano::stats & stats;
private:
// Represents a range of accounts to scan; once the full range is scanned (goes past `end`) the head wraps around (to the `start`)
struct frontier_head
{
frontier_head (nano::account start_a, nano::account end_a) :
start{ start_a },
end{ end_a },
next{ start_a }
{
}
// The range of accounts to scan is [start, end)
nano::account const start;
nano::account const end;
// We scan the range by querying frontiers starting at 'next' and gathering candidates
nano::account next;
std::set<nano::account> candidates;
unsigned requests{ 0 };
unsigned completed{ 0 };
std::chrono::steady_clock::time_point timestamp{};
size_t processed{ 0 }; // Total number of accounts processed
nano::account index () const
{
return start;
}
};
// clang-format off
class tag_sequenced {};
class tag_start {};
class tag_timestamp {};
using ordered_heads = boost::multi_index_container<frontier_head,
mi::indexed_by<
mi::random_access<mi::tag<tag_sequenced>>,
mi::ordered_unique<mi::tag<tag_start>,
mi::const_mem_fun<frontier_head, nano::account, &frontier_head::index>>,
mi::ordered_non_unique<mi::tag<tag_timestamp>,
mi::member<frontier_head, std::chrono::steady_clock::time_point, &frontier_head::timestamp>>
>>;
// clang-format on
ordered_heads heads;
};
}
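A minimal sketch of the intended next/process cycle, assuming a bootstrap_ascending_config named config and a nano::stats instance named stats (the request/response transport is elided):
nano::bootstrap_ascending::frontier_scan frontiers{ config.frontier_scan, stats };
auto start = frontiers.next (); // account to request frontiers from; zero if every head is on cooldown
if (!start.is_zero ())
{
	// ... send an asc_pull_req frontiers request starting at `start` ...
	std::deque<std::pair<nano::account, nano::block_hash>> response{}; // filled from the asc_pull_ack frontiers payload, sorted ascending by account
	bool advanced = frontiers.process (start, response); // true when a head advanced its `next` boundary
}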

View file

@ -4,6 +4,7 @@
#include <nano/lib/stats_enums.hpp>
#include <nano/lib/thread_roles.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/bootstrap_ascending/crawlers.hpp>
#include <nano/node/bootstrap_ascending/service.hpp>
#include <nano/node/network.hpp>
#include <nano/node/nodeconfig.hpp>
@ -30,9 +31,12 @@ nano::bootstrap_ascending::service::service (nano::node_config const & node_conf
logger{ logger_a },
accounts{ config.account_sets, stats },
database_scan{ ledger },
frontiers{ config.frontier_scan, stats },
throttle{ compute_throttle_size () },
scoring{ config, node_config_a.network_params.network },
database_limiter{ config.database_rate_limit, 1.0 }
database_limiter{ config.database_rate_limit },
frontiers_limiter{ config.frontier_rate_limit },
workers{ 1, nano::thread_role::name::ascending_bootstrap_worker }
{
block_processor.batch_processed.add ([this] (auto const & batch) {
{
@ -57,7 +61,9 @@ nano::bootstrap_ascending::service::~service ()
debug_assert (!priorities_thread.joinable ());
debug_assert (!database_thread.joinable ());
debug_assert (!dependencies_thread.joinable ());
debug_assert (!frontiers_thread.joinable ());
debug_assert (!timeout_thread.joinable ());
debug_assert (!workers.alive ());
}
void nano::bootstrap_ascending::service::start ()
@ -65,6 +71,7 @@ void nano::bootstrap_ascending::service::start ()
debug_assert (!priorities_thread.joinable ());
debug_assert (!database_thread.joinable ());
debug_assert (!dependencies_thread.joinable ());
debug_assert (!frontiers_thread.joinable ());
debug_assert (!timeout_thread.joinable ());
if (!config.enable)
@ -73,10 +80,15 @@ void nano::bootstrap_ascending::service::start ()
return;
}
priorities_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::ascending_bootstrap);
run_priorities ();
});
workers.start ();
if (config.enable_scan)
{
priorities_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::ascending_bootstrap);
run_priorities ();
});
}
if (config.enable_database_scan)
{
@ -94,6 +106,14 @@ void nano::bootstrap_ascending::service::start ()
});
}
if (config.enable_frontier_scan)
{
frontiers_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::ascending_bootstrap);
run_frontiers ();
});
}
timeout_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::ascending_bootstrap);
run_timeouts ();
@ -111,10 +131,13 @@ void nano::bootstrap_ascending::service::stop ()
nano::join_or_pass (priorities_thread);
nano::join_or_pass (database_thread);
nano::join_or_pass (dependencies_thread);
nano::join_or_pass (frontiers_thread);
nano::join_or_pass (timeout_thread);
workers.stop ();
}
void nano::bootstrap_ascending::service::send (std::shared_ptr<nano::transport::channel> const & channel, async_tag tag)
bool nano::bootstrap_ascending::service::send (std::shared_ptr<nano::transport::channel> const & channel, async_tag tag)
{
debug_assert (tag.type != query_type::invalid);
debug_assert (tag.source != query_source::invalid);
@ -125,6 +148,8 @@ void nano::bootstrap_ascending::service::send (std::shared_ptr<nano::transport::
tags.get<tag_id> ().insert (tag);
}
on_request.notify (tag, channel);
nano::asc_pull_req request{ network_constants };
request.id = tag.id;
@ -152,6 +177,16 @@ void nano::bootstrap_ascending::service::send (std::shared_ptr<nano::transport::
request.payload = pld;
}
break;
case query_type::frontiers:
{
request.type = nano::asc_pull_type::frontiers;
nano::asc_pull_req::frontiers_payload pld;
pld.start = tag.start.as_account ();
pld.count = nano::asc_pull_ack::frontiers_payload::max_frontiers;
request.payload = pld;
}
break;
default:
debug_assert (false);
}
@ -165,6 +200,8 @@ void nano::bootstrap_ascending::service::send (std::shared_ptr<nano::transport::
channel->send (
request, nullptr,
nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap);
return true; // TODO: Return channel send result
}
std::size_t nano::bootstrap_ascending::service::priority_size () const
@ -185,6 +222,18 @@ std::size_t nano::bootstrap_ascending::service::score_size () const
return scoring.size ();
}
bool nano::bootstrap_ascending::service::prioritized (nano::account const & account) const
{
nano::lock_guard<nano::mutex> lock{ mutex };
return accounts.prioritized (account);
}
bool nano::bootstrap_ascending::service::blocked (nano::account const & account) const
{
nano::lock_guard<nano::mutex> lock{ mutex };
return accounts.blocked (account);
}
/** Inspects a block that has been processed by the block processor
- Marks an account as blocked if the result code is gap source, as there is no reason to request additional blocks for this account until the dependency is resolved
- Marks an account as forwarded if it has been recently referenced by a block that has been inserted.
@ -412,6 +461,24 @@ nano::block_hash nano::bootstrap_ascending::service::wait_blocking ()
return result;
}
nano::account nano::bootstrap_ascending::service::wait_frontier ()
{
nano::account result{ 0 };
wait ([this, &result] () {
debug_assert (!mutex.try_lock ());
result = frontiers.next ();
if (!result.is_zero ())
{
stats.inc (nano::stat::type::bootstrap_ascending_next, nano::stat::detail::next_frontier);
return true;
}
return false;
});
return result;
}
bool nano::bootstrap_ascending::service::request (nano::account account, size_t count, std::shared_ptr<nano::transport::channel> const & channel, query_source source)
{
debug_assert (count > 0);
@ -439,11 +506,7 @@ bool nano::bootstrap_ascending::service::request (nano::account account, size_t
tag.start = account;
}
on_request.notify (tag, channel);
send (channel, tag);
return true; // Request sent
return send (channel, tag);
}
bool nano::bootstrap_ascending::service::request_info (nano::block_hash hash, std::shared_ptr<nano::transport::channel> const & channel, query_source source)
@ -454,11 +517,17 @@ bool nano::bootstrap_ascending::service::request_info (nano::block_hash hash, st
tag.start = hash;
tag.hash = hash;
on_request.notify (tag, channel);
return send (channel, tag);
}
send (channel, tag);
bool nano::bootstrap_ascending::service::request_frontiers (nano::account start, std::shared_ptr<nano::transport::channel> const & channel, query_source source)
{
async_tag tag{};
tag.type = query_type::frontiers;
tag.source = source;
tag.start = start;
return true; // Request sent
return send (channel, tag);
}
void nano::bootstrap_ascending::service::run_one_priority ()
@ -552,6 +621,43 @@ void nano::bootstrap_ascending::service::run_dependencies ()
}
}
void nano::bootstrap_ascending::service::run_one_frontier ()
{
wait ([this] () {
return !accounts.priority_half_full ();
});
wait ([this] () {
return frontiers_limiter.should_pass (1);
});
wait ([this] () {
return workers.queued_tasks () < config.frontier_scan.max_pending;
});
wait_tags ();
auto channel = wait_channel ();
if (!channel)
{
return;
}
auto frontier = wait_frontier ();
if (frontier.is_zero ())
{
return;
}
request_frontiers (frontier, channel, query_source::frontiers);
}
void nano::bootstrap_ascending::service::run_frontiers ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
lock.unlock ();
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::loop_frontiers);
run_one_frontier ();
lock.lock ();
}
}
void nano::bootstrap_ascending::service::cleanup_and_sync ()
{
debug_assert (!mutex.try_lock ());
@ -625,7 +731,7 @@ void nano::bootstrap_ascending::service::process (nano::asc_pull_ack const & mes
}
bool operator() (const nano::asc_pull_ack::frontiers_payload & response) const
{
return false; // TODO: Handle frontiers
return type == query_type::frontiers;
}
bool operator() (const nano::empty_payload & response) const
{
@ -667,7 +773,7 @@ void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::bloc
{
case verify_result::ok:
{
stats.inc (nano::stat::type::bootstrap_ascending_verify, nano::stat::detail::ok);
stats.inc (nano::stat::type::bootstrap_ascending_verify_blocks, nano::stat::detail::ok);
stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::blocks, nano::stat::dir::in, response.blocks.size ());
auto blocks = response.blocks;
@ -708,7 +814,7 @@ void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::bloc
break;
case verify_result::nothing_new:
{
stats.inc (nano::stat::type::bootstrap_ascending_verify, nano::stat::detail::nothing_new);
stats.inc (nano::stat::type::bootstrap_ascending_verify_blocks, nano::stat::detail::nothing_new);
nano::lock_guard<nano::mutex> lock{ mutex };
accounts.priority_down (tag.account);
@ -720,7 +826,7 @@ void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::bloc
break;
case verify_result::invalid:
{
stats.inc (nano::stat::type::bootstrap_ascending_verify, nano::stat::detail::invalid);
stats.inc (nano::stat::type::bootstrap_ascending_verify_blocks, nano::stat::detail::invalid);
}
break;
}
@ -734,24 +840,69 @@ void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::acco
if (response.account.is_zero ())
{
stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::account_info_empty);
return;
}
else
{
stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::account_info);
// Prioritize account containing the dependency
{
nano::lock_guard<nano::mutex> lock{ mutex };
accounts.dependency_update (tag.hash, response.account);
accounts.priority_set (response.account);
}
stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::account_info);
// Prioritize account containing the dependency
{
nano::lock_guard<nano::mutex> lock{ mutex };
accounts.dependency_update (tag.hash, response.account);
accounts.priority_set (response.account);
}
}
void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::frontiers_payload & response, const async_tag & tag)
{
// TODO: Make use of frontiers info
debug_assert (tag.type == query_type::frontiers);
debug_assert (!tag.start.is_zero ());
if (response.frontiers.empty ())
{
stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::frontiers_empty);
return;
}
stats.inc (nano::stat::type::bootstrap_ascending_process, nano::stat::detail::frontiers);
auto result = verify (response, tag);
switch (result)
{
case verify_result::ok:
{
stats.inc (nano::stat::type::bootstrap_ascending_verify_frontiers, nano::stat::detail::ok);
stats.add (nano::stat::type::bootstrap_ascending, nano::stat::detail::frontiers, nano::stat::dir::in, response.frontiers.size ());
{
nano::lock_guard<nano::mutex> lock{ mutex };
frontiers.process (tag.start.as_account (), response.frontiers);
}
// Allow some overfill to avoid unnecessarily dropping responses
if (workers.queued_tasks () < config.frontier_scan.max_pending * 4)
{
workers.post ([this, frontiers = response.frontiers] {
process_frontiers (frontiers);
});
}
else
{
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::dropped_frontiers);
}
}
break;
case verify_result::nothing_new:
{
stats.inc (nano::stat::type::bootstrap_ascending_verify_frontiers, nano::stat::detail::nothing_new);
}
break;
case verify_result::invalid:
{
stats.inc (nano::stat::type::bootstrap_ascending_verify_frontiers, nano::stat::detail::invalid);
}
break;
}
}
void nano::bootstrap_ascending::service::process (const nano::empty_payload & response, const async_tag & tag)
@ -760,6 +911,86 @@ void nano::bootstrap_ascending::service::process (const nano::empty_payload & re
debug_assert (false, "empty payload"); // Should not happen
}
void nano::bootstrap_ascending::service::process_frontiers (std::deque<std::pair<nano::account, nano::block_hash>> const & frontiers)
{
release_assert (!frontiers.empty ());
// Accounts must be passed in ascending order
debug_assert (std::adjacent_find (frontiers.begin (), frontiers.end (), [] (auto const & lhs, auto const & rhs) {
return lhs.first.number () >= rhs.first.number ();
})
== frontiers.end ());
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::process_frontiers);
size_t outdated = 0;
size_t pending = 0;
// Accounts with outdated frontiers to sync
std::deque<nano::account> result;
{
auto transaction = ledger.tx_begin_read ();
auto const start = frontiers.front ().first;
account_database_crawler account_crawler{ ledger.store, transaction, start };
pending_database_crawler pending_crawler{ ledger.store, transaction, start };
auto block_exists = [&] (nano::block_hash const & hash) {
return ledger.any.block_exists_or_pruned (transaction, hash);
};
auto should_prioritize = [&] (nano::account const & account, nano::block_hash const & frontier) {
account_crawler.advance_to (account);
pending_crawler.advance_to (account);
// Check if account exists in our ledger
if (account_crawler.current && account_crawler.current->first == account)
{
// Check for frontier mismatch
if (account_crawler.current->second.head != frontier)
{
// Check if frontier block exists in our ledger
if (!block_exists (frontier))
{
outdated++;
return true; // Frontier is outdated
}
}
return false; // Account exists and frontier is up-to-date
}
// Check if account has pending blocks in our ledger
if (pending_crawler.current && pending_crawler.current->first.account == account)
{
pending++;
return true; // Account doesn't exist but has pending blocks in the ledger
}
return false; // Account doesn't exist in the ledger and has no pending blocks, can't be prioritized right now
};
for (auto const & [account, frontier] : frontiers)
{
if (should_prioritize (account, frontier))
{
result.push_back (account);
}
}
}
stats.add (nano::stat::type::bootstrap_ascending_frontiers, nano::stat::detail::processed, frontiers.size ());
stats.add (nano::stat::type::bootstrap_ascending_frontiers, nano::stat::detail::prioritized, result.size ());
stats.add (nano::stat::type::bootstrap_ascending_frontiers, nano::stat::detail::outdated, outdated);
stats.add (nano::stat::type::bootstrap_ascending_frontiers, nano::stat::detail::pending, pending);
nano::lock_guard<nano::mutex> guard{ mutex };
for (auto const & account : result)
{
accounts.priority_set (account);
}
}
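To make the prioritization rules above easier to follow, here is a minimal standalone sketch that applies the same three-way classification (known account with an outdated frontier, unknown account with pending entries, otherwise skip) to a toy in-memory ledger. The types and container layout are hypothetical stand-ins; nano's real code advances ordered database cursors (account_database_crawler / pending_database_crawler) instead of doing per-account map lookups.

#include <cstdint>
#include <iostream>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

using account_t = std::uint64_t;
using hash_t = std::string;

struct toy_ledger
{
	std::map<account_t, hash_t> heads; // account -> local head (frontier) block
	std::set<hash_t> blocks; // every block hash known locally
	std::set<account_t> pending; // accounts with pending (receivable) entries
};

// Same decision as should_prioritize above, minus the database cursors
bool should_prioritize (toy_ledger const & ledger, account_t account, hash_t const & frontier)
{
	if (auto it = ledger.heads.find (account); it != ledger.heads.end ())
	{
		// Known account: prioritize only if the peer's frontier is a block we do not have
		return it->second != frontier && ledger.blocks.count (frontier) == 0;
	}
	// Unknown account: prioritize only if something is already pending for it locally
	return ledger.pending.count (account) != 0;
}

int main ()
{
	toy_ledger ledger;
	ledger.heads[1] = "A";
	ledger.blocks = { "A", "B" };
	ledger.pending = { 3 };

	std::vector<std::pair<account_t, hash_t>> frontiers{
		{ 1, "C" }, // known account, frontier we do not have -> prioritize (outdated)
		{ 2, "D" }, // unknown account, nothing pending -> skip
		{ 3, "E" }, // unknown account, has pending -> prioritize
	};
	for (auto const & [account, frontier] : frontiers)
	{
		std::cout << account << " -> " << (should_prioritize (ledger, account, frontier) ? "prioritize" : "skip") << "\n";
	}
}

The cursor-based crawlers in the real code exist because frontiers arrive sorted by account (as the adjacent_find assertion enforces), so the account and pending tables can each be walked forward once per response instead of being queried per account.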
nano::bootstrap_ascending::service::verify_result nano::bootstrap_ascending::service::verify (const nano::asc_pull_ack::blocks_payload & response, const nano::bootstrap_ascending::service::async_tag & tag) const
{
auto const & blocks = response.blocks;
@ -819,6 +1050,35 @@ nano::bootstrap_ascending::service::verify_result nano::bootstrap_ascending::ser
return verify_result::ok;
}
nano::bootstrap_ascending::service::verify_result nano::bootstrap_ascending::service::verify (nano::asc_pull_ack::frontiers_payload const & response, async_tag const & tag) const
{
auto const & frontiers = response.frontiers;
if (frontiers.empty ())
{
return verify_result::nothing_new;
}
// Ensure frontiers accounts are in ascending order
nano::account previous{ 0 };
for (auto const & [account, _] : frontiers)
{
if (account.number () <= previous.number ())
{
return verify_result::invalid;
}
previous = account;
}
// Ensure the returned frontiers start at or after the requested account
if (frontiers.front ().first.number () < tag.start.as_account ().number ())
{
return verify_result::invalid;
}
return verify_result::ok;
}
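The frontier verification applies two rules: account numbers must be strictly ascending, and the first returned account must not precede the requested start. The compact standalone restatement below, using plain integers and hypothetical names, shows which inputs each rule rejects; it mirrors the logic above but is not the nano implementation.

#include <cstdint>
#include <deque>
#include <iostream>
#include <utility>

enum class verify_result { ok, nothing_new, invalid };

verify_result verify_frontiers (std::deque<std::pair<std::uint64_t, std::uint64_t>> const & frontiers, std::uint64_t requested_start)
{
	if (frontiers.empty ())
	{
		return verify_result::nothing_new;
	}
	// Accounts must be strictly ascending
	std::uint64_t previous = 0;
	for (auto const & [account, frontier] : frontiers)
	{
		if (account <= previous)
		{
			return verify_result::invalid;
		}
		previous = account;
	}
	// The response must not start before the requested account
	if (frontiers.front ().first < requested_start)
	{
		return verify_result::invalid;
	}
	return verify_result::ok;
}

int main ()
{
	std::cout << static_cast<int> (verify_frontiers ({ { 5, 1 }, { 7, 2 } }, 5)) << "\n"; // 0: ok
	std::cout << static_cast<int> (verify_frontiers ({ { 7, 1 }, { 5, 2 } }, 5)) << "\n"; // 2: invalid, not ascending
	std::cout << static_cast<int> (verify_frontiers ({ { 3, 1 }, { 7, 2 } }, 5)) << "\n"; // 2: invalid, starts before request
}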
auto nano::bootstrap_ascending::service::info () const -> nano::bootstrap_ascending::account_sets::info_t
{
nano::lock_guard<nano::mutex> lock{ mutex };
@ -843,6 +1103,8 @@ nano::container_info nano::bootstrap_ascending::service::container_info () const
info.put ("throttle_successes", throttle.successes ());
info.add ("accounts", accounts.container_info ());
info.add ("database_scan", database_scan.container_info ());
info.add ("frontiers", frontiers.container_info ());
info.add ("workers", workers.container_info ());
return info;
}

View file

@ -5,13 +5,15 @@
#include <nano/lib/numbers.hpp>
#include <nano/lib/observer_set.hpp>
#include <nano/lib/rate_limiting.hpp>
#include <nano/lib/thread_pool.hpp>
#include <nano/lib/timer.hpp>
#include <nano/node/bootstrap/bootstrap_config.hpp>
#include <nano/node/bootstrap_ascending/account_sets.hpp>
#include <nano/node/bootstrap_ascending/common.hpp>
#include <nano/node/bootstrap_ascending/database_scan.hpp>
#include <nano/node/bootstrap_ascending/frontier_scan.hpp>
#include <nano/node/bootstrap_ascending/peer_scoring.hpp>
#include <nano/node/bootstrap_ascending/throttle.hpp>
#include <nano/node/fwd.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>
@ -43,6 +45,8 @@ namespace bootstrap_ascending
std::size_t blocked_size () const;
std::size_t priority_size () const;
std::size_t score_size () const;
bool prioritized (nano::account const &) const;
bool blocked (nano::account const &) const;
nano::container_info container_info () const;
@ -64,6 +68,7 @@ namespace bootstrap_ascending
blocks_by_hash,
blocks_by_account,
account_info_by_hash,
frontiers,
};
enum class query_source
@ -72,6 +77,7 @@ namespace bootstrap_ascending
priority,
database,
blocking,
frontiers,
};
struct async_tag
@ -102,6 +108,8 @@ namespace bootstrap_ascending
void run_one_database (bool should_throttle);
void run_dependencies ();
void run_one_blocking ();
void run_one_frontier ();
void run_frontiers ();
void run_timeouts ();
void cleanup_and_sync ();
@ -123,16 +131,21 @@ namespace bootstrap_ascending
/* Waits for next available blocking block */
nano::block_hash next_blocking ();
nano::block_hash wait_blocking ();
/* Waits for next available frontier scan range */
nano::account wait_frontier ();
bool request (nano::account, size_t count, std::shared_ptr<nano::transport::channel> const &, query_source);
bool request_info (nano::block_hash, std::shared_ptr<nano::transport::channel> const &, query_source);
void send (std::shared_ptr<nano::transport::channel> const &, async_tag tag);
bool request_frontiers (nano::account, std::shared_ptr<nano::transport::channel> const &, query_source);
bool send (std::shared_ptr<nano::transport::channel> const &, async_tag tag);
void process (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag);
void process (nano::asc_pull_ack::account_info_payload const & response, async_tag const & tag);
void process (nano::asc_pull_ack::frontiers_payload const & response, async_tag const & tag);
void process (nano::empty_payload const & response, async_tag const & tag);
void process_frontiers (std::deque<std::pair<nano::account, nano::block_hash>> const & frontiers);
enum class verify_result
{
ok,
@ -147,6 +160,7 @@ namespace bootstrap_ascending
* - ok: otherwise, if all checks pass
*/
verify_result verify (nano::asc_pull_ack::blocks_payload const & response, async_tag const & tag) const;
verify_result verify (nano::asc_pull_ack::frontiers_payload const & response, async_tag const & tag) const;
size_t count_tags (nano::account const & account, query_source source) const;
size_t count_tags (nano::block_hash const & hash, query_source source) const;
@ -159,6 +173,7 @@ namespace bootstrap_ascending
nano::bootstrap_ascending::database_scan database_scan;
nano::bootstrap_ascending::throttle throttle;
nano::bootstrap_ascending::peer_scoring scoring;
nano::bootstrap_ascending::frontier_scan frontiers;
// clang-format off
class tag_sequenced {};
@ -182,6 +197,7 @@ namespace bootstrap_ascending
// Requests for accounts from database have much lower hitrate and could introduce strain on the network
// A separate (lower) limiter ensures that we always reserve resources for querying accounts from priority queue
nano::rate_limiter database_limiter;
nano::rate_limiter frontiers_limiter;
nano::interval sync_dependencies_interval;
@ -191,7 +207,10 @@ namespace bootstrap_ascending
std::thread priorities_thread;
std::thread database_thread;
std::thread dependencies_thread;
std::thread frontiers_thread;
std::thread timeout_thread;
nano::thread_pool workers;
};
nano::stat::detail to_stat_detail (service::query_type);

View file

@ -730,7 +730,7 @@ public: // Payload definitions
static frontier deserialize_frontier (nano::stream &);
public: // Payload
std::vector<frontier> frontiers;
std::deque<frontier> frontiers;
public: // Logging
void operator() (nano::object_stream &) const;

View file

@ -19,7 +19,7 @@ nano::monitor::~monitor ()
void nano::monitor::start ()
{
if (!config.enabled)
if (!config.enable)
{
return;
}
@ -115,7 +115,7 @@ void nano::monitor::run_one ()
nano::error nano::monitor_config::serialize (nano::tomlconfig & toml) const
{
toml.put ("enable", enabled, "Enable or disable periodic node status logging\ntype:bool");
toml.put ("enable", enable, "Enable or disable periodic node status logging\ntype:bool");
toml.put ("interval", interval.count (), "Interval between status logs\ntype:seconds");
return toml.get_error ();
@ -123,7 +123,7 @@ nano::error nano::monitor_config::serialize (nano::tomlconfig & toml) const
nano::error nano::monitor_config::deserialize (nano::tomlconfig & toml)
{
toml.get ("enable", enabled);
toml.get ("enable", enable);
auto interval_l = interval.count ();
toml.get ("interval", interval_l);
interval = std::chrono::seconds{ interval_l };

View file

@ -17,7 +17,7 @@ public:
nano::error serialize (nano::tomlconfig &) const;
public:
bool enabled{ true };
bool enable{ true };
std::chrono::seconds interval{ 60s };
};

View file

@ -31,7 +31,7 @@ void nano::scheduler::hinted::start ()
{
debug_assert (!thread.joinable ());
if (!config.enabled)
if (!config.enable)
{
return;
}
@ -260,7 +260,7 @@ nano::scheduler::hinted_config::hinted_config (nano::network_constants const & n
nano::error nano::scheduler::hinted_config::serialize (nano::tomlconfig & toml) const
{
toml.put ("enable", enabled, "Enable or disable hinted elections\ntype:bool");
toml.put ("enable", enable, "Enable or disable hinted elections\ntype:bool");
toml.put ("hinting_threshold", hinting_threshold_percent, "Percentage of online weight needed to start a hinted election. \ntype:uint32,[0,100]");
toml.put ("check_interval", check_interval.count (), "Interval between scans of the vote cache for possible hinted elections. \ntype:milliseconds");
toml.put ("block_cooldown", block_cooldown.count (), "Cooldown period for blocks that failed to start an election. \ntype:milliseconds");
@ -271,7 +271,7 @@ nano::error nano::scheduler::hinted_config::serialize (nano::tomlconfig & toml)
nano::error nano::scheduler::hinted_config::deserialize (nano::tomlconfig & toml)
{
toml.get ("enable", enabled);
toml.get ("enable", enable);
toml.get ("hinting_threshold", hinting_threshold_percent);
auto check_interval_l = check_interval.count ();

View file

@ -28,7 +28,7 @@ public:
nano::error serialize (nano::tomlconfig & toml) const;
public:
bool enabled{ true };
bool enable{ true };
std::chrono::milliseconds check_interval{ 1000 };
std::chrono::milliseconds block_cooldown{ 10000 };
unsigned hinting_threshold_percent{ 10 };

View file

@ -29,7 +29,7 @@ void nano::scheduler::optimistic::start ()
{
debug_assert (!thread.joinable ());
if (!config.enabled)
if (!config.enable)
{
return;
}
@ -72,7 +72,7 @@ bool nano::scheduler::optimistic::activate_predicate (const nano::account_info &
bool nano::scheduler::optimistic::activate (const nano::account & account, const nano::account_info & account_info, const nano::confirmation_height_info & conf_info)
{
if (!config.enabled)
if (!config.enable)
{
return false;
}
@ -183,7 +183,7 @@ nano::container_info nano::scheduler::optimistic::container_info () const
nano::error nano::scheduler::optimistic_config::deserialize (nano::tomlconfig & toml)
{
toml.get ("enable", enabled);
toml.get ("enable", enable);
toml.get ("gap_threshold", gap_threshold);
toml.get ("max_size", max_size);
@ -192,7 +192,7 @@ nano::error nano::scheduler::optimistic_config::deserialize (nano::tomlconfig &
nano::error nano::scheduler::optimistic_config::serialize (nano::tomlconfig & toml) const
{
toml.put ("enable", enabled, "Enable or disable optimistic elections\ntype:bool");
toml.put ("enable", enable, "Enable or disable optimistic elections\ntype:bool");
toml.put ("gap_threshold", gap_threshold, "Minimum difference between confirmation frontier and account frontier to become a candidate for optimistic confirmation\ntype:uint64");
toml.put ("max_size", max_size, "Maximum number of candidates stored in memory\ntype:uint64");

View file

@ -30,7 +30,7 @@ public:
nano::error serialize (nano::tomlconfig & toml) const;
public:
bool enabled{ true };
bool enable{ true };
/** Minimum difference between confirmation frontier and account frontier to become a candidate for optimistic confirmation */
std::size_t gap_threshold{ 32 };

View file

@ -88,7 +88,7 @@ void nano::scheduler::priority::start ()
debug_assert (!thread.joinable ());
debug_assert (!cleanup_thread.joinable ());
if (!config.enabled)
if (!config.enable)
{
return;
}

View file

@ -20,7 +20,7 @@ public:
// TODO: Serialization & deserialization
public:
bool enabled{ true };
bool enable{ true };
};
class priority final