Continuous backlog population (#3999)
This PR fixes that by modifying the way the ledger scan is done. Instead of 5 minute interval, we run the scan all the time (unless disabled by setting frontiers_confirmation = disabled node config setting), but we throttle the rate at which the scan is done to limit consumption of node resources. The rate and frequency is controlled by two new node-config.toml settings: backlog_scan_batch_size and backlog_scan_frequency. https://github.com/nanocurrency/nano-node/pull/3999
This commit is contained in:
parent
9be0f0aeb9
commit
02bffc2d7a
14 changed files with 259 additions and 50 deletions
|
|
@ -4,6 +4,7 @@ add_executable(
|
||||||
fakes/websocket_client.hpp
|
fakes/websocket_client.hpp
|
||||||
fakes/work_peer.hpp
|
fakes/work_peer.hpp
|
||||||
active_transactions.cpp
|
active_transactions.cpp
|
||||||
|
backlog.cpp
|
||||||
block.cpp
|
block.cpp
|
||||||
block_store.cpp
|
block_store.cpp
|
||||||
blockprocessor.cpp
|
blockprocessor.cpp
|
||||||
|
|
|
||||||
|
|
@ -46,6 +46,10 @@ TEST (active_transactions, confirm_election_by_request)
|
||||||
// Ensure election on node1 is already confirmed before connecting with node2
|
// Ensure election on node1 is already confirmed before connecting with node2
|
||||||
ASSERT_TIMELY (5s, nano::test::confirmed (node1, { send1 }));
|
ASSERT_TIMELY (5s, nano::test::confirmed (node1, { send1 }));
|
||||||
|
|
||||||
|
// Wait for the election to be removed and give time for any in-flight vote broadcasts to settle
|
||||||
|
ASSERT_TIMELY (5s, node1.active.empty ());
|
||||||
|
WAIT (1s);
|
||||||
|
|
||||||
// At this point node1 should not generate votes for send1 block unless it receives a request
|
// At this point node1 should not generate votes for send1 block unless it receives a request
|
||||||
|
|
||||||
// Create a second node
|
// Create a second node
|
||||||
|
|
|
||||||
107
nano/core_test/backlog.cpp
Normal file
107
nano/core_test/backlog.cpp
Normal file
|
|
@ -0,0 +1,107 @@
|
||||||
|
#include <nano/node/active_transactions.hpp>
|
||||||
|
#include <nano/test_common/system.hpp>
|
||||||
|
#include <nano/test_common/testutil.hpp>
|
||||||
|
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
#include <numeric>
|
||||||
|
|
||||||
|
using namespace std::chrono_literals;
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
using block_list_t = std::vector<std::shared_ptr<nano::block>>;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Creates `count` 1 raw sends from genesis to unique accounts and corresponding open blocks.
|
||||||
|
* The genesis chain is then confirmed, but leaves open blocks unconfirmed
|
||||||
|
* The list of unconfirmed open blocks is returned.
|
||||||
|
*/
|
||||||
|
block_list_t setup_independent_blocks (nano::test::system & system, nano::node & node, int count)
|
||||||
|
{
|
||||||
|
std::vector<std::shared_ptr<nano::block>> blocks;
|
||||||
|
|
||||||
|
auto latest = node.latest (nano::dev::genesis_key.pub);
|
||||||
|
auto balance = node.balance (nano::dev::genesis_key.pub);
|
||||||
|
|
||||||
|
for (int n = 0; n < count; ++n)
|
||||||
|
{
|
||||||
|
nano::keypair key;
|
||||||
|
nano::block_builder builder;
|
||||||
|
|
||||||
|
balance -= 1;
|
||||||
|
auto send = builder
|
||||||
|
.state ()
|
||||||
|
.account (nano::dev::genesis_key.pub)
|
||||||
|
.previous (latest)
|
||||||
|
.representative (nano::dev::genesis_key.pub)
|
||||||
|
.balance (balance)
|
||||||
|
.link (key.pub)
|
||||||
|
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
|
||||||
|
.work (*system.work.generate (latest))
|
||||||
|
.build_shared ();
|
||||||
|
latest = send->hash ();
|
||||||
|
|
||||||
|
auto open = builder
|
||||||
|
.state ()
|
||||||
|
.account (key.pub)
|
||||||
|
.previous (0)
|
||||||
|
.representative (key.pub)
|
||||||
|
.balance (1)
|
||||||
|
.link (send->hash ())
|
||||||
|
.sign (key.prv, key.pub)
|
||||||
|
.work (*system.work.generate (key.pub))
|
||||||
|
.build_shared ();
|
||||||
|
|
||||||
|
EXPECT_TRUE (nano::test::process (node, { send, open }));
|
||||||
|
EXPECT_TIMELY (5s, nano::test::exists (node, { send, open })); // Ensure blocks are in the ledger
|
||||||
|
|
||||||
|
blocks.push_back (open);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Confirm whole genesis chain at once
|
||||||
|
EXPECT_TIMELY (5s, nano::test::confirm (node, { latest }));
|
||||||
|
EXPECT_TIMELY (5s, nano::test::confirmed (node, { latest }));
|
||||||
|
|
||||||
|
return blocks;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Ensures all not confirmed accounts get activated by backlog scan periodically
|
||||||
|
*/
|
||||||
|
TEST (backlog, population)
|
||||||
|
{
|
||||||
|
nano::test::system system{};
|
||||||
|
auto & node = *system.add_node ();
|
||||||
|
|
||||||
|
nano::mutex mutex;
|
||||||
|
std::unordered_set<nano::account> activated;
|
||||||
|
|
||||||
|
node.backlog.activate_callback.add ([&] (nano::transaction const & transaction, nano::account const & account, nano::account_info const & account_info, nano::confirmation_height_info const & conf_info) {
|
||||||
|
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||||
|
|
||||||
|
activated.insert (account);
|
||||||
|
});
|
||||||
|
|
||||||
|
auto blocks = setup_independent_blocks (system, node, 256);
|
||||||
|
|
||||||
|
// Checks if `activated` set contains all accounts we previously set up
|
||||||
|
auto all_activated = [&] () {
|
||||||
|
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||||
|
return std::all_of (blocks.begin (), blocks.end (), [&] (auto const & item) {
|
||||||
|
auto account = item->account ();
|
||||||
|
debug_assert (!account.is_zero ());
|
||||||
|
return activated.count (account) != 0;
|
||||||
|
});
|
||||||
|
};
|
||||||
|
ASSERT_TIMELY (5s, all_activated ());
|
||||||
|
|
||||||
|
// Clear activated set to ensure we activate those accounts more than once
|
||||||
|
{
|
||||||
|
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||||
|
activated.clear ();
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT_TIMELY (5s, all_activated ());
|
||||||
|
}
|
||||||
|
|
@ -190,6 +190,8 @@ TEST (toml, daemon_config_deserialize_defaults)
|
||||||
ASSERT_EQ (conf.node.work_peers, defaults.node.work_peers);
|
ASSERT_EQ (conf.node.work_peers, defaults.node.work_peers);
|
||||||
ASSERT_EQ (conf.node.work_threads, defaults.node.work_threads);
|
ASSERT_EQ (conf.node.work_threads, defaults.node.work_threads);
|
||||||
ASSERT_EQ (conf.node.max_queued_requests, defaults.node.max_queued_requests);
|
ASSERT_EQ (conf.node.max_queued_requests, defaults.node.max_queued_requests);
|
||||||
|
ASSERT_EQ (conf.node.backlog_scan_batch_size, defaults.node.backlog_scan_batch_size);
|
||||||
|
ASSERT_EQ (conf.node.backlog_scan_frequency, defaults.node.backlog_scan_frequency);
|
||||||
|
|
||||||
ASSERT_EQ (conf.node.logging.bulk_pull_logging_value, defaults.node.logging.bulk_pull_logging_value);
|
ASSERT_EQ (conf.node.logging.bulk_pull_logging_value, defaults.node.logging.bulk_pull_logging_value);
|
||||||
ASSERT_EQ (conf.node.logging.flush, defaults.node.logging.flush);
|
ASSERT_EQ (conf.node.logging.flush, defaults.node.logging.flush);
|
||||||
|
|
@ -433,6 +435,9 @@ TEST (toml, daemon_config_deserialize_no_defaults)
|
||||||
max_work_generate_multiplier = 1.0
|
max_work_generate_multiplier = 1.0
|
||||||
max_queued_requests = 999
|
max_queued_requests = 999
|
||||||
frontiers_confirmation = "always"
|
frontiers_confirmation = "always"
|
||||||
|
backlog_scan_batch_size = 999
|
||||||
|
backlog_scan_frequency = 999
|
||||||
|
|
||||||
[node.diagnostics.txn_tracking]
|
[node.diagnostics.txn_tracking]
|
||||||
enable = true
|
enable = true
|
||||||
ignore_writes_below_block_processor_max_time = false
|
ignore_writes_below_block_processor_max_time = false
|
||||||
|
|
@ -602,6 +607,8 @@ TEST (toml, daemon_config_deserialize_no_defaults)
|
||||||
ASSERT_NE (conf.node.work_peers, defaults.node.work_peers);
|
ASSERT_NE (conf.node.work_peers, defaults.node.work_peers);
|
||||||
ASSERT_NE (conf.node.work_threads, defaults.node.work_threads);
|
ASSERT_NE (conf.node.work_threads, defaults.node.work_threads);
|
||||||
ASSERT_NE (conf.node.max_queued_requests, defaults.node.max_queued_requests);
|
ASSERT_NE (conf.node.max_queued_requests, defaults.node.max_queued_requests);
|
||||||
|
ASSERT_NE (conf.node.backlog_scan_batch_size, defaults.node.backlog_scan_batch_size);
|
||||||
|
ASSERT_NE (conf.node.backlog_scan_frequency, defaults.node.backlog_scan_frequency);
|
||||||
|
|
||||||
ASSERT_NE (conf.node.logging.bulk_pull_logging_value, defaults.node.logging.bulk_pull_logging_value);
|
ASSERT_NE (conf.node.logging.bulk_pull_logging_value, defaults.node.logging.bulk_pull_logging_value);
|
||||||
ASSERT_NE (conf.node.logging.flush, defaults.node.logging.flush);
|
ASSERT_NE (conf.node.logging.flush, defaults.node.logging.flush);
|
||||||
|
|
|
||||||
|
|
@ -557,6 +557,9 @@ std::string nano::stat::type_to_string (stat::type type)
|
||||||
case nano::stat::type::active:
|
case nano::stat::type::active:
|
||||||
res = "active";
|
res = "active";
|
||||||
break;
|
break;
|
||||||
|
case nano::stat::type::backlog:
|
||||||
|
res = "backlog";
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
@ -572,6 +575,9 @@ std::string nano::stat::detail_to_string (stat::detail detail)
|
||||||
case nano::stat::detail::loop:
|
case nano::stat::detail::loop:
|
||||||
res = "loop";
|
res = "loop";
|
||||||
break;
|
break;
|
||||||
|
case nano::stat::detail::total:
|
||||||
|
res = "total";
|
||||||
|
break;
|
||||||
case nano::stat::detail::queue:
|
case nano::stat::detail::queue:
|
||||||
res = "queue";
|
res = "queue";
|
||||||
break;
|
break;
|
||||||
|
|
@ -811,6 +817,7 @@ std::string nano::stat::detail_to_string (stat::detail detail)
|
||||||
break;
|
break;
|
||||||
case nano::stat::detail::election_not_confirmed:
|
case nano::stat::detail::election_not_confirmed:
|
||||||
res = "election_not_confirmed";
|
res = "election_not_confirmed";
|
||||||
|
break;
|
||||||
case nano::stat::detail::election_hinted_overflow:
|
case nano::stat::detail::election_hinted_overflow:
|
||||||
res = "election_hinted_overflow";
|
res = "election_hinted_overflow";
|
||||||
break;
|
break;
|
||||||
|
|
@ -1045,6 +1052,9 @@ std::string nano::stat::detail_to_string (stat::detail detail)
|
||||||
case nano::stat::detail::channel_full:
|
case nano::stat::detail::channel_full:
|
||||||
res = "channel_full";
|
res = "channel_full";
|
||||||
break;
|
break;
|
||||||
|
case nano::stat::detail::activated:
|
||||||
|
res = "activated";
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -249,6 +249,7 @@ public:
|
||||||
blockprocessor,
|
blockprocessor,
|
||||||
bootstrap_server,
|
bootstrap_server,
|
||||||
active,
|
active,
|
||||||
|
backlog,
|
||||||
};
|
};
|
||||||
|
|
||||||
/** Optional detail type */
|
/** Optional detail type */
|
||||||
|
|
@ -258,6 +259,7 @@ public:
|
||||||
|
|
||||||
// common
|
// common
|
||||||
loop,
|
loop,
|
||||||
|
total,
|
||||||
|
|
||||||
// processing queue
|
// processing queue
|
||||||
queue,
|
queue,
|
||||||
|
|
@ -458,6 +460,9 @@ public:
|
||||||
response_blocks,
|
response_blocks,
|
||||||
response_account_info,
|
response_account_info,
|
||||||
channel_full,
|
channel_full,
|
||||||
|
|
||||||
|
// backlog
|
||||||
|
activated,
|
||||||
};
|
};
|
||||||
|
|
||||||
/** Direction of the stat. If the direction is irrelevant, use in */
|
/** Direction of the stat. If the direction is irrelevant, use in */
|
||||||
|
|
|
||||||
|
|
@ -4,41 +4,42 @@
|
||||||
#include <nano/node/nodeconfig.hpp>
|
#include <nano/node/nodeconfig.hpp>
|
||||||
#include <nano/secure/store.hpp>
|
#include <nano/secure/store.hpp>
|
||||||
|
|
||||||
nano::backlog_population::backlog_population (const config & config_a, nano::store & store_a, nano::election_scheduler & scheduler_a) :
|
nano::backlog_population::backlog_population (const config & config_a, nano::store & store_a, nano::stat & stats_a) :
|
||||||
config_m{ config_a },
|
config_m{ config_a },
|
||||||
store_m{ store_a },
|
store{ store_a },
|
||||||
scheduler{ scheduler_a }
|
stats{ stats_a }
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
nano::backlog_population::~backlog_population ()
|
nano::backlog_population::~backlog_population ()
|
||||||
{
|
{
|
||||||
stop ();
|
// Thread must be stopped before destruction
|
||||||
if (thread.joinable ())
|
debug_assert (!thread.joinable ());
|
||||||
{
|
|
||||||
thread.join ();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::backlog_population::start ()
|
void nano::backlog_population::start ()
|
||||||
{
|
{
|
||||||
if (!thread.joinable ())
|
debug_assert (!thread.joinable ());
|
||||||
{
|
|
||||||
thread = std::thread{ [this] () { run (); } };
|
thread = std::thread{ [this] () {
|
||||||
}
|
nano::thread_role::set (nano::thread_role::name::backlog_population);
|
||||||
|
run ();
|
||||||
|
} };
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::backlog_population::stop ()
|
void nano::backlog_population::stop ()
|
||||||
{
|
{
|
||||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
|
||||||
stopped = true;
|
stopped = true;
|
||||||
notify ();
|
notify ();
|
||||||
|
nano::join_or_pass (thread);
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::backlog_population::trigger ()
|
void nano::backlog_population::trigger ()
|
||||||
{
|
{
|
||||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
{
|
||||||
triggered = true;
|
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||||
|
triggered = true;
|
||||||
|
}
|
||||||
notify ();
|
notify ();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -49,25 +50,25 @@ void nano::backlog_population::notify ()
|
||||||
|
|
||||||
bool nano::backlog_population::predicate () const
|
bool nano::backlog_population::predicate () const
|
||||||
{
|
{
|
||||||
return triggered;
|
return triggered || config_m.enabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::backlog_population::run ()
|
void nano::backlog_population::run ()
|
||||||
{
|
{
|
||||||
nano::thread_role::set (nano::thread_role::name::backlog_population);
|
|
||||||
const auto delay = std::chrono::seconds{ config_m.delay_between_runs_in_seconds };
|
|
||||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||||
while (!stopped)
|
while (!stopped)
|
||||||
{
|
{
|
||||||
if (predicate () || config_m.ongoing_backlog_population_enabled)
|
if (predicate ())
|
||||||
{
|
{
|
||||||
|
stats.inc (nano::stat::type::backlog, nano::stat::detail::loop);
|
||||||
|
|
||||||
triggered = false;
|
triggered = false;
|
||||||
lock.unlock ();
|
lock.unlock ();
|
||||||
populate_backlog ();
|
populate_backlog ();
|
||||||
lock.lock ();
|
lock.lock ();
|
||||||
}
|
}
|
||||||
|
|
||||||
condition.wait_for (lock, delay, [this] () {
|
condition.wait (lock, [this] () {
|
||||||
return stopped || predicate ();
|
return stopped || predicate ();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
@ -75,22 +76,55 @@ void nano::backlog_population::run ()
|
||||||
|
|
||||||
void nano::backlog_population::populate_backlog ()
|
void nano::backlog_population::populate_backlog ()
|
||||||
{
|
{
|
||||||
|
debug_assert (config_m.frequency > 0);
|
||||||
|
|
||||||
|
const auto chunk_size = config_m.batch_size / config_m.frequency;
|
||||||
auto done = false;
|
auto done = false;
|
||||||
uint64_t const chunk_size = 65536;
|
|
||||||
nano::account next = 0;
|
nano::account next = 0;
|
||||||
uint64_t total = 0;
|
uint64_t total = 0;
|
||||||
while (!stopped && !done)
|
while (!stopped && !done)
|
||||||
{
|
{
|
||||||
auto transaction = store_m.tx_begin_read ();
|
|
||||||
auto count = 0;
|
|
||||||
auto i = store_m.account.begin (transaction, next);
|
|
||||||
const auto end = store_m.account.end ();
|
|
||||||
for (; !stopped && i != end && count < chunk_size; ++i, ++count, ++total)
|
|
||||||
{
|
{
|
||||||
auto const & account = i->first;
|
auto transaction = store.tx_begin_read ();
|
||||||
scheduler.activate (account, transaction);
|
|
||||||
next = account.number () + 1;
|
auto count = 0;
|
||||||
|
auto i = store.account.begin (transaction, next);
|
||||||
|
const auto end = store.account.end ();
|
||||||
|
for (; !stopped && i != end && count < chunk_size; ++i, ++count, ++total)
|
||||||
|
{
|
||||||
|
stats.inc (nano::stat::type::backlog, nano::stat::detail::total);
|
||||||
|
|
||||||
|
auto const & account = i->first;
|
||||||
|
activate (transaction, account);
|
||||||
|
next = account.number () + 1;
|
||||||
|
}
|
||||||
|
done = store.account.begin (transaction, next) == end;
|
||||||
}
|
}
|
||||||
done = store_m.account.begin (transaction, next) == end;
|
|
||||||
|
// Give the rest of the node time to progress without holding database lock
|
||||||
|
std::this_thread::sleep_for (std::chrono::milliseconds (1000 / config_m.frequency));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::backlog_population::activate (nano::transaction const & transaction, nano::account const & account)
|
||||||
|
{
|
||||||
|
debug_assert (!activate_callback.empty ());
|
||||||
|
|
||||||
|
auto const maybe_account_info = store.account.get (transaction, account);
|
||||||
|
if (!maybe_account_info)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
auto const account_info = *maybe_account_info;
|
||||||
|
|
||||||
|
auto const maybe_conf_info = store.confirmation_height.get (transaction, account);
|
||||||
|
auto const conf_info = maybe_conf_info.value_or (nano::confirmation_height_info{});
|
||||||
|
|
||||||
|
// If conf info is empty then it means then it means nothing is confirmed yet
|
||||||
|
if (conf_info.height < account_info.block_count)
|
||||||
|
{
|
||||||
|
stats.inc (nano::stat::type::backlog, nano::stat::detail::activated);
|
||||||
|
|
||||||
|
activate_callback.notify (transaction, account, account_info, conf_info);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,9 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <nano/lib/locks.hpp>
|
#include <nano/lib/locks.hpp>
|
||||||
|
#include <nano/lib/numbers.hpp>
|
||||||
|
#include <nano/lib/observer_set.hpp>
|
||||||
|
#include <nano/secure/common.hpp>
|
||||||
|
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
|
|
@ -8,6 +11,7 @@
|
||||||
|
|
||||||
namespace nano
|
namespace nano
|
||||||
{
|
{
|
||||||
|
class stat;
|
||||||
class store;
|
class store;
|
||||||
class election_scheduler;
|
class election_scheduler;
|
||||||
|
|
||||||
|
|
@ -16,43 +20,58 @@ class backlog_population final
|
||||||
public:
|
public:
|
||||||
struct config
|
struct config
|
||||||
{
|
{
|
||||||
bool ongoing_backlog_population_enabled;
|
/** Control if ongoing backlog population is enabled. If not, backlog population can still be triggered by RPC */
|
||||||
unsigned int delay_between_runs_in_seconds;
|
bool enabled;
|
||||||
|
|
||||||
|
/** Number of accounts per second to process. Number of accounts per single batch is this value divided by `frequency` */
|
||||||
|
unsigned batch_size;
|
||||||
|
|
||||||
|
/** Number of batches to run per second. Batches run in 1 second / `frequency` intervals */
|
||||||
|
unsigned frequency;
|
||||||
};
|
};
|
||||||
|
|
||||||
explicit backlog_population (const config & config_a, store & store, election_scheduler & scheduler);
|
backlog_population (const config &, nano::store &, nano::stat &);
|
||||||
~backlog_population ();
|
~backlog_population ();
|
||||||
|
|
||||||
void start ();
|
void start ();
|
||||||
void stop ();
|
void stop ();
|
||||||
|
|
||||||
|
/** Manually trigger backlog population */
|
||||||
void trigger ();
|
void trigger ();
|
||||||
|
|
||||||
/** Other components call this to notify us about external changes, so we can check our predicate. */
|
/** Notify about AEC vacancy */
|
||||||
void notify ();
|
void notify ();
|
||||||
|
|
||||||
|
public:
|
||||||
|
/**
|
||||||
|
* Callback called for each backlogged account
|
||||||
|
*/
|
||||||
|
using callback_t = nano::observer_set<nano::transaction const &, nano::account const &, nano::account_info const &, nano::confirmation_height_info const &>;
|
||||||
|
callback_t activate_callback;
|
||||||
|
|
||||||
|
private: // Dependencies
|
||||||
|
nano::store & store;
|
||||||
|
nano::stat & stats;
|
||||||
|
|
||||||
|
config config_m;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void run ();
|
void run ();
|
||||||
bool predicate () const;
|
bool predicate () const;
|
||||||
|
|
||||||
void populate_backlog ();
|
void populate_backlog ();
|
||||||
|
void activate (nano::transaction const &, nano::account const &);
|
||||||
|
|
||||||
/** This is a manual trigger, the ongoing backlog population does not use this.
|
/** This is a manual trigger, the ongoing backlog population does not use this.
|
||||||
* It can be triggered even when backlog population (frontiers confirmation) is disabled. */
|
* It can be triggered even when backlog population (frontiers confirmation) is disabled. */
|
||||||
bool triggered{ false };
|
bool triggered{ false };
|
||||||
|
|
||||||
std::atomic<bool> stopped{ false };
|
std::atomic<bool> stopped{ false };
|
||||||
|
|
||||||
nano::condition_variable condition;
|
nano::condition_variable condition;
|
||||||
mutable nano::mutex mutex;
|
mutable nano::mutex mutex;
|
||||||
|
|
||||||
/** Thread that runs the backlog implementation logic. The thread always runs, even if
|
/** Thread that runs the backlog implementation logic. The thread always runs, even if
|
||||||
* backlog population is disabled, so that it can service a manual trigger (e.g. via RPC). */
|
* backlog population is disabled, so that it can service a manual trigger (e.g. via RPC). */
|
||||||
std::thread thread;
|
std::thread thread;
|
||||||
|
|
||||||
config config_m;
|
|
||||||
|
|
||||||
private: // Dependencies
|
|
||||||
store & store_m;
|
|
||||||
election_scheduler & scheduler;
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,7 @@ void nano::election_scheduler::manual (std::shared_ptr<nano::block> const & bloc
|
||||||
notify ();
|
notify ();
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::election_scheduler::activate (nano::account const & account_a, nano::transaction const & transaction)
|
bool nano::election_scheduler::activate (nano::account const & account_a, nano::transaction const & transaction)
|
||||||
{
|
{
|
||||||
debug_assert (!account_a.is_zero ());
|
debug_assert (!account_a.is_zero ());
|
||||||
nano::account_info account_info;
|
nano::account_info account_info;
|
||||||
|
|
@ -42,9 +42,11 @@ void nano::election_scheduler::activate (nano::account const & account_a, nano::
|
||||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||||
priority.push (account_info.modified, block, std::max (balance, previous_balance));
|
priority.push (account_info.modified, block, std::max (balance, previous_balance));
|
||||||
notify ();
|
notify ();
|
||||||
|
return true; // Activated
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return false; // Not activated
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::election_scheduler::stop ()
|
void nano::election_scheduler::stop ()
|
||||||
|
|
|
||||||
|
|
@ -23,8 +23,11 @@ public:
|
||||||
// Manualy start an election for a block
|
// Manualy start an election for a block
|
||||||
// Call action with confirmed block, may be different than what we started with
|
// Call action with confirmed block, may be different than what we started with
|
||||||
void manual (std::shared_ptr<nano::block> const &, boost::optional<nano::uint128_t> const & = boost::none, nano::election_behavior = nano::election_behavior::normal, std::function<void (std::shared_ptr<nano::block> const &)> const & = nullptr);
|
void manual (std::shared_ptr<nano::block> const &, boost::optional<nano::uint128_t> const & = boost::none, nano::election_behavior = nano::election_behavior::normal, std::function<void (std::shared_ptr<nano::block> const &)> const & = nullptr);
|
||||||
// Activates the first unconfirmed block of \p account_a
|
/**
|
||||||
void activate (nano::account const &, nano::transaction const &);
|
* Activates the first unconfirmed block of \p account_a
|
||||||
|
* @return true if account was activated
|
||||||
|
*/
|
||||||
|
bool activate (nano::account const &, nano::transaction const &);
|
||||||
void stop ();
|
void stop ();
|
||||||
// Blocks until no more elections can be activated or there are no more elections to activate
|
// Blocks until no more elections can be activated or there are no more elections to activate
|
||||||
void flush ();
|
void flush ();
|
||||||
|
|
|
||||||
|
|
@ -33,11 +33,12 @@ extern std::size_t nano_bootstrap_weights_beta_size;
|
||||||
* Configs
|
* Configs
|
||||||
*/
|
*/
|
||||||
|
|
||||||
nano::backlog_population::config nano::nodeconfig_to_backlog_population_config (const nano::node_config & config)
|
nano::backlog_population::config nano::backlog_population_config (const nano::node_config & config)
|
||||||
{
|
{
|
||||||
nano::backlog_population::config cfg;
|
nano::backlog_population::config cfg{};
|
||||||
cfg.ongoing_backlog_population_enabled = config.frontiers_confirmation != nano::frontiers_confirmation_mode::disabled;
|
cfg.enabled = config.frontiers_confirmation != nano::frontiers_confirmation_mode::disabled;
|
||||||
cfg.delay_between_runs_in_seconds = config.network_params.network.is_dev_network () ? 1u : 300u;
|
cfg.frequency = config.backlog_scan_frequency;
|
||||||
|
cfg.batch_size = config.backlog_scan_batch_size;
|
||||||
return cfg;
|
return cfg;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -198,7 +199,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
|
||||||
hinting{ nano::nodeconfig_to_hinted_scheduler_config (config), *this, inactive_vote_cache, active, online_reps, stats },
|
hinting{ nano::nodeconfig_to_hinted_scheduler_config (config), *this, inactive_vote_cache, active, online_reps, stats },
|
||||||
aggregator (config, stats, generator, final_generator, history, ledger, wallets, active),
|
aggregator (config, stats, generator, final_generator, history, ledger, wallets, active),
|
||||||
wallets (wallets_store.init_error (), *this),
|
wallets (wallets_store.init_error (), *this),
|
||||||
backlog{ nano::nodeconfig_to_backlog_population_config (config), store, scheduler },
|
backlog{ nano::backlog_population_config (config), store, stats },
|
||||||
startup_time (std::chrono::steady_clock::now ()),
|
startup_time (std::chrono::steady_clock::now ()),
|
||||||
node_seq (seq)
|
node_seq (seq)
|
||||||
{
|
{
|
||||||
|
|
@ -211,6 +212,10 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
|
||||||
return ledger.weight (rep);
|
return ledger.weight (rep);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
backlog.activate_callback.add ([this] (nano::transaction const & transaction, nano::account const & account, nano::account_info const & account_info, nano::confirmation_height_info const & conf_info) {
|
||||||
|
scheduler.activate (account, transaction);
|
||||||
|
});
|
||||||
|
|
||||||
if (!init_error ())
|
if (!init_error ())
|
||||||
{
|
{
|
||||||
telemetry->start ();
|
telemetry->start ();
|
||||||
|
|
@ -776,6 +781,7 @@ void nano::node::stop ()
|
||||||
// Cancels ongoing work generation tasks, which may be blocking other threads
|
// Cancels ongoing work generation tasks, which may be blocking other threads
|
||||||
// No tasks may wait for work generation in I/O threads, or termination signal capturing will be unable to call node::stop()
|
// No tasks may wait for work generation in I/O threads, or termination signal capturing will be unable to call node::stop()
|
||||||
distributed_work.stop ();
|
distributed_work.stop ();
|
||||||
|
backlog.stop ();
|
||||||
unchecked.stop ();
|
unchecked.stop ();
|
||||||
block_processor.stop ();
|
block_processor.stop ();
|
||||||
aggregator.stop ();
|
aggregator.stop ();
|
||||||
|
|
|
||||||
|
|
@ -58,7 +58,7 @@ class work_pool;
|
||||||
std::unique_ptr<container_info_component> collect_container_info (rep_crawler & rep_crawler, std::string const & name);
|
std::unique_ptr<container_info_component> collect_container_info (rep_crawler & rep_crawler, std::string const & name);
|
||||||
|
|
||||||
// Configs
|
// Configs
|
||||||
backlog_population::config nodeconfig_to_backlog_population_config (node_config const &);
|
backlog_population::config backlog_population_config (node_config const &);
|
||||||
vote_cache::config nodeconfig_to_vote_cache_config (node_config const &, node_flags const &);
|
vote_cache::config nodeconfig_to_vote_cache_config (node_config const &, node_flags const &);
|
||||||
hinted_scheduler::config nodeconfig_to_hinted_scheduler_config (node_config const &);
|
hinted_scheduler::config nodeconfig_to_hinted_scheduler_config (node_config const &);
|
||||||
outbound_bandwidth_limiter::config outbound_bandwidth_limiter_config (node_config const &);
|
outbound_bandwidth_limiter::config outbound_bandwidth_limiter_config (node_config const &);
|
||||||
|
|
|
||||||
|
|
@ -128,6 +128,8 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const
|
||||||
toml.put ("frontiers_confirmation", serialize_frontiers_confirmation (frontiers_confirmation), "Mode controlling frontier confirmation rate.\ntype:string,{auto,always,disabled}");
|
toml.put ("frontiers_confirmation", serialize_frontiers_confirmation (frontiers_confirmation), "Mode controlling frontier confirmation rate.\ntype:string,{auto,always,disabled}");
|
||||||
toml.put ("max_queued_requests", max_queued_requests, "Limit for number of queued confirmation requests for one channel, after which new requests are dropped until the queue drops below this value.\ntype:uint32");
|
toml.put ("max_queued_requests", max_queued_requests, "Limit for number of queued confirmation requests for one channel, after which new requests are dropped until the queue drops below this value.\ntype:uint32");
|
||||||
toml.put ("rep_crawler_weight_minimum", rep_crawler_weight_minimum.to_string_dec (), "Rep crawler minimum weight, if this is less than minimum principal weight then this is taken as the minimum weight a rep must have to be tracked. If you want to track all reps set this to 0. If you do not want this to influence anything then set it to max value. This is only useful for debugging or for people who really know what they are doing.\ntype:string,amount,raw");
|
toml.put ("rep_crawler_weight_minimum", rep_crawler_weight_minimum.to_string_dec (), "Rep crawler minimum weight, if this is less than minimum principal weight then this is taken as the minimum weight a rep must have to be tracked. If you want to track all reps set this to 0. If you do not want this to influence anything then set it to max value. This is only useful for debugging or for people who really know what they are doing.\ntype:string,amount,raw");
|
||||||
|
toml.put ("backlog_scan_batch_size", backlog_scan_batch_size, "Number of accounts per second to process when doing backlog population scan. Increasing this value will help unconfirmed frontiers get into election prioritization queue faster, however it will also increase resource usage. \ntype:uint");
|
||||||
|
toml.put ("backlog_scan_frequency", backlog_scan_frequency, "Backlog scan divides the scan into smaller batches, number of which is controlled by this value. Higher frequency helps to utilize resources more uniformly, however it also introduces more overhead. The resulting number of accounts per single batch is `backlog_scan_batch_size / backlog_scan_frequency` \ntype:uint");
|
||||||
|
|
||||||
auto work_peers_l (toml.create_array ("work_peers", "A list of \"address:port\" entries to identify work peers."));
|
auto work_peers_l (toml.create_array ("work_peers", "A list of \"address:port\" entries to identify work peers."));
|
||||||
for (auto i (work_peers.begin ()), n (work_peers.end ()); i != n; ++i)
|
for (auto i (work_peers.begin ()), n (work_peers.end ()); i != n; ++i)
|
||||||
|
|
@ -398,6 +400,9 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml)
|
||||||
frontiers_confirmation = deserialize_frontiers_confirmation (frontiers_confirmation_l);
|
frontiers_confirmation = deserialize_frontiers_confirmation (frontiers_confirmation_l);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
toml.get<unsigned> ("backlog_scan_batch_size", backlog_scan_batch_size);
|
||||||
|
toml.get<unsigned> ("backlog_scan_frequency", backlog_scan_frequency);
|
||||||
|
|
||||||
if (toml.has_key ("experimental"))
|
if (toml.has_key ("experimental"))
|
||||||
{
|
{
|
||||||
auto experimental_config_l (toml.get_required_child ("experimental"));
|
auto experimental_config_l (toml.get_required_child ("experimental"));
|
||||||
|
|
|
||||||
|
|
@ -106,6 +106,12 @@ public:
|
||||||
nano::rocksdb_config rocksdb_config;
|
nano::rocksdb_config rocksdb_config;
|
||||||
nano::lmdb_config lmdb_config;
|
nano::lmdb_config lmdb_config;
|
||||||
nano::frontiers_confirmation_mode frontiers_confirmation{ nano::frontiers_confirmation_mode::automatic };
|
nano::frontiers_confirmation_mode frontiers_confirmation{ nano::frontiers_confirmation_mode::automatic };
|
||||||
|
/** Number of accounts per second to process when doing backlog population scan */
|
||||||
|
unsigned backlog_scan_batch_size{ 10 * 1000 };
|
||||||
|
/** Number of times per second to run backlog population batches. Number of accounts per single batch is `backlog_scan_batch_size / backlog_scan_frequency` */
|
||||||
|
unsigned backlog_scan_frequency{ 10 };
|
||||||
|
|
||||||
|
public:
|
||||||
std::string serialize_frontiers_confirmation (nano::frontiers_confirmation_mode) const;
|
std::string serialize_frontiers_confirmation (nano::frontiers_confirmation_mode) const;
|
||||||
nano::frontiers_confirmation_mode deserialize_frontiers_confirmation (std::string const &);
|
nano::frontiers_confirmation_mode deserialize_frontiers_confirmation (std::string const &);
|
||||||
/** Entry is ignored if it cannot be parsed as a valid address:port */
|
/** Entry is ignored if it cannot be parsed as a valid address:port */
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue