Add populate_backlog rpc command (#3860)

* Add `populate_backlog` rpc

* Add `populate_backlog` rpc tests

* Only allow `populate_backlog` RPC under enable_control

* Extract backlog population logic to separate class and run it on a dedicated thread

* Delay backlog population initial start

* Move triggered to start of the loop

* Use explicit dependencies

* Move delay assignment outside the loop and make it constant.

* Document backlog_population::triggered and thread members

It is not immediately obvious that the trigger can be used even when
backlog population is disabled and that it is used for manual triggering
mainly.

* Improve code readability

* Rename store to store_m to avoid compilation problem on gcc

* Document intention of notify function

* Break backlog_population's dependency on nodeconfig

Introduce the class nano::backlog_population::config to hold the
configuration items and not need access to nano::nodeconfig.

* Fix for compilation problem.

* Move to namespace

Co-authored-by: Dimitrios Siganos <dimitris@siganos.org>
This commit is contained in:
Piotr Wójcik 2022-07-21 00:20:22 +02:00 committed by GitHub
commit 82c4c10f7e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 230 additions and 46 deletions

View file

@ -90,16 +90,19 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::unchecked:
thread_role_name_string = "Unchecked";
break;
case nano::thread_role::name::backlog_population:
thread_role_name_string = "Backlog";
break;
default:
debug_assert (false && "nano::thread_role::get_string unhandled thread role");
}
/*
* We want to constrain the thread names to 15
* characters, since this is the smallest maximum
* length supported by the platforms we support
* (specifically, Linux)
*/
* We want to constrain the thread names to 15
* characters, since this is the smallest maximum
* length supported by the platforms we support
* (specifically, Linux)
*/
debug_assert (thread_role_name_string.size () < 16);
return (thread_role_name_string);
}
@ -121,7 +124,7 @@ void nano::thread_role::set (nano::thread_role::name role)
void nano::thread_attributes::set (boost::thread::attributes & attrs)
{
auto attrs_l (&attrs);
attrs_l->set_stack_size (8000000); //8MB
attrs_l->set_stack_size (8000000); // 8MB
}
nano::thread_runner::thread_runner (boost::asio::io_context & io_ctx_a, unsigned service_threads_a) :

View file

@ -42,6 +42,7 @@ namespace thread_role
db_parallel_traversal,
election_scheduler,
unchecked,
backlog_population
};
/*

View file

@ -16,6 +16,8 @@ add_library(
${platform_sources}
active_transactions.hpp
active_transactions.cpp
backlog_population.hpp
backlog_population.cpp
blockprocessor.hpp
blockprocessor.cpp
bootstrap/block_deserializer.hpp

View file

@ -0,0 +1,96 @@
#include <nano/lib/threading.hpp>
#include <nano/node/backlog_population.hpp>
#include <nano/node/election_scheduler.hpp>
#include <nano/node/nodeconfig.hpp>
#include <nano/secure/store.hpp>
nano::backlog_population::backlog_population (const config & config_a, nano::store & store_a, nano::election_scheduler & scheduler_a) :
config_m{ config_a },
store_m{ store_a },
scheduler{ scheduler_a }
{
}
nano::backlog_population::~backlog_population ()
{
stop ();
if (thread.joinable ())
{
thread.join ();
}
}
void nano::backlog_population::start ()
{
if (!thread.joinable ())
{
thread = std::thread{ [this] () { run (); } };
}
}
void nano::backlog_population::stop ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
stopped = true;
notify ();
}
void nano::backlog_population::trigger ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
triggered = true;
notify ();
}
void nano::backlog_population::notify ()
{
condition.notify_all ();
}
bool nano::backlog_population::predicate () const
{
return triggered;
}
void nano::backlog_population::run ()
{
nano::thread_role::set (nano::thread_role::name::backlog_population);
const auto delay = std::chrono::seconds{ config_m.delay_between_runs_in_seconds };
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
if (predicate () || config_m.ongoing_backlog_population_enabled)
{
triggered = false;
lock.unlock ();
populate_backlog ();
lock.lock ();
}
condition.wait_for (lock, delay, [this] () {
return stopped || predicate ();
});
}
}
void nano::backlog_population::populate_backlog ()
{
auto done = false;
uint64_t const chunk_size = 65536;
nano::account next = 0;
uint64_t total = 0;
while (!stopped && !done)
{
auto transaction = store_m.tx_begin_read ();
auto count = 0;
auto i = store_m.account.begin (transaction, next);
const auto end = store_m.account.end ();
for (; !stopped && i != end && count < chunk_size; ++i, ++count, ++total)
{
auto const & account = i->first;
scheduler.activate (account, transaction);
next = account.number () + 1;
}
done = store_m.account.begin (transaction, next) == end;
}
}

View file

@ -0,0 +1,58 @@
#pragma once
#include <nano/lib/locks.hpp>
#include <atomic>
#include <condition_variable>
#include <thread>
namespace nano
{
class store;
class election_scheduler;
class backlog_population final
{
public:
struct config
{
bool ongoing_backlog_population_enabled;
unsigned int delay_between_runs_in_seconds;
};
explicit backlog_population (const config & config_a, store & store, election_scheduler & scheduler);
~backlog_population ();
void start ();
void stop ();
void trigger ();
/** Other components call this to notify us about external changes, so we can check our predicate. */
void notify ();
private:
void run ();
bool predicate () const;
void populate_backlog ();
/** This is a manual trigger, the ongoing backlog population does not use this.
* It can be triggered even when backlog population (frontiers confirmation) is disabled. */
bool triggered{ false };
std::atomic<bool> stopped{ false };
nano::condition_variable condition;
mutable nano::mutex mutex;
/** Thread that runs the backlog implementation logic. The thread always runs, even if
* backlog population is disabled, so that it can service a manual trigger (e.g. via RPC). */
std::thread thread;
config config_m;
private: // Dependencies
store & store_m;
election_scheduler & scheduler;
};
}

View file

@ -5224,6 +5224,13 @@ void nano::json_handler::work_peers_clear ()
response_errors ();
}
void nano::json_handler::populate_backlog ()
{
node.backlog.trigger ();
response_l.put ("success", "");
response_errors ();
}
void nano::inprocess_rpc_handler::process_request (std::string const &, std::string const & body_a, std::function<void (std::string const &)> response_a)
{
// Note that if the rpc action is async, the shared_ptr<json_handler> lifetime will be extended by the action handler
@ -5388,6 +5395,7 @@ ipc_json_handler_no_arg_func_map create_ipc_json_handler_no_arg_func_map ()
no_arg_funcs.emplace ("work_peer_add", &nano::json_handler::work_peer_add);
no_arg_funcs.emplace ("work_peers", &nano::json_handler::work_peers);
no_arg_funcs.emplace ("work_peers_clear", &nano::json_handler::work_peers_clear);
no_arg_funcs.emplace ("populate_backlog", &nano::json_handler::populate_backlog);
return no_arg_funcs;
}

View file

@ -89,6 +89,7 @@ public:
void pending_exists ();
void receivable ();
void receivable_exists ();
void populate_backlog ();
void process ();
void pruned_exists ();
void receive ();

View file

@ -33,6 +33,14 @@ extern unsigned char nano_bootstrap_weights_beta[];
extern std::size_t nano_bootstrap_weights_beta_size;
}
nano::backlog_population::config nano::nodeconfig_to_backlog_population_config (const nano::node_config & config)
{
nano::backlog_population::config cfg;
cfg.ongoing_backlog_population_enabled = config.frontiers_confirmation != nano::frontiers_confirmation_mode::disabled;
cfg.delay_between_runs_in_seconds = config.network_params.network.is_dev_network () ? 1u : 300u;
return cfg;
}
void nano::node::keepalive (std::string const & address_a, uint16_t port_a)
{
auto node_l (shared_from_this ());
@ -155,6 +163,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
scheduler{ *this },
aggregator (config, stats, active.generator, active.final_generator, history, ledger, wallets, active),
wallets (wallets_store.init_error (), *this),
backlog{ nano::nodeconfig_to_backlog_population_config (config), store, scheduler },
startup_time (std::chrono::steady_clock::now ()),
node_seq (seq)
{
@ -702,12 +711,7 @@ void nano::node::start ()
port_mapping.start ();
}
wallets.start ();
if (config.frontiers_confirmation != nano::frontiers_confirmation_mode::disabled)
{
workers.push_task ([this_l = shared ()] () {
this_l->ongoing_backlog_population ();
});
}
backlog.start ();
}
void nano::node::stop ()
@ -1022,15 +1026,6 @@ void nano::node::ongoing_unchecked_cleanup ()
});
}
void nano::node::ongoing_backlog_population ()
{
populate_backlog ();
auto delay = config.network_params.network.is_dev_network () ? std::chrono::seconds{ 1 } : std::chrono::duration_cast<std::chrono::seconds> (std::chrono::minutes{ 5 });
workers.add_timed_task (std::chrono::steady_clock::now () + delay, [this_l = shared ()] () {
this_l->ongoing_backlog_population ();
});
}
bool nano::node::collect_ledger_pruning_targets (std::deque<nano::block_hash> & pruning_targets_a, nano::account & last_account_a, uint64_t const batch_read_size_a, uint64_t const max_depth_a, uint64_t const cutoff_time_a)
{
uint64_t read_operations (0);
@ -1794,26 +1789,6 @@ std::pair<uint64_t, decltype (nano::ledger::bootstrap_weights)> nano::node::get_
return { max_blocks, weights };
}
void nano::node::populate_backlog ()
{
auto done = false;
uint64_t const chunk_size = 65536;
nano::account next = 0;
uint64_t total = 0;
while (!stopped && !done)
{
auto transaction = store.tx_begin_read ();
auto count = 0;
for (auto i = store.account.begin (transaction, next), n = store.account.end (); !stopped && i != n && count < chunk_size; ++i, ++count, ++total)
{
auto const & account = i->first;
scheduler.activate (account, transaction);
next = account.number () + 1;
}
done = store.account.begin (transaction, next) == store.account.end ();
}
}
/** Convenience function to easily return the confirmation height of an account. */
uint64_t nano::node::get_confirmation_height (nano::transaction const & transaction_a, nano::account & account_a)
{

View file

@ -4,6 +4,7 @@
#include <nano/lib/stats.hpp>
#include <nano/lib/work.hpp>
#include <nano/node/active_transactions.hpp>
#include <nano/node/backlog_population.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/bootstrap/bootstrap.hpp>
#include <nano/node/bootstrap/bootstrap_attempt.hpp>
@ -83,9 +84,11 @@ public:
};
std::unique_ptr<container_info_component> collect_container_info (block_arrival & block_arrival, std::string const & name);
std::unique_ptr<container_info_component> collect_container_info (rep_crawler & rep_crawler, std::string const & name);
// Configs
backlog_population::config nodeconfig_to_backlog_population_config (const node_config & config);
class node final : public std::enable_shared_from_this<nano::node>
{
public:
@ -120,7 +123,6 @@ public:
void ongoing_bootstrap ();
void ongoing_peer_store ();
void ongoing_unchecked_cleanup ();
void ongoing_backlog_population ();
void backup_wallet ();
void search_receivable_all ();
void bootstrap_wallet ();
@ -151,7 +153,6 @@ public:
bool epoch_upgrader (nano::raw_key const &, nano::epoch, uint64_t, uint64_t);
void set_bandwidth_params (std::size_t limit, double ratio);
std::pair<uint64_t, decltype (nano::ledger::bootstrap_weights)> get_bootstrap_weights () const;
void populate_backlog ();
uint64_t get_confirmation_height (nano::transaction const &, nano::account &);
nano::write_database_queue write_database_queue;
boost::asio::io_context & io_ctx;
@ -195,6 +196,8 @@ public:
nano::election_scheduler scheduler;
nano::request_aggregator aggregator;
nano::wallets wallets;
nano::backlog_population backlog;
std::chrono::steady_clock::time_point const startup_time;
std::chrono::seconds unchecked_cutoff = std::chrono::seconds (7 * 24 * 60 * 60); // Week
std::atomic<bool> unresponsive_work_peers{ false };

View file

@ -162,6 +162,7 @@ std::unordered_set<std::string> create_rpc_control_impls ()
set.emplace ("ledger");
set.emplace ("node_id");
set.emplace ("password_change");
set.emplace ("populate_backlog");
set.emplace ("receive");
set.emplace ("receive_minimum");
set.emplace ("receive_minimum_set");

View file

@ -4994,6 +4994,42 @@ TEST (rpc, work_peers_all)
ASSERT_EQ (0, peers_node.size ());
}
TEST (rpc, populate_backlog)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
// Disable automatic backlog population
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto node = add_ipc_enabled_node (system, node_config);
// Create and process a block that won't get automatically scheduled for confirmation
nano::keypair key;
nano::block_builder builder;
auto latest (node->latest (nano::dev::genesis_key.pub));
auto genesis_balance (nano::dev::constants.genesis_amount);
auto send_amount (genesis_balance - 100);
auto send = builder
.send ()
.previous (latest)
.destination (key.pub)
.balance (genesis_balance)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*node->work_generate_blocking (latest))
.build ();
ASSERT_EQ (nano::process_result::progress, node->process (*send).code);
ASSERT_FALSE (node->block_arrival.recent (send->hash ()));
auto const rpc_ctx = add_rpc (system, node);
boost::property_tree::ptree request;
request.put ("action", "populate_backlog");
auto response (wait_response (system, rpc_ctx, request));
std::string success (response.get<std::string> ("success", ""));
ASSERT_TRUE (success.empty ());
// Ensure block got activated and election was started
ASSERT_TIMELY (5s, node->active.active (*send));
}
TEST (rpc, ledger)
{
nano::system system;
@ -5726,7 +5762,7 @@ TEST (rpc, online_reps)
boost::optional<std::string> weight (item->second.get_optional<std::string> ("weight"));
ASSERT_FALSE (weight.is_initialized ());
ASSERT_TIMELY (5s, node2->block (send_block->hash ()));
//Test weight option
// Test weight option
request.put ("weight", "true");
auto response2 (wait_response (system, rpc_ctx, request));
auto representatives2 (response2.get_child ("representatives"));
@ -5735,7 +5771,7 @@ TEST (rpc, online_reps)
ASSERT_EQ (nano::dev::genesis_key.pub.to_account (), item2->first);
auto weight2 (item2->second.get<std::string> ("weight"));
ASSERT_EQ (node2->weight (nano::dev::genesis_key.pub).convert_to<std::string> (), weight2);
//Test accounts filter
// Test accounts filter
rpc_ctx.io_scope->reset ();
auto new_rep (system.wallet (1)->deterministic_insert ());
auto send (system.wallet (0)->send_action (nano::dev::genesis_key.pub, new_rep, node1->config.receive_minimum.number ()));