Use batch event for background backlog scan
This commit is contained in:
parent
7ae7c8a420
commit
bc77ff9a8d
5 changed files with 86 additions and 62 deletions
|
|
@ -22,9 +22,12 @@ TEST (backlog, population)
|
||||||
nano::test::system system{};
|
nano::test::system system{};
|
||||||
auto & node = *system.add_node ();
|
auto & node = *system.add_node ();
|
||||||
|
|
||||||
node.backlog_scan.activated.add ([&] (nano::secure::transaction const & transaction, auto const & info) {
|
node.backlog_scan.batch_activated.add ([&] (auto const & batch) {
|
||||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||||
activated.insert (info.account);
|
for (auto const & info : batch)
|
||||||
|
{
|
||||||
|
activated.insert (info.account);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
auto blocks = nano::test::setup_independent_blocks (system, node, 256);
|
auto blocks = nano::test::setup_independent_blocks (system, node, 256);
|
||||||
|
|
|
||||||
|
|
@ -8,10 +8,11 @@
|
||||||
#include <nano/store/component.hpp>
|
#include <nano/store/component.hpp>
|
||||||
#include <nano/store/confirmation_height.hpp>
|
#include <nano/store/confirmation_height.hpp>
|
||||||
|
|
||||||
nano::backlog_scan::backlog_scan (backlog_scan_config const & config_a, nano::ledger & ledger, nano::stats & stats_a) :
|
nano::backlog_scan::backlog_scan (backlog_scan_config const & config_a, nano::ledger & ledger_a, nano::stats & stats_a) :
|
||||||
config{ config_a },
|
config{ config_a },
|
||||||
ledger{ ledger },
|
ledger{ ledger_a },
|
||||||
stats{ stats_a }
|
stats{ stats_a },
|
||||||
|
limiter{ config.batch_size * config.frequency }
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -69,76 +70,85 @@ void nano::backlog_scan::run ()
|
||||||
{
|
{
|
||||||
stats.inc (nano::stat::type::backlog_scan, nano::stat::detail::loop);
|
stats.inc (nano::stat::type::backlog_scan, nano::stat::detail::loop);
|
||||||
triggered = false;
|
triggered = false;
|
||||||
populate_backlog (lock);
|
populate_backlog (lock); // Does a single iteration over all accounts
|
||||||
|
debug_assert (lock.owns_lock ());
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
condition.wait (lock, [this] () {
|
||||||
|
return stopped || predicate ();
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
condition.wait (lock, [this] () {
|
|
||||||
return stopped || predicate ();
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::backlog_scan::populate_backlog (nano::unique_lock<nano::mutex> & lock)
|
void nano::backlog_scan::populate_backlog (nano::unique_lock<nano::mutex> & lock)
|
||||||
{
|
{
|
||||||
debug_assert (config.frequency > 0);
|
|
||||||
|
|
||||||
const auto chunk_size = config.batch_size / config.frequency;
|
|
||||||
auto done = false;
|
|
||||||
nano::account next = 0;
|
|
||||||
uint64_t total = 0;
|
uint64_t total = 0;
|
||||||
|
|
||||||
|
nano::account next = 0;
|
||||||
|
bool done = false;
|
||||||
while (!stopped && !done)
|
while (!stopped && !done)
|
||||||
{
|
{
|
||||||
|
// Wait for the rate limiter
|
||||||
|
while (!limiter.should_pass (config.batch_size))
|
||||||
|
{
|
||||||
|
condition.wait_for (lock, std::chrono::milliseconds{ 1000 / config.frequency / 2 });
|
||||||
|
if (stopped)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
lock.unlock ();
|
lock.unlock ();
|
||||||
|
|
||||||
|
std::deque<activated_info> scanned;
|
||||||
|
std::deque<activated_info> activated;
|
||||||
{
|
{
|
||||||
auto transaction = ledger.tx_begin_read ();
|
auto transaction = ledger.tx_begin_read ();
|
||||||
|
|
||||||
auto it = ledger.store.account.begin (transaction, next);
|
auto it = ledger.store.account.begin (transaction, next);
|
||||||
auto const end = ledger.store.account.end (transaction);
|
auto const end = ledger.store.account.end (transaction);
|
||||||
|
|
||||||
auto should_refresh = [&transaction] () {
|
for (size_t count = 0; it != end && count < config.batch_size; ++it, ++count, ++total)
|
||||||
auto cutoff = std::chrono::steady_clock::now () - 100ms; // TODO: Make this configurable
|
|
||||||
return transaction.timestamp () < cutoff;
|
|
||||||
};
|
|
||||||
|
|
||||||
for (size_t count = 0; it != end && count < chunk_size && !should_refresh (); ++it, ++count, ++total)
|
|
||||||
{
|
{
|
||||||
stats.inc (nano::stat::type::backlog_scan, nano::stat::detail::total);
|
stats.inc (nano::stat::type::backlog_scan, nano::stat::detail::total);
|
||||||
|
|
||||||
auto const & account = it->first;
|
auto const [account, account_info] = *it;
|
||||||
auto const & account_info = it->second;
|
auto const maybe_conf_info = ledger.store.confirmation_height.get (transaction, account);
|
||||||
|
auto const conf_info = maybe_conf_info.value_or (nano::confirmation_height_info{});
|
||||||
|
|
||||||
activate (transaction, account, account_info);
|
activated_info info{ account, account_info, conf_info };
|
||||||
|
|
||||||
next = account.number () + 1;
|
scanned.push_back (info);
|
||||||
|
if (conf_info.height < account_info.block_count)
|
||||||
|
{
|
||||||
|
activated.push_back (info);
|
||||||
|
}
|
||||||
|
|
||||||
|
next = account.number () + 1; // TODO: Prevent account overflow
|
||||||
}
|
}
|
||||||
|
|
||||||
done = ledger.store.account.begin (transaction, next) == end;
|
done = (it == end);
|
||||||
}
|
}
|
||||||
|
|
||||||
lock.lock ();
|
stats.add (nano::stat::type::backlog_scan, nano::stat::detail::scanned, scanned.size ());
|
||||||
|
stats.add (nano::stat::type::backlog_scan, nano::stat::detail::activated, activated.size ());
|
||||||
|
|
||||||
// Give the rest of the node time to progress without holding database lock
|
// Notify about scanned and activated accounts without holding database transaction
|
||||||
condition.wait_for (lock, std::chrono::milliseconds{ 1000 / config.frequency });
|
batch_scanned.notify (scanned);
|
||||||
|
batch_activated.notify (activated);
|
||||||
|
|
||||||
|
lock.lock ();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::backlog_scan::activate (secure::transaction const & transaction, nano::account const & account, nano::account_info const & account_info)
|
nano::container_info nano::backlog_scan::container_info () const
|
||||||
{
|
{
|
||||||
auto const maybe_conf_info = ledger.store.confirmation_height.get (transaction, account);
|
nano::lock_guard<nano::mutex> guard{ mutex };
|
||||||
auto const conf_info = maybe_conf_info.value_or (nano::confirmation_height_info{});
|
nano::container_info info;
|
||||||
|
info.put ("limiter", limiter.size ());
|
||||||
activated_info info{ account, account_info, conf_info };
|
return info;
|
||||||
|
|
||||||
stats.inc (nano::stat::type::backlog_scan, nano::stat::detail::scanned);
|
|
||||||
scanned.notify (transaction, info);
|
|
||||||
|
|
||||||
// If conf info is empty then it means nothing is confirmed yet
|
|
||||||
if (conf_info.height < account_info.block_count)
|
|
||||||
{
|
|
||||||
stats.inc (nano::stat::type::backlog_scan, nano::stat::detail::activated);
|
|
||||||
activated.notify (transaction, info);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
@ -149,7 +159,7 @@ nano::error nano::backlog_scan_config::serialize (nano::tomlconfig & toml) const
|
||||||
{
|
{
|
||||||
toml.put ("enable", enable, "Control if ongoing backlog population is enabled. If not, backlog population can still be triggered by RPC \ntype:bool");
|
toml.put ("enable", enable, "Control if ongoing backlog population is enabled. If not, backlog population can still be triggered by RPC \ntype:bool");
|
||||||
toml.put ("batch_size", batch_size, "Number of accounts per second to process when doing backlog population scan. Increasing this value will help unconfirmed frontiers get into election prioritization queue faster, however it will also increase resource usage. \ntype:uint");
|
toml.put ("batch_size", batch_size, "Number of accounts per second to process when doing backlog population scan. Increasing this value will help unconfirmed frontiers get into election prioritization queue faster, however it will also increase resource usage. \ntype:uint");
|
||||||
toml.put ("frequency", frequency, "Backlog scan divides the scan into smaller batches, number of which is controlled by this value. Higher frequency helps to utilize resources more uniformly, however it also introduces more overhead. The resulting number of accounts per single batch is `backlog_scan_batch_size / backlog_scan_frequency` \ntype:uint");
|
toml.put ("frequency", frequency, "Number of batches to process per second. Higher frequency and smaller batch size helps to utilize resources more uniformly, however it also introduces more overhead. Use 0 to process as fast as possible, but be aware that it may consume a lot of resources. \ntype:uint");
|
||||||
|
|
||||||
return toml.get_error ();
|
return toml.get_error ();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,11 +3,13 @@
|
||||||
#include <nano/lib/locks.hpp>
|
#include <nano/lib/locks.hpp>
|
||||||
#include <nano/lib/numbers.hpp>
|
#include <nano/lib/numbers.hpp>
|
||||||
#include <nano/lib/observer_set.hpp>
|
#include <nano/lib/observer_set.hpp>
|
||||||
|
#include <nano/lib/rate_limiting.hpp>
|
||||||
#include <nano/node/fwd.hpp>
|
#include <nano/node/fwd.hpp>
|
||||||
#include <nano/secure/account_info.hpp>
|
#include <nano/secure/account_info.hpp>
|
||||||
#include <nano/secure/common.hpp>
|
#include <nano/secure/common.hpp>
|
||||||
|
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
|
#include <deque>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
|
||||||
namespace nano
|
namespace nano
|
||||||
|
|
@ -21,10 +23,10 @@ public:
|
||||||
public:
|
public:
|
||||||
/** Control if ongoing backlog population is enabled. If not, backlog population can still be triggered by RPC */
|
/** Control if ongoing backlog population is enabled. If not, backlog population can still be triggered by RPC */
|
||||||
bool enable{ true };
|
bool enable{ true };
|
||||||
/** Number of accounts per second to process. Number of accounts per single batch is this value divided by `frequency` */
|
/** Number of accounts to process per batch. The overall rate is this value multiplied by `frequency` accounts per second. */
|
||||||
unsigned batch_size{ 10 * 1000 };
|
size_t batch_size{ 1000 };
|
||||||
/** Number of batches to run per second. Batches run in 1 second / `frequency` intervals */
|
/** Number of batches to run per second. */
|
||||||
unsigned frequency{ 10 };
|
size_t frequency{ 10 };
|
||||||
};
|
};
|
||||||
|
|
||||||
class backlog_scan final
|
class backlog_scan final
|
||||||
|
|
@ -42,6 +44,8 @@ public:
|
||||||
/** Notify about AEC vacancy */
|
/** Notify about AEC vacancy */
|
||||||
void notify ();
|
void notify ();
|
||||||
|
|
||||||
|
nano::container_info container_info () const;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
struct activated_info
|
struct activated_info
|
||||||
{
|
{
|
||||||
|
|
@ -50,12 +54,9 @@ public:
|
||||||
nano::confirmation_height_info conf_info;
|
nano::confirmation_height_info conf_info;
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
using batch_event_t = nano::observer_set<std::deque<activated_info>>;
|
||||||
* Callback called for each backlogged account
|
batch_event_t batch_scanned; // All accounts scanned in a batch, including those that were also activated
|
||||||
*/
|
batch_event_t batch_activated; // Accounts activated
|
||||||
using callback_t = nano::observer_set<nano::secure::transaction const &, activated_info const &>;
|
|
||||||
callback_t activated;
|
|
||||||
callback_t scanned;
|
|
||||||
|
|
||||||
private: // Dependencies
|
private: // Dependencies
|
||||||
backlog_scan_config const & config;
|
backlog_scan_config const & config;
|
||||||
|
|
@ -66,9 +67,10 @@ private:
|
||||||
void run ();
|
void run ();
|
||||||
bool predicate () const;
|
bool predicate () const;
|
||||||
void populate_backlog (nano::unique_lock<nano::mutex> & lock);
|
void populate_backlog (nano::unique_lock<nano::mutex> & lock);
|
||||||
void activate (secure::transaction const &, nano::account const &, nano::account_info const &);
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
nano::rate_limiter limiter;
|
||||||
|
|
||||||
/** This is a manual trigger, the ongoing backlog population does not use this.
|
/** This is a manual trigger, the ongoing backlog population does not use this.
|
||||||
* It can be triggered even when backlog population (frontiers confirmation) is disabled. */
|
* It can be triggered even when backlog population (frontiers confirmation) is disabled. */
|
||||||
bool triggered{ false };
|
bool triggered{ false };
|
||||||
|
|
|
||||||
|
|
@ -180,9 +180,14 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
|
||||||
return ledger.weight (rep);
|
return ledger.weight (rep);
|
||||||
};
|
};
|
||||||
|
|
||||||
backlog_scan.activated.add ([this] (nano::secure::transaction const & transaction, auto const & info) {
|
// TODO: Hook this directly in the schedulers
|
||||||
scheduler.optimistic.activate (info.account, info.account_info, info.conf_info);
|
backlog_scan.batch_activated.add ([this] (auto const & batch) {
|
||||||
scheduler.priority.activate (transaction, info.account, info.account_info, info.conf_info);
|
auto transaction = ledger.tx_begin_read ();
|
||||||
|
for (auto const & info : batch)
|
||||||
|
{
|
||||||
|
scheduler.optimistic.activate (info.account, info.account_info, info.conf_info);
|
||||||
|
scheduler.priority.activate (transaction, info.account, info.account_info, info.conf_info);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Republish vote if it is new and the node does not host a principal representative (or close to)
|
// Republish vote if it is new and the node does not host a principal representative (or close to)
|
||||||
|
|
@ -1198,6 +1203,7 @@ nano::container_info nano::node::container_info () const
|
||||||
info.add ("rep_tiers", rep_tiers.container_info ());
|
info.add ("rep_tiers", rep_tiers.container_info ());
|
||||||
info.add ("message_processor", message_processor.container_info ());
|
info.add ("message_processor", message_processor.container_info ());
|
||||||
info.add ("bandwidth", outbound_limiter.container_info ());
|
info.add ("bandwidth", outbound_limiter.container_info ());
|
||||||
|
info.add ("backlog_scan", backlog_scan.container_info ());
|
||||||
return info;
|
return info;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -135,9 +135,12 @@ bool nano::scheduler::priority::activate (secure::transaction const & transactio
|
||||||
{
|
{
|
||||||
debug_assert (conf_info.frontier != account_info.head);
|
debug_assert (conf_info.frontier != account_info.head);
|
||||||
|
|
||||||
auto hash = conf_info.height == 0 ? account_info.open_block : ledger.any.block_successor (transaction, conf_info.frontier).value ();
|
auto const hash = conf_info.height == 0 ? account_info.open_block : ledger.any.block_successor (transaction, conf_info.frontier).value_or (0);
|
||||||
auto block = ledger.any.block_get (transaction, hash);
|
auto const block = ledger.any.block_get (transaction, hash);
|
||||||
release_assert (block != nullptr);
|
if (!block)
|
||||||
|
{
|
||||||
|
return false; // Not activated
|
||||||
|
}
|
||||||
|
|
||||||
if (ledger.dependents_confirmed (transaction, *block))
|
if (ledger.dependents_confirmed (transaction, *block))
|
||||||
{
|
{
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue