Merge pull request #4825 from pwojcikdev/extract-pruning

Extract pruning class
This commit is contained in:
Piotr Wójcik 2025-01-18 20:22:36 +01:00 committed by GitHub
commit 7919fd66e5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 221 additions and 120 deletions

View file

@ -10,6 +10,7 @@
#include <nano/node/make_store.hpp>
#include <nano/node/online_reps.hpp>
#include <nano/node/portmapping.hpp>
#include <nano/node/pruning.hpp>
#include <nano/node/scheduler/component.hpp>
#include <nano/node/scheduler/manual.hpp>
#include <nano/node/scheduler/priority.hpp>
@ -3456,13 +3457,13 @@ TEST (node, DISABLED_pruning_age)
ASSERT_EQ (3, node1.ledger.block_count ());
// Pruning with default age 1 day
node1.ledger_pruning (1, true);
node1.pruning.ledger_pruning (1, true);
ASSERT_EQ (0, node1.ledger.pruned_count ());
ASSERT_EQ (3, node1.ledger.block_count ());
// Pruning with max age 0
node1.config.max_pruning_age = std::chrono::seconds{ 0 };
node1.ledger_pruning (1, true);
node1.pruning.ledger_pruning (1, true);
ASSERT_EQ (1, node1.ledger.pruned_count ());
ASSERT_EQ (3, node1.ledger.block_count ());
@ -3517,13 +3518,13 @@ TEST (node, DISABLED_pruning_depth)
ASSERT_EQ (3, node1.ledger.block_count ());
// Pruning with default depth (unlimited)
node1.ledger_pruning (1, true);
node1.pruning.ledger_pruning (1, true);
ASSERT_EQ (0, node1.ledger.pruned_count ());
ASSERT_EQ (3, node1.ledger.block_count ());
// Pruning with max depth 1
node1.config.max_pruning_depth = 1;
node1.ledger_pruning (1, true);
node1.pruning.ledger_pruning (1, true);
ASSERT_EQ (1, node1.ledger.pruned_count ());
ASSERT_EQ (3, node1.ledger.block_count ());

View file

@ -59,7 +59,7 @@ enum class type
tcp_server,
tcp_listener,
tcp_channels,
prunning,
pruning,
conf_processor_bounded,
conf_processor_unbounded,
distributed_work,

View file

@ -116,6 +116,7 @@ enum class type
message_processor_type,
process_confirmed,
online_reps,
pruning,
_last // Must be the last enum
};
@ -648,6 +649,12 @@ enum class detail
block_confirmed,
large_backlog,
// pruning
ledger_pruning,
pruning_target,
pruned_count,
collect_targets,
_last // Must be the last enum
};

View file

@ -190,6 +190,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::http_callbacks:
thread_role_name_string = "HTTP callbacks";
break;
case nano::thread_role::name::pruning:
thread_role_name_string = "Pruning";
break;
default:
debug_assert (false && "nano::thread_role::get_string unhandled thread role");
}

View file

@ -68,6 +68,7 @@ enum class name
online_reps,
monitor,
http_callbacks,
pruning,
};
std::string_view to_string (name);

View file

@ -16,6 +16,7 @@
#include <nano/node/json_handler.hpp>
#include <nano/node/node.hpp>
#include <nano/node/online_reps.hpp>
#include <nano/node/pruning.hpp>
#include <nano/node/transport/inproc.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/secure/ledger_set_any.hpp>
@ -1923,7 +1924,7 @@ int main (int argc, char * const * argv)
nano::update_flags (node_flags, vm);
nano::inactive_node inactive_node (data_path, node_flags);
auto node = inactive_node.node;
node->ledger_pruning (node_flags.block_processor_batch_size != 0 ? node_flags.block_processor_batch_size : 16 * 1024, true);
node->pruning.ledger_pruning (node_flags.block_processor_batch_size != 0 ? node_flags.block_processor_batch_size : 16 * 1024, true);
}
else if (vm.count ("debug_stacktrace"))
{

View file

@ -130,6 +130,8 @@ add_library(
portmapping.cpp
process_live_dispatcher.cpp
process_live_dispatcher.hpp
pruning.hpp
pruning.cpp
recently_cemented_cache.cpp
recently_cemented_cache.hpp
recently_confirmed_cache.cpp

View file

@ -30,6 +30,7 @@ class node_config;
class node_flags;
class node_observers;
class online_reps;
class pruning;
class recently_cemented_cache;
class recently_confirmed_cache;
class rep_crawler;

View file

@ -30,6 +30,7 @@
#include <nano/node/online_reps.hpp>
#include <nano/node/peer_history.hpp>
#include <nano/node/portmapping.hpp>
#include <nano/node/pruning.hpp>
#include <nano/node/request_aggregator.hpp>
#include <nano/node/rpc_callbacks.hpp>
#include <nano/node/scheduler/component.hpp>
@ -199,6 +200,8 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
monitor{ *monitor_impl },
http_callbacks_impl{ std::make_unique<nano::http_callbacks> (*this) },
http_callbacks{ *http_callbacks_impl },
pruning_impl{ std::make_unique<nano::pruning> (config, flags, ledger, stats, logger) },
pruning{ *pruning_impl },
startup_time{ std::chrono::steady_clock::now () },
node_seq{ seq }
{
@ -509,13 +512,6 @@ void nano::node::start ()
network.start ();
message_processor.start ();
if (flags.enable_pruning)
{
auto this_l (shared ());
workers.post ([this_l] () {
this_l->ongoing_ledger_pruning ();
});
}
if (!flags.disable_rep_crawler)
{
rep_crawler.start ();
@ -578,6 +574,7 @@ void nano::node::start ()
online_reps.start ();
monitor.start ();
http_callbacks.start ();
pruning.start ();
add_initial_peers ();
}
@ -627,6 +624,7 @@ void nano::node::stop ()
network.stop ();
monitor.stop ();
http_callbacks.stop ();
pruning.stop ();
bootstrap_workers.stop ();
wallet_workers.stop ();
@ -722,110 +720,6 @@ void nano::node::search_receivable_all ()
});
}
bool nano::node::collect_ledger_pruning_targets (std::deque<nano::block_hash> & pruning_targets_a, nano::account & last_account_a, uint64_t const batch_read_size_a, uint64_t const max_depth_a, uint64_t const cutoff_time_a)
{
uint64_t read_operations (0);
bool finish_transaction (false);
auto transaction = ledger.tx_begin_read ();
for (auto i (store.confirmation_height.begin (transaction, last_account_a)), n (store.confirmation_height.end (transaction)); i != n && !finish_transaction;)
{
++read_operations;
auto const & account (i->first);
nano::block_hash hash (i->second.frontier);
uint64_t depth (0);
while (!hash.is_zero () && depth < max_depth_a)
{
auto block = ledger.any.block_get (transaction, hash);
if (block != nullptr)
{
if (block->sideband ().timestamp > cutoff_time_a || depth == 0)
{
hash = block->previous ();
}
else
{
break;
}
}
else
{
release_assert (depth != 0);
hash = 0;
}
if (++depth % batch_read_size_a == 0)
{
// FIXME: This is triggering an assertion where the iterator is still used after transaction is refreshed
transaction.refresh ();
}
}
if (!hash.is_zero ())
{
pruning_targets_a.push_back (hash);
}
read_operations += depth;
if (read_operations >= batch_read_size_a)
{
last_account_a = inc_sat (account.number ());
finish_transaction = true;
}
else
{
++i;
}
}
return !finish_transaction || last_account_a.is_zero ();
}
void nano::node::ledger_pruning (uint64_t const batch_size_a, bool bootstrap_weight_reached_a)
{
uint64_t const max_depth (config.max_pruning_depth != 0 ? config.max_pruning_depth : std::numeric_limits<uint64_t>::max ());
uint64_t const cutoff_time (bootstrap_weight_reached_a ? nano::seconds_since_epoch () - config.max_pruning_age.count () : std::numeric_limits<uint64_t>::max ());
uint64_t pruned_count (0);
uint64_t transaction_write_count (0);
nano::account last_account (1); // 0 Burn account is never opened. So it can be used to break loop
std::deque<nano::block_hash> pruning_targets;
bool target_finished (false);
while ((transaction_write_count != 0 || !target_finished) && !stopped)
{
// Search pruning targets
while (pruning_targets.size () < batch_size_a && !target_finished && !stopped)
{
target_finished = collect_ledger_pruning_targets (pruning_targets, last_account, batch_size_a * 2, max_depth, cutoff_time);
}
// Pruning write operation
transaction_write_count = 0;
if (!pruning_targets.empty () && !stopped)
{
auto write_transaction = ledger.tx_begin_write (nano::store::writer::pruning);
while (!pruning_targets.empty () && transaction_write_count < batch_size_a && !stopped)
{
auto const & pruning_hash (pruning_targets.front ());
auto account_pruned_count (ledger.pruning_action (write_transaction, pruning_hash, batch_size_a));
transaction_write_count += account_pruned_count;
pruning_targets.pop_front ();
}
pruned_count += transaction_write_count;
logger.debug (nano::log::type::prunning, "Pruned blocks: {}", pruned_count);
}
}
logger.debug (nano::log::type::prunning, "Total recently pruned block count: {}", pruned_count);
}
void nano::node::ongoing_ledger_pruning ()
{
auto bootstrap_weight_reached (ledger.block_count () >= ledger.bootstrap_weight_max_blocks);
ledger_pruning (flags.block_processor_batch_size != 0 ? flags.block_processor_batch_size : 2 * 1024, bootstrap_weight_reached);
auto const ledger_pruning_interval (bootstrap_weight_reached ? config.max_pruning_age : std::min (config.max_pruning_age, std::chrono::seconds (15 * 60)));
auto this_l (shared ());
workers.post_delayed (ledger_pruning_interval, [this_l] () {
this_l->workers.post ([this_l] () {
this_l->ongoing_ledger_pruning ();
});
});
}
uint64_t nano::node::default_difficulty (nano::work_version const version_a) const
{
uint64_t result{ std::numeric_limits<uint64_t>::max () };
@ -1098,6 +992,7 @@ nano::container_info nano::node::container_info () const
info.add ("backlog_scan", backlog_scan.container_info ());
info.add ("bounded_backlog", backlog.container_info ());
info.add ("http_callbacks", http_callbacks.container_info ());
info.add ("pruning", pruning.container_info ());
return info;
}

View file

@ -61,9 +61,6 @@ public:
nano::uint128_t minimum_principal_weight ();
void backup_wallet ();
void search_receivable_all ();
bool collect_ledger_pruning_targets (std::deque<nano::block_hash> &, nano::account &, uint64_t const, uint64_t const, uint64_t const);
void ledger_pruning (uint64_t const, bool);
void ongoing_ledger_pruning ();
// The default difficulty updates to base only when the first epoch_2 block is processed
uint64_t default_difficulty (nano::work_version const) const;
uint64_t default_receive_difficulty (nano::work_version const) const;
@ -204,6 +201,8 @@ public:
nano::monitor & monitor;
std::unique_ptr<nano::http_callbacks> http_callbacks_impl;
nano::http_callbacks & http_callbacks;
std::unique_ptr<nano::pruning> pruning_impl;
nano::pruning & pruning;
public:
std::chrono::steady_clock::time_point const startup_time;

152
nano/node/pruning.cpp Normal file
View file

@ -0,0 +1,152 @@
#include <nano/node/node.hpp>
#include <nano/node/pruning.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/secure/ledger_set_any.hpp>
nano::pruning::pruning (nano::node_config const & config_a, nano::node_flags const & flags_a, nano::ledger & ledger_a, nano::stats & stats_a, nano::logger & logger_a) :
config{ config_a },
flags{ flags_a },
ledger{ ledger_a },
stats{ stats_a },
logger{ logger_a },
workers{ /* single threaded */ 1, nano::thread_role::name::pruning }
{
}
nano::pruning::~pruning ()
{
// Must be stopped before destruction
debug_assert (stopped);
}
void nano::pruning::start ()
{
if (flags.enable_pruning)
{
workers.start ();
workers.post ([this] () {
ongoing_ledger_pruning ();
});
}
}
void nano::pruning::stop ()
{
stopped = true;
workers.stop ();
}
void nano::pruning::ongoing_ledger_pruning ()
{
auto bootstrap_weight_reached (ledger.block_count () >= ledger.bootstrap_weight_max_blocks);
ledger_pruning (flags.block_processor_batch_size != 0 ? flags.block_processor_batch_size : 2 * 1024, bootstrap_weight_reached);
auto const ledger_pruning_interval (bootstrap_weight_reached ? config.max_pruning_age : std::min (config.max_pruning_age, std::chrono::seconds (15 * 60)));
workers.post_delayed (ledger_pruning_interval, [this] () {
workers.post ([this] () {
ongoing_ledger_pruning ();
});
});
}
void nano::pruning::ledger_pruning (uint64_t const batch_size_a, bool bootstrap_weight_reached_a)
{
stats.inc (nano::stat::type::pruning, nano::stat::detail::ledger_pruning);
uint64_t const max_depth (config.max_pruning_depth != 0 ? config.max_pruning_depth : std::numeric_limits<uint64_t>::max ());
uint64_t const cutoff_time (bootstrap_weight_reached_a ? nano::seconds_since_epoch () - config.max_pruning_age.count () : std::numeric_limits<uint64_t>::max ());
uint64_t pruned_count (0);
uint64_t transaction_write_count (0);
nano::account last_account (1); // 0 Burn account is never opened. So it can be used to break loop
std::deque<nano::block_hash> pruning_targets;
bool target_finished (false);
while ((transaction_write_count != 0 || !target_finished) && !stopped)
{
// Search pruning targets
while (pruning_targets.size () < batch_size_a && !target_finished && !stopped)
{
stats.inc (nano::stat::type::pruning, nano::stat::detail::collect_targets);
target_finished = collect_ledger_pruning_targets (pruning_targets, last_account, batch_size_a * 2, max_depth, cutoff_time);
}
// Pruning write operation
transaction_write_count = 0;
if (!pruning_targets.empty () && !stopped)
{
auto write_transaction = ledger.tx_begin_write (nano::store::writer::pruning);
while (!pruning_targets.empty () && transaction_write_count < batch_size_a && !stopped)
{
stats.inc (nano::stat::type::pruning, nano::stat::detail::pruning_target);
auto const & pruning_hash (pruning_targets.front ());
auto account_pruned_count (ledger.pruning_action (write_transaction, pruning_hash, batch_size_a));
transaction_write_count += account_pruned_count;
pruning_targets.pop_front ();
stats.add (nano::stat::type::pruning, nano::stat::detail::pruned_count, account_pruned_count);
}
pruned_count += transaction_write_count;
logger.debug (nano::log::type::pruning, "Pruned blocks: {}", pruned_count);
}
}
logger.debug (nano::log::type::pruning, "Total recently pruned block count: {}", pruned_count);
}
bool nano::pruning::collect_ledger_pruning_targets (std::deque<nano::block_hash> & pruning_targets_a, nano::account & last_account_a, uint64_t const batch_read_size_a, uint64_t const max_depth_a, uint64_t const cutoff_time_a)
{
uint64_t read_operations (0);
bool finish_transaction (false);
auto transaction = ledger.tx_begin_read ();
for (auto i (ledger.store.confirmation_height.begin (transaction, last_account_a)), n (ledger.store.confirmation_height.end (transaction)); i != n && !finish_transaction;)
{
++read_operations;
auto const & account (i->first);
nano::block_hash hash (i->second.frontier);
uint64_t depth (0);
while (!hash.is_zero () && depth < max_depth_a)
{
auto block = ledger.any.block_get (transaction, hash);
if (block != nullptr)
{
if (block->sideband ().timestamp > cutoff_time_a || depth == 0)
{
hash = block->previous ();
}
else
{
break;
}
}
else
{
release_assert (depth != 0);
hash = 0;
}
if (++depth % batch_read_size_a == 0)
{
// FIXME: This is triggering an assertion where the iterator is still used after transaction is refreshed
transaction.refresh ();
}
}
if (!hash.is_zero ())
{
pruning_targets_a.push_back (hash);
}
read_operations += depth;
if (read_operations >= batch_read_size_a)
{
last_account_a = inc_sat (account.number ());
finish_transaction = true;
}
else
{
++i;
}
}
return !finish_transaction || last_account_a.is_zero ();
}
nano::container_info nano::pruning::container_info () const
{
return workers.container_info ();
}

39
nano/node/pruning.hpp Normal file
View file

@ -0,0 +1,39 @@
#pragma once
#include <nano/lib/numbers.hpp>
#include <nano/node/fwd.hpp>
#include <deque>
#include <thread>
namespace nano
{
class pruning final
{
public:
pruning (nano::node_config const &, nano::node_flags const &, nano::ledger &, nano::stats &, nano::logger &);
~pruning ();
void start ();
void stop ();
nano::container_info container_info () const;
void ongoing_ledger_pruning ();
void ledger_pruning (uint64_t batch_size, bool bootstrap_weight_reached);
bool collect_ledger_pruning_targets (std::deque<nano::block_hash> & pruning_targets_out, nano::account & last_account_out, uint64_t batch_read_size, uint64_t max_depth, uint64_t cutoff_time);
private: // Dependencies
nano::node_config const & config;
nano::node_flags const & flags;
nano::ledger & ledger;
nano::stats & stats;
nano::logger & logger;
private:
void run ();
std::atomic<bool> stopped{ false };
nano::thread_pool workers;
};
}