diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index bdba68e5..1772f8d0 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -66,6 +66,8 @@ add_library( election.cpp election_scheduler.hpp election_scheduler.cpp + epoch_upgrader.hpp + epoch_upgrader.cpp gap_cache.hpp gap_cache.cpp hinted_scheduler.hpp diff --git a/nano/node/epoch_upgrader.cpp b/nano/node/epoch_upgrader.cpp new file mode 100644 index 00000000..c1e28f29 --- /dev/null +++ b/nano/node/epoch_upgrader.cpp @@ -0,0 +1,300 @@ +#include +#include +#include + +nano::epoch_upgrader::epoch_upgrader (nano::node & node_a, nano::ledger & ledger_a, nano::store & store_a, nano::network_params & network_params_a, nano::logger_mt & logger_a) : + node{ node_a }, + ledger{ ledger_a }, + store{ store_a }, + network_params{ network_params_a }, + logger{ logger_a } +{ +} + +void nano::epoch_upgrader::stop () +{ + stopped = true; + + auto epoch_upgrade = epoch_upgrading.lock (); + if (epoch_upgrade->valid ()) + { + epoch_upgrade->wait (); + } +} + +bool nano::epoch_upgrader::start (nano::raw_key const & prv_a, nano::epoch epoch_a, uint64_t count_limit, uint64_t threads) +{ + bool error = stopped.load (); + if (!error) + { + auto epoch_upgrade = epoch_upgrading.lock (); + error = epoch_upgrade->valid () && epoch_upgrade->wait_for (std::chrono::seconds (0)) == std::future_status::timeout; + if (!error) + { + *epoch_upgrade = std::async (std::launch::async, [this, prv_a, epoch_a, count_limit, threads] () { + upgrade_impl (prv_a, epoch_a, count_limit, threads); + }); + } + } + return error; +} + +// TODO: This method should be a class +void nano::epoch_upgrader::upgrade_impl (nano::raw_key const & prv_a, nano::epoch epoch_a, uint64_t count_limit, uint64_t threads) +{ + nano::thread_role::set (nano::thread_role::name::epoch_upgrader); + auto upgrader_process = [this] (std::atomic & counter, std::shared_ptr const & epoch, uint64_t difficulty, nano::public_key const & signer_a, nano::root const & root_a, nano::account const & account_a) { + epoch->block_work_set (node.work_generate_blocking (nano::work_version::work_1, root_a, difficulty).value_or (0)); + bool valid_signature (!nano::validate_message (signer_a, epoch->hash (), epoch->block_signature ())); + bool valid_work (node.network_params.work.difficulty (*epoch) >= difficulty); + nano::process_result result (nano::process_result::old); + if (valid_signature && valid_work) + { + result = node.process_local (epoch).code; + } + if (result == nano::process_result::progress) + { + ++counter; + } + else + { + bool fork (result == nano::process_result::fork); + logger.always_log (boost::str (boost::format ("Failed to upgrade account %1%. Valid signature: %2%. Valid work: %3%. Block processor fork: %4%") % account_a.to_account () % valid_signature % valid_work % fork)); + } + }; + + uint64_t const upgrade_batch_size = 1000; + nano::block_builder builder; + auto link (ledger.epoch_link (epoch_a)); + nano::raw_key raw_key; + raw_key = prv_a; + auto signer (nano::pub_key (prv_a)); + debug_assert (signer == ledger.epoch_signer (link)); + + nano::mutex upgrader_mutex; + nano::condition_variable upgrader_condition; + + class account_upgrade_item final + { + public: + nano::account account{}; + uint64_t modified{ 0 }; + }; + class account_tag + { + }; + class modified_tag + { + }; + // clang-format off + boost::multi_index_container, + boost::multi_index::member, + std::greater>, + boost::multi_index::hashed_unique, + boost::multi_index::member>>> + accounts_list; + // clang-format on + + bool finished_upgrade (false); + + while (!finished_upgrade && !stopped) + { + bool finished_accounts (false); + uint64_t total_upgraded_accounts (0); + while (!finished_accounts && count_limit != 0 && !stopped) + { + { + auto transaction (store.tx_begin_read ()); + // Collect accounts to upgrade + for (auto i (store.account.begin (transaction)), n (store.account.end ()); i != n && accounts_list.size () < count_limit; ++i) + { + nano::account const & account (i->first); + nano::account_info const & info (i->second); + if (info.epoch () < epoch_a) + { + release_assert (nano::epochs::is_sequential (info.epoch (), epoch_a)); + accounts_list.emplace (account_upgrade_item{ account, info.modified }); + } + } + } + + /* Upgrade accounts + Repeat until accounts with previous epoch exist in latest table */ + std::atomic upgraded_accounts (0); + uint64_t workers (0); + uint64_t attempts (0); + for (auto i (accounts_list.get ().begin ()), n (accounts_list.get ().end ()); i != n && attempts < upgrade_batch_size && attempts < count_limit && !stopped; ++i) + { + auto transaction (store.tx_begin_read ()); + nano::account_info info; + nano::account const & account (i->account); + if (!store.account.get (transaction, account, info) && info.epoch () < epoch_a) + { + ++attempts; + auto difficulty (node.network_params.work.threshold (nano::work_version::work_1, nano::block_details (epoch_a, false, false, true))); + nano::root const & root (info.head); + std::shared_ptr epoch = builder.state () + .account (account) + .previous (info.head) + .representative (info.representative) + .balance (info.balance) + .link (link) + .sign (raw_key, signer) + .work (0) + .build (); + if (threads != 0) + { + { + nano::unique_lock lock{ upgrader_mutex }; + ++workers; + while (workers > threads) + { + upgrader_condition.wait (lock); + } + } + node.workers.push_task ([&upgrader_process, &upgrader_mutex, &upgrader_condition, &upgraded_accounts, &workers, epoch, difficulty, signer, root, account] () { + upgrader_process (upgraded_accounts, epoch, difficulty, signer, root, account); + { + nano::lock_guard lock{ upgrader_mutex }; + --workers; + } + upgrader_condition.notify_all (); + }); + } + else + { + upgrader_process (upgraded_accounts, epoch, difficulty, signer, root, account); + } + } + } + { + nano::unique_lock lock{ upgrader_mutex }; + while (workers > 0) + { + upgrader_condition.wait (lock); + } + } + total_upgraded_accounts += upgraded_accounts; + count_limit -= upgraded_accounts; + + if (!accounts_list.empty ()) + { + logger.always_log (boost::str (boost::format ("%1% accounts were upgraded to new epoch, %2% remain...") % total_upgraded_accounts % (accounts_list.size () - upgraded_accounts))); + accounts_list.clear (); + } + else + { + logger.always_log (boost::str (boost::format ("%1% total accounts were upgraded to new epoch") % total_upgraded_accounts)); + finished_accounts = true; + } + } + + // Pending blocks upgrade + bool finished_pending (false); + uint64_t total_upgraded_pending (0); + while (!finished_pending && count_limit != 0 && !stopped) + { + std::atomic upgraded_pending (0); + uint64_t workers (0); + uint64_t attempts (0); + auto transaction (store.tx_begin_read ()); + for (auto i (store.pending.begin (transaction, nano::pending_key (1, 0))), n (store.pending.end ()); i != n && attempts < upgrade_batch_size && attempts < count_limit && !stopped;) + { + bool to_next_account (false); + nano::pending_key const & key (i->first); + if (!store.account.exists (transaction, key.account)) + { + nano::pending_info const & info (i->second); + if (info.epoch < epoch_a) + { + ++attempts; + release_assert (nano::epochs::is_sequential (info.epoch, epoch_a)); + auto difficulty (network_params.work.threshold (nano::work_version::work_1, nano::block_details (epoch_a, false, false, true))); + nano::root const & root (key.account); + nano::account const & account (key.account); + std::shared_ptr epoch = builder.state () + .account (key.account) + .previous (0) + .representative (0) + .balance (0) + .link (link) + .sign (raw_key, signer) + .work (0) + .build (); + if (threads != 0) + { + { + nano::unique_lock lock{ upgrader_mutex }; + ++workers; + while (workers > threads) + { + upgrader_condition.wait (lock); + } + } + node.workers.push_task ([&upgrader_process, &upgrader_mutex, &upgrader_condition, &upgraded_pending, &workers, epoch, difficulty, signer, root, account] () { + upgrader_process (upgraded_pending, epoch, difficulty, signer, root, account); + { + nano::lock_guard lock{ upgrader_mutex }; + --workers; + } + upgrader_condition.notify_all (); + }); + } + else + { + upgrader_process (upgraded_pending, epoch, difficulty, signer, root, account); + } + } + } + else + { + to_next_account = true; + } + if (to_next_account) + { + // Move to next account if pending account exists or was upgraded + if (key.account.number () == std::numeric_limits::max ()) + { + break; + } + else + { + i = store.pending.begin (transaction, nano::pending_key (key.account.number () + 1, 0)); + } + } + else + { + // Move to next pending item + ++i; + } + } + { + nano::unique_lock lock{ upgrader_mutex }; + while (workers > 0) + { + upgrader_condition.wait (lock); + } + } + + total_upgraded_pending += upgraded_pending; + count_limit -= upgraded_pending; + + // Repeat if some pending accounts were upgraded + if (upgraded_pending != 0) + { + logger.always_log (boost::str (boost::format ("%1% unopened accounts with pending blocks were upgraded to new epoch...") % total_upgraded_pending)); + } + else + { + logger.always_log (boost::str (boost::format ("%1% total unopened accounts with pending blocks were upgraded to new epoch") % total_upgraded_pending)); + finished_pending = true; + } + } + + finished_upgrade = (total_upgraded_accounts == 0) && (total_upgraded_pending == 0); + } + + logger.always_log ("Epoch upgrade is completed"); +} diff --git a/nano/node/epoch_upgrader.hpp b/nano/node/epoch_upgrader.hpp new file mode 100644 index 00000000..fb16b4d5 --- /dev/null +++ b/nano/node/epoch_upgrader.hpp @@ -0,0 +1,38 @@ +#pragma once + +#include +#include +#include + +#include + +namespace nano +{ +class node; +class ledger; +class store; +class network_params; +class logger_mt; + +class epoch_upgrader final +{ +public: + epoch_upgrader (nano::node &, nano::ledger &, nano::store &, nano::network_params &, nano::logger_mt &); + + bool start (nano::raw_key const & prv, nano::epoch epoch, uint64_t count_limit, uint64_t threads); + void stop (); + +private: // Dependencies + nano::node & node; + nano::ledger & ledger; + nano::store & store; + nano::network_params & network_params; + nano::logger_mt & logger; + +private: + void upgrade_impl (nano::raw_key const & prv, nano::epoch epoch, uint64_t count_limit, uint64_t threads); + + std::atomic stopped{ false }; + nano::locked> epoch_upgrading; +}; +} \ No newline at end of file diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 22416552..3e1da1c6 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -2290,7 +2290,7 @@ void nano::json_handler::epoch_upgrade () { if (nano::pub_key (prv) == node.ledger.epoch_signer (node.ledger.epoch_link (epoch))) { - if (!node.epoch_upgrader (prv, epoch, count_limit, threads)) + if (!node.epoch_upgrader.start (prv, epoch, count_limit, threads)) { response_l.put ("started", "1"); } diff --git a/nano/node/node.cpp b/nano/node/node.cpp index c8a10273..138cd5c8 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -201,6 +201,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co wallets (wallets_store.init_error (), *this), backlog{ nano::backlog_population_config (config), store, stats }, websocket{ config.websocket_config, observers, wallets, ledger, io_ctx, logger }, + epoch_upgrader{ *this, ledger, store, network_params, logger }, startup_time (std::chrono::steady_clock::now ()), node_seq (seq) { @@ -730,11 +731,7 @@ void nano::node::stop () checker.stop (); wallets.stop (); stats.stop (); - auto epoch_upgrade = epoch_upgrading.lock (); - if (epoch_upgrade->valid ()) - { - epoch_upgrade->wait (); - } + epoch_upgrader.stop (); workers.stop (); // work pool is not stopped on purpose due to testing setup } @@ -1430,21 +1427,6 @@ bool nano::node::init_error () const return store.init_error () || wallets_store.init_error (); } -bool nano::node::epoch_upgrader (nano::raw_key const & prv_a, nano::epoch epoch_a, uint64_t count_limit, uint64_t threads) -{ - bool error = stopped.load (); - if (!error) - { - auto epoch_upgrade = epoch_upgrading.lock (); - error = epoch_upgrade->valid () && epoch_upgrade->wait_for (std::chrono::seconds (0)) == std::future_status::timeout; - if (!error) - { - *epoch_upgrade = std::async (std::launch::async, &nano::node::epoch_upgrader_impl, this, prv_a, epoch_a, count_limit, threads); - } - } - return error; -} - void nano::node::set_bandwidth_params (std::size_t limit, double ratio) { config.bandwidth_limit_burst_ratio = ratio; @@ -1453,265 +1435,6 @@ void nano::node::set_bandwidth_params (std::size_t limit, double ratio) logger.always_log (boost::str (boost::format ("set_bandwidth_params(%1%, %2%)") % limit % ratio)); } -void nano::node::epoch_upgrader_impl (nano::raw_key const & prv_a, nano::epoch epoch_a, uint64_t count_limit, uint64_t threads) -{ - nano::thread_role::set (nano::thread_role::name::epoch_upgrader); - auto upgrader_process = [] (nano::node & node_a, std::atomic & counter, std::shared_ptr const & epoch, uint64_t difficulty, nano::public_key const & signer_a, nano::root const & root_a, nano::account const & account_a) { - epoch->block_work_set (node_a.work_generate_blocking (nano::work_version::work_1, root_a, difficulty).value_or (0)); - bool valid_signature (!nano::validate_message (signer_a, epoch->hash (), epoch->block_signature ())); - bool valid_work (node_a.network_params.work.difficulty (*epoch) >= difficulty); - nano::process_result result (nano::process_result::old); - if (valid_signature && valid_work) - { - result = node_a.process_local (epoch).code; - } - if (result == nano::process_result::progress) - { - ++counter; - } - else - { - bool fork (result == nano::process_result::fork); - node_a.logger.always_log (boost::str (boost::format ("Failed to upgrade account %1%. Valid signature: %2%. Valid work: %3%. Block processor fork: %4%") % account_a.to_account () % valid_signature % valid_work % fork)); - } - }; - - uint64_t const upgrade_batch_size = 1000; - nano::block_builder builder; - auto link (ledger.epoch_link (epoch_a)); - nano::raw_key raw_key; - raw_key = prv_a; - auto signer (nano::pub_key (prv_a)); - debug_assert (signer == ledger.epoch_signer (link)); - - nano::mutex upgrader_mutex; - nano::condition_variable upgrader_condition; - - class account_upgrade_item final - { - public: - nano::account account{}; - uint64_t modified{ 0 }; - }; - class account_tag - { - }; - class modified_tag - { - }; - // clang-format off - boost::multi_index_container, - boost::multi_index::member, - std::greater>, - boost::multi_index::hashed_unique, - boost::multi_index::member>>> - accounts_list; - // clang-format on - - bool finished_upgrade (false); - - while (!finished_upgrade && !stopped) - { - bool finished_accounts (false); - uint64_t total_upgraded_accounts (0); - while (!finished_accounts && count_limit != 0 && !stopped) - { - { - auto transaction (store.tx_begin_read ()); - // Collect accounts to upgrade - for (auto i (store.account.begin (transaction)), n (store.account.end ()); i != n && accounts_list.size () < count_limit; ++i) - { - nano::account const & account (i->first); - nano::account_info const & info (i->second); - if (info.epoch () < epoch_a) - { - release_assert (nano::epochs::is_sequential (info.epoch (), epoch_a)); - accounts_list.emplace (account_upgrade_item{ account, info.modified }); - } - } - } - - /* Upgrade accounts - Repeat until accounts with previous epoch exist in latest table */ - std::atomic upgraded_accounts (0); - uint64_t workers (0); - uint64_t attempts (0); - for (auto i (accounts_list.get ().begin ()), n (accounts_list.get ().end ()); i != n && attempts < upgrade_batch_size && attempts < count_limit && !stopped; ++i) - { - auto transaction (store.tx_begin_read ()); - nano::account_info info; - nano::account const & account (i->account); - if (!store.account.get (transaction, account, info) && info.epoch () < epoch_a) - { - ++attempts; - auto difficulty (network_params.work.threshold (nano::work_version::work_1, nano::block_details (epoch_a, false, false, true))); - nano::root const & root (info.head); - std::shared_ptr epoch = builder.state () - .account (account) - .previous (info.head) - .representative (info.representative) - .balance (info.balance) - .link (link) - .sign (raw_key, signer) - .work (0) - .build (); - if (threads != 0) - { - { - nano::unique_lock lock{ upgrader_mutex }; - ++workers; - while (workers > threads) - { - upgrader_condition.wait (lock); - } - } - this->workers.push_task ([node_l = shared_from_this (), &upgrader_process, &upgrader_mutex, &upgrader_condition, &upgraded_accounts, &workers, epoch, difficulty, signer, root, account] () { - upgrader_process (*node_l, upgraded_accounts, epoch, difficulty, signer, root, account); - { - nano::lock_guard lock{ upgrader_mutex }; - --workers; - } - upgrader_condition.notify_all (); - }); - } - else - { - upgrader_process (*this, upgraded_accounts, epoch, difficulty, signer, root, account); - } - } - } - { - nano::unique_lock lock{ upgrader_mutex }; - while (workers > 0) - { - upgrader_condition.wait (lock); - } - } - total_upgraded_accounts += upgraded_accounts; - count_limit -= upgraded_accounts; - - if (!accounts_list.empty ()) - { - logger.always_log (boost::str (boost::format ("%1% accounts were upgraded to new epoch, %2% remain...") % total_upgraded_accounts % (accounts_list.size () - upgraded_accounts))); - accounts_list.clear (); - } - else - { - logger.always_log (boost::str (boost::format ("%1% total accounts were upgraded to new epoch") % total_upgraded_accounts)); - finished_accounts = true; - } - } - - // Pending blocks upgrade - bool finished_pending (false); - uint64_t total_upgraded_pending (0); - while (!finished_pending && count_limit != 0 && !stopped) - { - std::atomic upgraded_pending (0); - uint64_t workers (0); - uint64_t attempts (0); - auto transaction (store.tx_begin_read ()); - for (auto i (store.pending.begin (transaction, nano::pending_key (1, 0))), n (store.pending.end ()); i != n && attempts < upgrade_batch_size && attempts < count_limit && !stopped;) - { - bool to_next_account (false); - nano::pending_key const & key (i->first); - if (!store.account.exists (transaction, key.account)) - { - nano::pending_info const & info (i->second); - if (info.epoch < epoch_a) - { - ++attempts; - release_assert (nano::epochs::is_sequential (info.epoch, epoch_a)); - auto difficulty (network_params.work.threshold (nano::work_version::work_1, nano::block_details (epoch_a, false, false, true))); - nano::root const & root (key.account); - nano::account const & account (key.account); - std::shared_ptr epoch = builder.state () - .account (key.account) - .previous (0) - .representative (0) - .balance (0) - .link (link) - .sign (raw_key, signer) - .work (0) - .build (); - if (threads != 0) - { - { - nano::unique_lock lock{ upgrader_mutex }; - ++workers; - while (workers > threads) - { - upgrader_condition.wait (lock); - } - } - this->workers.push_task ([node_l = shared_from_this (), &upgrader_process, &upgrader_mutex, &upgrader_condition, &upgraded_pending, &workers, epoch, difficulty, signer, root, account] () { - upgrader_process (*node_l, upgraded_pending, epoch, difficulty, signer, root, account); - { - nano::lock_guard lock{ upgrader_mutex }; - --workers; - } - upgrader_condition.notify_all (); - }); - } - else - { - upgrader_process (*this, upgraded_pending, epoch, difficulty, signer, root, account); - } - } - } - else - { - to_next_account = true; - } - if (to_next_account) - { - // Move to next account if pending account exists or was upgraded - if (key.account.number () == std::numeric_limits::max ()) - { - break; - } - else - { - i = store.pending.begin (transaction, nano::pending_key (key.account.number () + 1, 0)); - } - } - else - { - // Move to next pending item - ++i; - } - } - { - nano::unique_lock lock{ upgrader_mutex }; - while (workers > 0) - { - upgrader_condition.wait (lock); - } - } - - total_upgraded_pending += upgraded_pending; - count_limit -= upgraded_pending; - - // Repeat if some pending accounts were upgraded - if (upgraded_pending != 0) - { - logger.always_log (boost::str (boost::format ("%1% unopened accounts with pending blocks were upgraded to new epoch...") % total_upgraded_pending)); - } - else - { - logger.always_log (boost::str (boost::format ("%1% total unopened accounts with pending blocks were upgraded to new epoch") % total_upgraded_pending)); - finished_pending = true; - } - } - - finished_upgrade = (total_upgraded_accounts == 0) && (total_upgraded_pending == 0); - } - - logger.always_log ("Epoch upgrade is completed"); -} - std::pair nano::node::get_bootstrap_weights () const { std::unordered_map weights; diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 63dbb8f5..d91b375d 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -126,7 +127,6 @@ public: void ongoing_online_weight_calculation_queue (); bool online () const; bool init_error () const; - bool epoch_upgrader (nano::raw_key const &, nano::epoch, uint64_t, uint64_t); void set_bandwidth_params (std::size_t limit, double ratio); std::pair get_bootstrap_weights () const; uint64_t get_confirmation_height (nano::transaction const &, nano::account &); @@ -185,6 +185,7 @@ public: nano::wallets wallets; nano::backlog_population backlog; nano::websocket_server websocket; + nano::epoch_upgrader epoch_upgrader; std::chrono::steady_clock::time_point const startup_time; std::chrono::seconds unchecked_cutoff = std::chrono::seconds (7 * 24 * 60 * 60); // Week @@ -213,8 +214,6 @@ public: // Testing convenience functions private: void long_inactivity_cleanup (); - void epoch_upgrader_impl (nano::raw_key const &, nano::epoch, uint64_t, uint64_t); - nano::locked> epoch_upgrading; }; nano::keypair load_or_create_node_id (boost::filesystem::path const & application_path, nano::logger_mt & logger); diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index d6fb854f..0ae95217 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -1913,9 +1913,9 @@ TEST (node, mass_epoch_upgrader) { auto const pre_upgrade = node.ledger.cache.block_count.load (); auto upgrade_count = std::min (batch_size, block_count_before + total_to_upgrade - pre_upgrade); - ASSERT_FALSE (node.epoch_upgrader (epoch_signer.prv, nano::epoch::epoch_1, upgrade_count, threads)); + ASSERT_FALSE (node.epoch_upgrader.start (epoch_signer.prv, nano::epoch::epoch_1, upgrade_count, threads)); // Already ongoing - should fail - ASSERT_TRUE (node.epoch_upgrader (epoch_signer.prv, nano::epoch::epoch_1, upgrade_count, threads)); + ASSERT_TRUE (node.epoch_upgrader.start (epoch_signer.prv, nano::epoch::epoch_1, upgrade_count, threads)); system.deadline_set (60s); while (node.ledger.cache.block_count != pre_upgrade + upgrade_count) {