diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index 7318fd415..ccdd791cc 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -4,6 +4,7 @@ add_executable( fakes/websocket_client.hpp fakes/work_peer.hpp active_transactions.cpp + backlog.cpp block.cpp block_store.cpp blockprocessor.cpp diff --git a/nano/core_test/active_transactions.cpp b/nano/core_test/active_transactions.cpp index ee106258a..0a2ca2c17 100644 --- a/nano/core_test/active_transactions.cpp +++ b/nano/core_test/active_transactions.cpp @@ -46,6 +46,10 @@ TEST (active_transactions, confirm_election_by_request) // Ensure election on node1 is already confirmed before connecting with node2 ASSERT_TIMELY (5s, nano::test::confirmed (node1, { send1 })); + // Wait for the election to be removed and give time for any in-flight vote broadcasts to settle + ASSERT_TIMELY (5s, node1.active.empty ()); + WAIT (1s); + // At this point node1 should not generate votes for send1 block unless it receives a request // Create a second node diff --git a/nano/core_test/backlog.cpp b/nano/core_test/backlog.cpp new file mode 100644 index 000000000..a4384e156 --- /dev/null +++ b/nano/core_test/backlog.cpp @@ -0,0 +1,107 @@ +#include +#include +#include + +#include + +#include + +using namespace std::chrono_literals; + +namespace +{ +using block_list_t = std::vector>; + +/* + * Creates `count` 1 raw sends from genesis to unique accounts and corresponding open blocks. + * The genesis chain is then confirmed, but leaves open blocks unconfirmed + * The list of unconfirmed open blocks is returned. 
+ */ +block_list_t setup_independent_blocks (nano::test::system & system, nano::node & node, int count) +{ + std::vector> blocks; + + auto latest = node.latest (nano::dev::genesis_key.pub); + auto balance = node.balance (nano::dev::genesis_key.pub); + + for (int n = 0; n < count; ++n) + { + nano::keypair key; + nano::block_builder builder; + + balance -= 1; + auto send = builder + .state () + .account (nano::dev::genesis_key.pub) + .previous (latest) + .representative (nano::dev::genesis_key.pub) + .balance (balance) + .link (key.pub) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (*system.work.generate (latest)) + .build_shared (); + latest = send->hash (); + + auto open = builder + .state () + .account (key.pub) + .previous (0) + .representative (key.pub) + .balance (1) + .link (send->hash ()) + .sign (key.prv, key.pub) + .work (*system.work.generate (key.pub)) + .build_shared (); + + EXPECT_TRUE (nano::test::process (node, { send, open })); + EXPECT_TIMELY (5s, nano::test::exists (node, { send, open })); // Ensure blocks are in the ledger + + blocks.push_back (open); + } + + // Confirm whole genesis chain at once + EXPECT_TIMELY (5s, nano::test::confirm (node, { latest })); + EXPECT_TIMELY (5s, nano::test::confirmed (node, { latest })); + + return blocks; +} +} + +/* + * Ensures all not confirmed accounts get activated by backlog scan periodically + */ +TEST (backlog, population) +{ + nano::test::system system{}; + auto & node = *system.add_node (); + + nano::mutex mutex; + std::unordered_set activated; + + node.backlog.activate_callback.add ([&] (nano::transaction const & transaction, nano::account const & account, nano::account_info const & account_info, nano::confirmation_height_info const & conf_info) { + nano::lock_guard lock{ mutex }; + + activated.insert (account); + }); + + auto blocks = setup_independent_blocks (system, node, 256); + + // Checks if `activated` set contains all accounts we previously set up + auto all_activated 
= [&] () { + nano::lock_guard lock{ mutex }; + return std::all_of (blocks.begin (), blocks.end (), [&] (auto const & item) { + auto account = item->account (); + debug_assert (!account.is_zero ()); + return activated.count (account) != 0; + }); + }; + ASSERT_TIMELY (5s, all_activated ()); + + // Clear activated set to ensure we activate those accounts more than once + { + nano::lock_guard lock{ mutex }; + activated.clear (); + } + + ASSERT_TIMELY (5s, all_activated ()); +} diff --git a/nano/core_test/toml.cpp b/nano/core_test/toml.cpp index 54de35077..40570b152 100644 --- a/nano/core_test/toml.cpp +++ b/nano/core_test/toml.cpp @@ -190,6 +190,8 @@ TEST (toml, daemon_config_deserialize_defaults) ASSERT_EQ (conf.node.work_peers, defaults.node.work_peers); ASSERT_EQ (conf.node.work_threads, defaults.node.work_threads); ASSERT_EQ (conf.node.max_queued_requests, defaults.node.max_queued_requests); + ASSERT_EQ (conf.node.backlog_scan_batch_size, defaults.node.backlog_scan_batch_size); + ASSERT_EQ (conf.node.backlog_scan_frequency, defaults.node.backlog_scan_frequency); ASSERT_EQ (conf.node.logging.bulk_pull_logging_value, defaults.node.logging.bulk_pull_logging_value); ASSERT_EQ (conf.node.logging.flush, defaults.node.logging.flush); @@ -433,6 +435,9 @@ TEST (toml, daemon_config_deserialize_no_defaults) max_work_generate_multiplier = 1.0 max_queued_requests = 999 frontiers_confirmation = "always" + backlog_scan_batch_size = 999 + backlog_scan_frequency = 999 + [node.diagnostics.txn_tracking] enable = true ignore_writes_below_block_processor_max_time = false @@ -602,6 +607,8 @@ TEST (toml, daemon_config_deserialize_no_defaults) ASSERT_NE (conf.node.work_peers, defaults.node.work_peers); ASSERT_NE (conf.node.work_threads, defaults.node.work_threads); ASSERT_NE (conf.node.max_queued_requests, defaults.node.max_queued_requests); + ASSERT_NE (conf.node.backlog_scan_batch_size, defaults.node.backlog_scan_batch_size); + ASSERT_NE (conf.node.backlog_scan_frequency, 
defaults.node.backlog_scan_frequency); ASSERT_NE (conf.node.logging.bulk_pull_logging_value, defaults.node.logging.bulk_pull_logging_value); ASSERT_NE (conf.node.logging.flush, defaults.node.logging.flush); diff --git a/nano/lib/stats.cpp b/nano/lib/stats.cpp index 0e370cc19..5f544dbe5 100644 --- a/nano/lib/stats.cpp +++ b/nano/lib/stats.cpp @@ -557,6 +557,9 @@ std::string nano::stat::type_to_string (stat::type type) case nano::stat::type::active: res = "active"; break; + case nano::stat::type::backlog: + res = "backlog"; + break; } return res; } @@ -572,6 +575,9 @@ std::string nano::stat::detail_to_string (stat::detail detail) case nano::stat::detail::loop: res = "loop"; break; + case nano::stat::detail::total: + res = "total"; + break; case nano::stat::detail::queue: res = "queue"; break; @@ -811,6 +817,7 @@ std::string nano::stat::detail_to_string (stat::detail detail) break; case nano::stat::detail::election_not_confirmed: res = "election_not_confirmed"; + break; case nano::stat::detail::election_hinted_overflow: res = "election_hinted_overflow"; break; @@ -1045,6 +1052,9 @@ std::string nano::stat::detail_to_string (stat::detail detail) case nano::stat::detail::channel_full: res = "channel_full"; break; + case nano::stat::detail::activated: + res = "activated"; + break; } return res; } diff --git a/nano/lib/stats.hpp b/nano/lib/stats.hpp index 251456ab6..bc9177e59 100644 --- a/nano/lib/stats.hpp +++ b/nano/lib/stats.hpp @@ -249,6 +249,7 @@ public: blockprocessor, bootstrap_server, active, + backlog, }; /** Optional detail type */ @@ -258,6 +259,7 @@ public: // common loop, + total, // processing queue queue, @@ -458,6 +460,9 @@ public: response_blocks, response_account_info, channel_full, + + // backlog + activated, }; /** Direction of the stat. 
If the direction is irrelevant, use in */ diff --git a/nano/node/backlog_population.cpp b/nano/node/backlog_population.cpp index b38d90af5..0ee6a2a84 100644 --- a/nano/node/backlog_population.cpp +++ b/nano/node/backlog_population.cpp @@ -4,41 +4,42 @@ #include #include -nano::backlog_population::backlog_population (const config & config_a, nano::store & store_a, nano::election_scheduler & scheduler_a) : +nano::backlog_population::backlog_population (const config & config_a, nano::store & store_a, nano::stat & stats_a) : config_m{ config_a }, - store_m{ store_a }, - scheduler{ scheduler_a } + store{ store_a }, + stats{ stats_a } { } nano::backlog_population::~backlog_population () { - stop (); - if (thread.joinable ()) - { - thread.join (); - } + // Thread must be stopped before destruction + debug_assert (!thread.joinable ()); } void nano::backlog_population::start () { - if (!thread.joinable ()) - { - thread = std::thread{ [this] () { run (); } }; - } + debug_assert (!thread.joinable ()); + + thread = std::thread{ [this] () { + nano::thread_role::set (nano::thread_role::name::backlog_population); + run (); + } }; } void nano::backlog_population::stop () { - nano::unique_lock lock{ mutex }; stopped = true; notify (); + nano::join_or_pass (thread); } void nano::backlog_population::trigger () { - nano::unique_lock lock{ mutex }; - triggered = true; + { + nano::unique_lock lock{ mutex }; + triggered = true; + } notify (); } @@ -49,25 +50,25 @@ void nano::backlog_population::notify () bool nano::backlog_population::predicate () const { - return triggered; + return triggered || config_m.enabled; } void nano::backlog_population::run () { - nano::thread_role::set (nano::thread_role::name::backlog_population); - const auto delay = std::chrono::seconds{ config_m.delay_between_runs_in_seconds }; nano::unique_lock lock{ mutex }; while (!stopped) { - if (predicate () || config_m.ongoing_backlog_population_enabled) + if (predicate ()) { + stats.inc (nano::stat::type::backlog, 
nano::stat::detail::loop); + triggered = false; lock.unlock (); populate_backlog (); lock.lock (); } - condition.wait_for (lock, delay, [this] () { + condition.wait (lock, [this] () { return stopped || predicate (); }); } @@ -75,22 +76,55 @@ void nano::backlog_population::run () void nano::backlog_population::populate_backlog () { + debug_assert (config_m.frequency > 0); + + const auto chunk_size = config_m.batch_size / config_m.frequency; auto done = false; - uint64_t const chunk_size = 65536; nano::account next = 0; uint64_t total = 0; while (!stopped && !done) { - auto transaction = store_m.tx_begin_read (); - auto count = 0; - auto i = store_m.account.begin (transaction, next); - const auto end = store_m.account.end (); - for (; !stopped && i != end && count < chunk_size; ++i, ++count, ++total) { - auto const & account = i->first; - scheduler.activate (account, transaction); - next = account.number () + 1; + auto transaction = store.tx_begin_read (); + + auto count = 0; + auto i = store.account.begin (transaction, next); + const auto end = store.account.end (); + for (; !stopped && i != end && count < chunk_size; ++i, ++count, ++total) + { + stats.inc (nano::stat::type::backlog, nano::stat::detail::total); + + auto const & account = i->first; + activate (transaction, account); + next = account.number () + 1; + } + done = store.account.begin (transaction, next) == end; } - done = store_m.account.begin (transaction, next) == end; + + // Give the rest of the node time to progress without holding database lock + std::this_thread::sleep_for (std::chrono::milliseconds (1000 / config_m.frequency)); + } +} + +void nano::backlog_population::activate (nano::transaction const & transaction, nano::account const & account) +{ + debug_assert (!activate_callback.empty ()); + + auto const maybe_account_info = store.account.get (transaction, account); + if (!maybe_account_info) + { + return; + } + auto const account_info = *maybe_account_info; + + auto const maybe_conf_info = 
store.confirmation_height.get (transaction, account); + auto const conf_info = maybe_conf_info.value_or (nano::confirmation_height_info{}); + + // If conf info is empty then it means nothing is confirmed yet + if (conf_info.height < account_info.block_count) + { + stats.inc (nano::stat::type::backlog, nano::stat::detail::activated); + + activate_callback.notify (transaction, account, account_info, conf_info); } } diff --git a/nano/node/backlog_population.hpp b/nano/node/backlog_population.hpp index a4a352dca..33d372d83 100644 --- a/nano/node/backlog_population.hpp +++ b/nano/node/backlog_population.hpp @@ -1,6 +1,9 @@ #pragma once #include +#include +#include +#include #include #include @@ -8,6 +11,7 @@ namespace nano { +class stat; class store; class election_scheduler; @@ -16,43 +20,58 @@ class backlog_population final public: struct config { - bool ongoing_backlog_population_enabled; - unsigned int delay_between_runs_in_seconds; + /** Control if ongoing backlog population is enabled. If not, backlog population can still be triggered by RPC */ + bool enabled; + + /** Number of accounts per second to process. Number of accounts per single batch is this value divided by `frequency` */ + unsigned batch_size; + + /** Number of batches to run per second. Batches run in 1 second / `frequency` intervals */ + unsigned frequency; }; - explicit backlog_population (const config & config_a, store & store, election_scheduler & scheduler); + backlog_population (const config &, nano::store &, nano::stat &); ~backlog_population (); void start (); void stop (); + + /** Manually trigger backlog population */ void trigger (); - /** Other components call this to notify us about external changes, so we can check our predicate. 
*/ + /** Notify about AEC vacancy */ void notify (); +public: + /** + * Callback called for each backlogged account + */ + using callback_t = nano::observer_set; + callback_t activate_callback; + +private: // Dependencies + nano::store & store; + nano::stat & stats; + + config config_m; + private: void run (); bool predicate () const; void populate_backlog (); + void activate (nano::transaction const &, nano::account const &); /** This is a manual trigger, the ongoing backlog population does not use this. * It can be triggered even when backlog population (frontiers confirmation) is disabled. */ bool triggered{ false }; std::atomic stopped{ false }; - nano::condition_variable condition; mutable nano::mutex mutex; /** Thread that runs the backlog implementation logic. The thread always runs, even if * backlog population is disabled, so that it can service a manual trigger (e.g. via RPC). */ std::thread thread; - - config config_m; - -private: // Dependencies - store & store_m; - election_scheduler & scheduler; }; } diff --git a/nano/node/election_scheduler.cpp b/nano/node/election_scheduler.cpp index f9d825888..65f23299e 100644 --- a/nano/node/election_scheduler.cpp +++ b/nano/node/election_scheduler.cpp @@ -21,7 +21,7 @@ void nano::election_scheduler::manual (std::shared_ptr const & bloc notify (); } -void nano::election_scheduler::activate (nano::account const & account_a, nano::transaction const & transaction) +bool nano::election_scheduler::activate (nano::account const & account_a, nano::transaction const & transaction) { debug_assert (!account_a.is_zero ()); nano::account_info account_info; @@ -42,9 +42,11 @@ void nano::election_scheduler::activate (nano::account const & account_a, nano:: nano::lock_guard lock{ mutex }; priority.push (account_info.modified, block, std::max (balance, previous_balance)); notify (); + return true; // Activated } } } + return false; // Not activated } void nano::election_scheduler::stop () diff --git 
a/nano/node/election_scheduler.hpp b/nano/node/election_scheduler.hpp index bad972277..82eafdf42 100644 --- a/nano/node/election_scheduler.hpp +++ b/nano/node/election_scheduler.hpp @@ -23,8 +23,11 @@ public: // Manualy start an election for a block // Call action with confirmed block, may be different than what we started with void manual (std::shared_ptr const &, boost::optional const & = boost::none, nano::election_behavior = nano::election_behavior::normal, std::function const &)> const & = nullptr); - // Activates the first unconfirmed block of \p account_a - void activate (nano::account const &, nano::transaction const &); + /** + * Activates the first unconfirmed block of \p account_a + * @return true if account was activated + */ + bool activate (nano::account const &, nano::transaction const &); void stop (); // Blocks until no more elections can be activated or there are no more elections to activate void flush (); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 78d150e64..451619608 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -33,11 +33,12 @@ extern std::size_t nano_bootstrap_weights_beta_size; * Configs */ -nano::backlog_population::config nano::nodeconfig_to_backlog_population_config (const nano::node_config & config) +nano::backlog_population::config nano::backlog_population_config (const nano::node_config & config) { - nano::backlog_population::config cfg; - cfg.ongoing_backlog_population_enabled = config.frontiers_confirmation != nano::frontiers_confirmation_mode::disabled; - cfg.delay_between_runs_in_seconds = config.network_params.network.is_dev_network () ? 
1u : 300u; + nano::backlog_population::config cfg{}; + cfg.enabled = config.frontiers_confirmation != nano::frontiers_confirmation_mode::disabled; + cfg.frequency = config.backlog_scan_frequency; + cfg.batch_size = config.backlog_scan_batch_size; return cfg; } @@ -198,7 +199,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co hinting{ nano::nodeconfig_to_hinted_scheduler_config (config), *this, inactive_vote_cache, active, online_reps, stats }, aggregator (config, stats, generator, final_generator, history, ledger, wallets, active), wallets (wallets_store.init_error (), *this), - backlog{ nano::nodeconfig_to_backlog_population_config (config), store, scheduler }, + backlog{ nano::backlog_population_config (config), store, stats }, startup_time (std::chrono::steady_clock::now ()), node_seq (seq) { @@ -211,6 +212,10 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co return ledger.weight (rep); }; + backlog.activate_callback.add ([this] (nano::transaction const & transaction, nano::account const & account, nano::account_info const & account_info, nano::confirmation_height_info const & conf_info) { + scheduler.activate (account, transaction); + }); + if (!init_error ()) { telemetry->start (); @@ -776,6 +781,7 @@ void nano::node::stop () // Cancels ongoing work generation tasks, which may be blocking other threads // No tasks may wait for work generation in I/O threads, or termination signal capturing will be unable to call node::stop() distributed_work.stop (); + backlog.stop (); unchecked.stop (); block_processor.stop (); aggregator.stop (); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 018629ed0..4f204bf04 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -58,7 +58,7 @@ class work_pool; std::unique_ptr collect_container_info (rep_crawler & rep_crawler, std::string const & name); // Configs -backlog_population::config nodeconfig_to_backlog_population_config (node_config const &); 
+backlog_population::config backlog_population_config (node_config const &); vote_cache::config nodeconfig_to_vote_cache_config (node_config const &, node_flags const &); hinted_scheduler::config nodeconfig_to_hinted_scheduler_config (node_config const &); outbound_bandwidth_limiter::config outbound_bandwidth_limiter_config (node_config const &); diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index eb2d88290..7844d23f3 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -128,6 +128,8 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const toml.put ("frontiers_confirmation", serialize_frontiers_confirmation (frontiers_confirmation), "Mode controlling frontier confirmation rate.\ntype:string,{auto,always,disabled}"); toml.put ("max_queued_requests", max_queued_requests, "Limit for number of queued confirmation requests for one channel, after which new requests are dropped until the queue drops below this value.\ntype:uint32"); toml.put ("rep_crawler_weight_minimum", rep_crawler_weight_minimum.to_string_dec (), "Rep crawler minimum weight, if this is less than minimum principal weight then this is taken as the minimum weight a rep must have to be tracked. If you want to track all reps set this to 0. If you do not want this to influence anything then set it to max value. This is only useful for debugging or for people who really know what they are doing.\ntype:string,amount,raw"); + toml.put ("backlog_scan_batch_size", backlog_scan_batch_size, "Number of accounts per second to process when doing backlog population scan. Increasing this value will help unconfirmed frontiers get into election prioritization queue faster, however it will also increase resource usage. \ntype:uint"); + toml.put ("backlog_scan_frequency", backlog_scan_frequency, "Backlog scan divides the scan into smaller batches, number of which is controlled by this value. 
Higher frequency helps to utilize resources more uniformly, however it also introduces more overhead. The resulting number of accounts per single batch is `backlog_scan_batch_size / backlog_scan_frequency` \ntype:uint"); auto work_peers_l (toml.create_array ("work_peers", "A list of \"address:port\" entries to identify work peers.")); for (auto i (work_peers.begin ()), n (work_peers.end ()); i != n; ++i) @@ -398,6 +400,9 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml) frontiers_confirmation = deserialize_frontiers_confirmation (frontiers_confirmation_l); } + toml.get ("backlog_scan_batch_size", backlog_scan_batch_size); + toml.get ("backlog_scan_frequency", backlog_scan_frequency); + if (toml.has_key ("experimental")) { auto experimental_config_l (toml.get_required_child ("experimental")); diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 683a62072..fca0c68aa 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -106,6 +106,12 @@ public: nano::rocksdb_config rocksdb_config; nano::lmdb_config lmdb_config; nano::frontiers_confirmation_mode frontiers_confirmation{ nano::frontiers_confirmation_mode::automatic }; + /** Number of accounts per second to process when doing backlog population scan */ + unsigned backlog_scan_batch_size{ 10 * 1000 }; + /** Number of times per second to run backlog population batches. Number of accounts per single batch is `backlog_scan_batch_size / backlog_scan_frequency` */ + unsigned backlog_scan_frequency{ 10 }; + +public: std::string serialize_frontiers_confirmation (nano::frontiers_confirmation_mode) const; nano::frontiers_confirmation_mode deserialize_frontiers_confirmation (std::string const &); /** Entry is ignored if it cannot be parsed as a valid address:port */