Extract epoch upgrader out of node (#4078)

* Extract `epoch_upgrader` class

* Fix slow test
This commit is contained in:
Piotr Wójcik 2023-01-27 14:33:46 +01:00 committed by GitHub
commit 081ba95884
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 347 additions and 285 deletions

View file

@ -66,6 +66,8 @@ add_library(
election.cpp
election_scheduler.hpp
election_scheduler.cpp
epoch_upgrader.hpp
epoch_upgrader.cpp
gap_cache.hpp
gap_cache.cpp
hinted_scheduler.hpp

View file

@ -0,0 +1,300 @@
#include <nano/lib/threading.hpp>
#include <nano/node/epoch_upgrader.hpp>
#include <nano/node/node.hpp>
nano::epoch_upgrader::epoch_upgrader (nano::node & node_a, nano::ledger & ledger_a, nano::store & store_a, nano::network_params & network_params_a, nano::logger_mt & logger_a) :
node{ node_a },
ledger{ ledger_a },
store{ store_a },
network_params{ network_params_a },
logger{ logger_a }
{
}
void nano::epoch_upgrader::stop ()
{
stopped = true;
auto epoch_upgrade = epoch_upgrading.lock ();
if (epoch_upgrade->valid ())
{
epoch_upgrade->wait ();
}
}
bool nano::epoch_upgrader::start (nano::raw_key const & prv_a, nano::epoch epoch_a, uint64_t count_limit, uint64_t threads)
{
bool error = stopped.load ();
if (!error)
{
auto epoch_upgrade = epoch_upgrading.lock ();
error = epoch_upgrade->valid () && epoch_upgrade->wait_for (std::chrono::seconds (0)) == std::future_status::timeout;
if (!error)
{
*epoch_upgrade = std::async (std::launch::async, [this, prv_a, epoch_a, count_limit, threads] () {
upgrade_impl (prv_a, epoch_a, count_limit, threads);
});
}
}
return error;
}
// TODO: This method should be a class
void nano::epoch_upgrader::upgrade_impl (nano::raw_key const & prv_a, nano::epoch epoch_a, uint64_t count_limit, uint64_t threads)
{
nano::thread_role::set (nano::thread_role::name::epoch_upgrader);
auto upgrader_process = [this] (std::atomic<uint64_t> & counter, std::shared_ptr<nano::block> const & epoch, uint64_t difficulty, nano::public_key const & signer_a, nano::root const & root_a, nano::account const & account_a) {
epoch->block_work_set (node.work_generate_blocking (nano::work_version::work_1, root_a, difficulty).value_or (0));
bool valid_signature (!nano::validate_message (signer_a, epoch->hash (), epoch->block_signature ()));
bool valid_work (node.network_params.work.difficulty (*epoch) >= difficulty);
nano::process_result result (nano::process_result::old);
if (valid_signature && valid_work)
{
result = node.process_local (epoch).code;
}
if (result == nano::process_result::progress)
{
++counter;
}
else
{
bool fork (result == nano::process_result::fork);
logger.always_log (boost::str (boost::format ("Failed to upgrade account %1%. Valid signature: %2%. Valid work: %3%. Block processor fork: %4%") % account_a.to_account () % valid_signature % valid_work % fork));
}
};
uint64_t const upgrade_batch_size = 1000;
nano::block_builder builder;
auto link (ledger.epoch_link (epoch_a));
nano::raw_key raw_key;
raw_key = prv_a;
auto signer (nano::pub_key (prv_a));
debug_assert (signer == ledger.epoch_signer (link));
nano::mutex upgrader_mutex;
nano::condition_variable upgrader_condition;
class account_upgrade_item final
{
public:
nano::account account{};
uint64_t modified{ 0 };
};
class account_tag
{
};
class modified_tag
{
};
// clang-format off
boost::multi_index_container<account_upgrade_item,
boost::multi_index::indexed_by<
boost::multi_index::ordered_non_unique<boost::multi_index::tag<modified_tag>,
boost::multi_index::member<account_upgrade_item, uint64_t, &account_upgrade_item::modified>,
std::greater<uint64_t>>,
boost::multi_index::hashed_unique<boost::multi_index::tag<account_tag>,
boost::multi_index::member<account_upgrade_item, nano::account, &account_upgrade_item::account>>>>
accounts_list;
// clang-format on
bool finished_upgrade (false);
while (!finished_upgrade && !stopped)
{
bool finished_accounts (false);
uint64_t total_upgraded_accounts (0);
while (!finished_accounts && count_limit != 0 && !stopped)
{
{
auto transaction (store.tx_begin_read ());
// Collect accounts to upgrade
for (auto i (store.account.begin (transaction)), n (store.account.end ()); i != n && accounts_list.size () < count_limit; ++i)
{
nano::account const & account (i->first);
nano::account_info const & info (i->second);
if (info.epoch () < epoch_a)
{
release_assert (nano::epochs::is_sequential (info.epoch (), epoch_a));
accounts_list.emplace (account_upgrade_item{ account, info.modified });
}
}
}
/* Upgrade accounts
Repeat until accounts with previous epoch exist in latest table */
std::atomic<uint64_t> upgraded_accounts (0);
uint64_t workers (0);
uint64_t attempts (0);
for (auto i (accounts_list.get<modified_tag> ().begin ()), n (accounts_list.get<modified_tag> ().end ()); i != n && attempts < upgrade_batch_size && attempts < count_limit && !stopped; ++i)
{
auto transaction (store.tx_begin_read ());
nano::account_info info;
nano::account const & account (i->account);
if (!store.account.get (transaction, account, info) && info.epoch () < epoch_a)
{
++attempts;
auto difficulty (node.network_params.work.threshold (nano::work_version::work_1, nano::block_details (epoch_a, false, false, true)));
nano::root const & root (info.head);
std::shared_ptr<nano::block> epoch = builder.state ()
.account (account)
.previous (info.head)
.representative (info.representative)
.balance (info.balance)
.link (link)
.sign (raw_key, signer)
.work (0)
.build ();
if (threads != 0)
{
{
nano::unique_lock<nano::mutex> lock{ upgrader_mutex };
++workers;
while (workers > threads)
{
upgrader_condition.wait (lock);
}
}
node.workers.push_task ([&upgrader_process, &upgrader_mutex, &upgrader_condition, &upgraded_accounts, &workers, epoch, difficulty, signer, root, account] () {
upgrader_process (upgraded_accounts, epoch, difficulty, signer, root, account);
{
nano::lock_guard<nano::mutex> lock{ upgrader_mutex };
--workers;
}
upgrader_condition.notify_all ();
});
}
else
{
upgrader_process (upgraded_accounts, epoch, difficulty, signer, root, account);
}
}
}
{
nano::unique_lock<nano::mutex> lock{ upgrader_mutex };
while (workers > 0)
{
upgrader_condition.wait (lock);
}
}
total_upgraded_accounts += upgraded_accounts;
count_limit -= upgraded_accounts;
if (!accounts_list.empty ())
{
logger.always_log (boost::str (boost::format ("%1% accounts were upgraded to new epoch, %2% remain...") % total_upgraded_accounts % (accounts_list.size () - upgraded_accounts)));
accounts_list.clear ();
}
else
{
logger.always_log (boost::str (boost::format ("%1% total accounts were upgraded to new epoch") % total_upgraded_accounts));
finished_accounts = true;
}
}
// Pending blocks upgrade
bool finished_pending (false);
uint64_t total_upgraded_pending (0);
while (!finished_pending && count_limit != 0 && !stopped)
{
std::atomic<uint64_t> upgraded_pending (0);
uint64_t workers (0);
uint64_t attempts (0);
auto transaction (store.tx_begin_read ());
for (auto i (store.pending.begin (transaction, nano::pending_key (1, 0))), n (store.pending.end ()); i != n && attempts < upgrade_batch_size && attempts < count_limit && !stopped;)
{
bool to_next_account (false);
nano::pending_key const & key (i->first);
if (!store.account.exists (transaction, key.account))
{
nano::pending_info const & info (i->second);
if (info.epoch < epoch_a)
{
++attempts;
release_assert (nano::epochs::is_sequential (info.epoch, epoch_a));
auto difficulty (network_params.work.threshold (nano::work_version::work_1, nano::block_details (epoch_a, false, false, true)));
nano::root const & root (key.account);
nano::account const & account (key.account);
std::shared_ptr<nano::block> epoch = builder.state ()
.account (key.account)
.previous (0)
.representative (0)
.balance (0)
.link (link)
.sign (raw_key, signer)
.work (0)
.build ();
if (threads != 0)
{
{
nano::unique_lock<nano::mutex> lock{ upgrader_mutex };
++workers;
while (workers > threads)
{
upgrader_condition.wait (lock);
}
}
node.workers.push_task ([&upgrader_process, &upgrader_mutex, &upgrader_condition, &upgraded_pending, &workers, epoch, difficulty, signer, root, account] () {
upgrader_process (upgraded_pending, epoch, difficulty, signer, root, account);
{
nano::lock_guard<nano::mutex> lock{ upgrader_mutex };
--workers;
}
upgrader_condition.notify_all ();
});
}
else
{
upgrader_process (upgraded_pending, epoch, difficulty, signer, root, account);
}
}
}
else
{
to_next_account = true;
}
if (to_next_account)
{
// Move to next account if pending account exists or was upgraded
if (key.account.number () == std::numeric_limits<nano::uint256_t>::max ())
{
break;
}
else
{
i = store.pending.begin (transaction, nano::pending_key (key.account.number () + 1, 0));
}
}
else
{
// Move to next pending item
++i;
}
}
{
nano::unique_lock<nano::mutex> lock{ upgrader_mutex };
while (workers > 0)
{
upgrader_condition.wait (lock);
}
}
total_upgraded_pending += upgraded_pending;
count_limit -= upgraded_pending;
// Repeat if some pending accounts were upgraded
if (upgraded_pending != 0)
{
logger.always_log (boost::str (boost::format ("%1% unopened accounts with pending blocks were upgraded to new epoch...") % total_upgraded_pending));
}
else
{
logger.always_log (boost::str (boost::format ("%1% total unopened accounts with pending blocks were upgraded to new epoch") % total_upgraded_pending));
finished_pending = true;
}
}
finished_upgrade = (total_upgraded_accounts == 0) && (total_upgraded_pending == 0);
}
logger.always_log ("Epoch upgrade is completed");
}

View file

@ -0,0 +1,38 @@
#pragma once
#include <nano/lib/epoch.hpp>
#include <nano/lib/locks.hpp>
#include <nano/lib/numbers.hpp>
#include <future>
namespace nano
{
class node;
class ledger;
class store;
class network_params;
class logger_mt;
class epoch_upgrader final
{
public:
epoch_upgrader (nano::node &, nano::ledger &, nano::store &, nano::network_params &, nano::logger_mt &);
bool start (nano::raw_key const & prv, nano::epoch epoch, uint64_t count_limit, uint64_t threads);
void stop ();
private: // Dependencies
nano::node & node;
nano::ledger & ledger;
nano::store & store;
nano::network_params & network_params;
nano::logger_mt & logger;
private:
void upgrade_impl (nano::raw_key const & prv, nano::epoch epoch, uint64_t count_limit, uint64_t threads);
std::atomic<bool> stopped{ false };
nano::locked<std::future<void>> epoch_upgrading;
};
}

View file

@ -2290,7 +2290,7 @@ void nano::json_handler::epoch_upgrade ()
{
if (nano::pub_key (prv) == node.ledger.epoch_signer (node.ledger.epoch_link (epoch)))
{
if (!node.epoch_upgrader (prv, epoch, count_limit, threads))
if (!node.epoch_upgrader.start (prv, epoch, count_limit, threads))
{
response_l.put ("started", "1");
}

View file

@ -201,6 +201,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
wallets (wallets_store.init_error (), *this),
backlog{ nano::backlog_population_config (config), store, stats },
websocket{ config.websocket_config, observers, wallets, ledger, io_ctx, logger },
epoch_upgrader{ *this, ledger, store, network_params, logger },
startup_time (std::chrono::steady_clock::now ()),
node_seq (seq)
{
@ -730,11 +731,7 @@ void nano::node::stop ()
checker.stop ();
wallets.stop ();
stats.stop ();
auto epoch_upgrade = epoch_upgrading.lock ();
if (epoch_upgrade->valid ())
{
epoch_upgrade->wait ();
}
epoch_upgrader.stop ();
workers.stop ();
// work pool is not stopped on purpose due to testing setup
}
@ -1430,21 +1427,6 @@ bool nano::node::init_error () const
return store.init_error () || wallets_store.init_error ();
}
bool nano::node::epoch_upgrader (nano::raw_key const & prv_a, nano::epoch epoch_a, uint64_t count_limit, uint64_t threads)
{
bool error = stopped.load ();
if (!error)
{
auto epoch_upgrade = epoch_upgrading.lock ();
error = epoch_upgrade->valid () && epoch_upgrade->wait_for (std::chrono::seconds (0)) == std::future_status::timeout;
if (!error)
{
*epoch_upgrade = std::async (std::launch::async, &nano::node::epoch_upgrader_impl, this, prv_a, epoch_a, count_limit, threads);
}
}
return error;
}
void nano::node::set_bandwidth_params (std::size_t limit, double ratio)
{
config.bandwidth_limit_burst_ratio = ratio;
@ -1453,265 +1435,6 @@ void nano::node::set_bandwidth_params (std::size_t limit, double ratio)
logger.always_log (boost::str (boost::format ("set_bandwidth_params(%1%, %2%)") % limit % ratio));
}
void nano::node::epoch_upgrader_impl (nano::raw_key const & prv_a, nano::epoch epoch_a, uint64_t count_limit, uint64_t threads)
{
nano::thread_role::set (nano::thread_role::name::epoch_upgrader);
auto upgrader_process = [] (nano::node & node_a, std::atomic<uint64_t> & counter, std::shared_ptr<nano::block> const & epoch, uint64_t difficulty, nano::public_key const & signer_a, nano::root const & root_a, nano::account const & account_a) {
epoch->block_work_set (node_a.work_generate_blocking (nano::work_version::work_1, root_a, difficulty).value_or (0));
bool valid_signature (!nano::validate_message (signer_a, epoch->hash (), epoch->block_signature ()));
bool valid_work (node_a.network_params.work.difficulty (*epoch) >= difficulty);
nano::process_result result (nano::process_result::old);
if (valid_signature && valid_work)
{
result = node_a.process_local (epoch).code;
}
if (result == nano::process_result::progress)
{
++counter;
}
else
{
bool fork (result == nano::process_result::fork);
node_a.logger.always_log (boost::str (boost::format ("Failed to upgrade account %1%. Valid signature: %2%. Valid work: %3%. Block processor fork: %4%") % account_a.to_account () % valid_signature % valid_work % fork));
}
};
uint64_t const upgrade_batch_size = 1000;
nano::block_builder builder;
auto link (ledger.epoch_link (epoch_a));
nano::raw_key raw_key;
raw_key = prv_a;
auto signer (nano::pub_key (prv_a));
debug_assert (signer == ledger.epoch_signer (link));
nano::mutex upgrader_mutex;
nano::condition_variable upgrader_condition;
class account_upgrade_item final
{
public:
nano::account account{};
uint64_t modified{ 0 };
};
class account_tag
{
};
class modified_tag
{
};
// clang-format off
boost::multi_index_container<account_upgrade_item,
boost::multi_index::indexed_by<
boost::multi_index::ordered_non_unique<boost::multi_index::tag<modified_tag>,
boost::multi_index::member<account_upgrade_item, uint64_t, &account_upgrade_item::modified>,
std::greater<uint64_t>>,
boost::multi_index::hashed_unique<boost::multi_index::tag<account_tag>,
boost::multi_index::member<account_upgrade_item, nano::account, &account_upgrade_item::account>>>>
accounts_list;
// clang-format on
bool finished_upgrade (false);
while (!finished_upgrade && !stopped)
{
bool finished_accounts (false);
uint64_t total_upgraded_accounts (0);
while (!finished_accounts && count_limit != 0 && !stopped)
{
{
auto transaction (store.tx_begin_read ());
// Collect accounts to upgrade
for (auto i (store.account.begin (transaction)), n (store.account.end ()); i != n && accounts_list.size () < count_limit; ++i)
{
nano::account const & account (i->first);
nano::account_info const & info (i->second);
if (info.epoch () < epoch_a)
{
release_assert (nano::epochs::is_sequential (info.epoch (), epoch_a));
accounts_list.emplace (account_upgrade_item{ account, info.modified });
}
}
}
/* Upgrade accounts
Repeat until accounts with previous epoch exist in latest table */
std::atomic<uint64_t> upgraded_accounts (0);
uint64_t workers (0);
uint64_t attempts (0);
for (auto i (accounts_list.get<modified_tag> ().begin ()), n (accounts_list.get<modified_tag> ().end ()); i != n && attempts < upgrade_batch_size && attempts < count_limit && !stopped; ++i)
{
auto transaction (store.tx_begin_read ());
nano::account_info info;
nano::account const & account (i->account);
if (!store.account.get (transaction, account, info) && info.epoch () < epoch_a)
{
++attempts;
auto difficulty (network_params.work.threshold (nano::work_version::work_1, nano::block_details (epoch_a, false, false, true)));
nano::root const & root (info.head);
std::shared_ptr<nano::block> epoch = builder.state ()
.account (account)
.previous (info.head)
.representative (info.representative)
.balance (info.balance)
.link (link)
.sign (raw_key, signer)
.work (0)
.build ();
if (threads != 0)
{
{
nano::unique_lock<nano::mutex> lock{ upgrader_mutex };
++workers;
while (workers > threads)
{
upgrader_condition.wait (lock);
}
}
this->workers.push_task ([node_l = shared_from_this (), &upgrader_process, &upgrader_mutex, &upgrader_condition, &upgraded_accounts, &workers, epoch, difficulty, signer, root, account] () {
upgrader_process (*node_l, upgraded_accounts, epoch, difficulty, signer, root, account);
{
nano::lock_guard<nano::mutex> lock{ upgrader_mutex };
--workers;
}
upgrader_condition.notify_all ();
});
}
else
{
upgrader_process (*this, upgraded_accounts, epoch, difficulty, signer, root, account);
}
}
}
{
nano::unique_lock<nano::mutex> lock{ upgrader_mutex };
while (workers > 0)
{
upgrader_condition.wait (lock);
}
}
total_upgraded_accounts += upgraded_accounts;
count_limit -= upgraded_accounts;
if (!accounts_list.empty ())
{
logger.always_log (boost::str (boost::format ("%1% accounts were upgraded to new epoch, %2% remain...") % total_upgraded_accounts % (accounts_list.size () - upgraded_accounts)));
accounts_list.clear ();
}
else
{
logger.always_log (boost::str (boost::format ("%1% total accounts were upgraded to new epoch") % total_upgraded_accounts));
finished_accounts = true;
}
}
// Pending blocks upgrade
bool finished_pending (false);
uint64_t total_upgraded_pending (0);
while (!finished_pending && count_limit != 0 && !stopped)
{
std::atomic<uint64_t> upgraded_pending (0);
uint64_t workers (0);
uint64_t attempts (0);
auto transaction (store.tx_begin_read ());
for (auto i (store.pending.begin (transaction, nano::pending_key (1, 0))), n (store.pending.end ()); i != n && attempts < upgrade_batch_size && attempts < count_limit && !stopped;)
{
bool to_next_account (false);
nano::pending_key const & key (i->first);
if (!store.account.exists (transaction, key.account))
{
nano::pending_info const & info (i->second);
if (info.epoch < epoch_a)
{
++attempts;
release_assert (nano::epochs::is_sequential (info.epoch, epoch_a));
auto difficulty (network_params.work.threshold (nano::work_version::work_1, nano::block_details (epoch_a, false, false, true)));
nano::root const & root (key.account);
nano::account const & account (key.account);
std::shared_ptr<nano::block> epoch = builder.state ()
.account (key.account)
.previous (0)
.representative (0)
.balance (0)
.link (link)
.sign (raw_key, signer)
.work (0)
.build ();
if (threads != 0)
{
{
nano::unique_lock<nano::mutex> lock{ upgrader_mutex };
++workers;
while (workers > threads)
{
upgrader_condition.wait (lock);
}
}
this->workers.push_task ([node_l = shared_from_this (), &upgrader_process, &upgrader_mutex, &upgrader_condition, &upgraded_pending, &workers, epoch, difficulty, signer, root, account] () {
upgrader_process (*node_l, upgraded_pending, epoch, difficulty, signer, root, account);
{
nano::lock_guard<nano::mutex> lock{ upgrader_mutex };
--workers;
}
upgrader_condition.notify_all ();
});
}
else
{
upgrader_process (*this, upgraded_pending, epoch, difficulty, signer, root, account);
}
}
}
else
{
to_next_account = true;
}
if (to_next_account)
{
// Move to next account if pending account exists or was upgraded
if (key.account.number () == std::numeric_limits<nano::uint256_t>::max ())
{
break;
}
else
{
i = store.pending.begin (transaction, nano::pending_key (key.account.number () + 1, 0));
}
}
else
{
// Move to next pending item
++i;
}
}
{
nano::unique_lock<nano::mutex> lock{ upgrader_mutex };
while (workers > 0)
{
upgrader_condition.wait (lock);
}
}
total_upgraded_pending += upgraded_pending;
count_limit -= upgraded_pending;
// Repeat if some pending accounts were upgraded
if (upgraded_pending != 0)
{
logger.always_log (boost::str (boost::format ("%1% unopened accounts with pending blocks were upgraded to new epoch...") % total_upgraded_pending));
}
else
{
logger.always_log (boost::str (boost::format ("%1% total unopened accounts with pending blocks were upgraded to new epoch") % total_upgraded_pending));
finished_pending = true;
}
}
finished_upgrade = (total_upgraded_accounts == 0) && (total_upgraded_pending == 0);
}
logger.always_log ("Epoch upgrade is completed");
}
std::pair<uint64_t, decltype (nano::ledger::bootstrap_weights)> nano::node::get_bootstrap_weights () const
{
std::unordered_map<nano::account, nano::uint128_t> weights;

View file

@ -15,6 +15,7 @@
#include <nano/node/distributed_work_factory.hpp>
#include <nano/node/election.hpp>
#include <nano/node/election_scheduler.hpp>
#include <nano/node/epoch_upgrader.hpp>
#include <nano/node/gap_cache.hpp>
#include <nano/node/hinted_scheduler.hpp>
#include <nano/node/network.hpp>
@ -126,7 +127,6 @@ public:
void ongoing_online_weight_calculation_queue ();
bool online () const;
bool init_error () const;
bool epoch_upgrader (nano::raw_key const &, nano::epoch, uint64_t, uint64_t);
void set_bandwidth_params (std::size_t limit, double ratio);
std::pair<uint64_t, decltype (nano::ledger::bootstrap_weights)> get_bootstrap_weights () const;
uint64_t get_confirmation_height (nano::transaction const &, nano::account &);
@ -185,6 +185,7 @@ public:
nano::wallets wallets;
nano::backlog_population backlog;
nano::websocket_server websocket;
nano::epoch_upgrader epoch_upgrader;
std::chrono::steady_clock::time_point const startup_time;
std::chrono::seconds unchecked_cutoff = std::chrono::seconds (7 * 24 * 60 * 60); // Week
@ -213,8 +214,6 @@ public: // Testing convenience functions
private:
void long_inactivity_cleanup ();
void epoch_upgrader_impl (nano::raw_key const &, nano::epoch, uint64_t, uint64_t);
nano::locked<std::future<void>> epoch_upgrading;
};
nano::keypair load_or_create_node_id (boost::filesystem::path const & application_path, nano::logger_mt & logger);

View file

@ -1913,9 +1913,9 @@ TEST (node, mass_epoch_upgrader)
{
auto const pre_upgrade = node.ledger.cache.block_count.load ();
auto upgrade_count = std::min<size_t> (batch_size, block_count_before + total_to_upgrade - pre_upgrade);
ASSERT_FALSE (node.epoch_upgrader (epoch_signer.prv, nano::epoch::epoch_1, upgrade_count, threads));
ASSERT_FALSE (node.epoch_upgrader.start (epoch_signer.prv, nano::epoch::epoch_1, upgrade_count, threads));
// Already ongoing - should fail
ASSERT_TRUE (node.epoch_upgrader (epoch_signer.prv, nano::epoch::epoch_1, upgrade_count, threads));
ASSERT_TRUE (node.epoch_upgrader.start (epoch_signer.prv, nano::epoch::epoch_1, upgrade_count, threads));
system.deadline_set (60s);
while (node.ledger.cache.block_count != pre_upgrade + upgrade_count)
{