Merge pull request #4714 from pwojcikdev/batched-ledger-upgrade
Perform ledger upgrade in batches
This commit is contained in:
commit
aad0b4ba9a
15 changed files with 229 additions and 98 deletions
|
@ -3592,7 +3592,7 @@ TEST (node, pruning_automatic)
|
|||
ASSERT_TRUE (nano::test::block_or_pruned_all_exists (node1, { nano::dev::genesis, send1, send2 }));
|
||||
}
|
||||
|
||||
TEST (node, pruning_age)
|
||||
TEST (node, DISABLED_pruning_age)
|
||||
{
|
||||
nano::test::system system{};
|
||||
|
||||
|
@ -3653,7 +3653,7 @@ TEST (node, pruning_age)
|
|||
|
||||
// Test that a node configured with `enable_pruning` will
|
||||
// prune DEEP-enough confirmed blocks by explicitly saying `node.ledger_pruning` in the unit test
|
||||
TEST (node, pruning_depth)
|
||||
TEST (node, DISABLED_pruning_depth)
|
||||
{
|
||||
nano::test::system system{};
|
||||
|
||||
|
|
|
@ -95,21 +95,26 @@ void nano::backlog_population::populate_backlog (nano::unique_lock<nano::mutex>
|
|||
{
|
||||
auto transaction = ledger.tx_begin_read ();
|
||||
|
||||
auto count = 0u;
|
||||
auto i = ledger.store.account.begin (transaction, next);
|
||||
auto it = ledger.store.account.begin (transaction, next);
|
||||
auto const end = ledger.store.account.end ();
|
||||
for (; i != end && count < chunk_size; ++i, ++count, ++total)
|
||||
{
|
||||
transaction.refresh_if_needed ();
|
||||
|
||||
auto should_refresh = [&transaction] () {
|
||||
auto cutoff = std::chrono::steady_clock::now () - 100ms; // TODO: Make this configurable
|
||||
return transaction.timestamp () < cutoff;
|
||||
};
|
||||
|
||||
for (size_t count = 0; it != end && count < chunk_size && !should_refresh (); ++it, ++count, ++total)
|
||||
{
|
||||
stats.inc (nano::stat::type::backlog, nano::stat::detail::total);
|
||||
|
||||
auto const & account = i->first;
|
||||
auto const & account_info = i->second;
|
||||
auto const & account = it->first;
|
||||
auto const & account_info = it->second;
|
||||
|
||||
activate (transaction, account, account_info);
|
||||
|
||||
next = account.number () + 1;
|
||||
}
|
||||
|
||||
done = ledger.store.account.begin (transaction, next) == end;
|
||||
}
|
||||
|
||||
|
|
|
@ -891,10 +891,13 @@ void nano::node::ongoing_bootstrap ()
|
|||
{
|
||||
// Find last online weight sample (last active time for node)
|
||||
uint64_t last_sample_time (0);
|
||||
auto last_record = store.online_weight.rbegin (store.tx_begin_read ());
|
||||
if (last_record != store.online_weight.end ())
|
||||
{
|
||||
last_sample_time = last_record->first;
|
||||
auto transaction = store.tx_begin_read ();
|
||||
auto last_record = store.online_weight.rbegin (transaction);
|
||||
if (last_record != store.online_weight.end ())
|
||||
{
|
||||
last_sample_time = last_record->first;
|
||||
}
|
||||
}
|
||||
uint64_t time_since_last_sample = std::chrono::duration_cast<std::chrono::seconds> (std::chrono::system_clock::now ().time_since_epoch ()).count () - static_cast<uint64_t> (last_sample_time / std::pow (10, 9)); // Nanoseconds to seconds
|
||||
if (time_since_last_sample + 60 * 60 < std::numeric_limits<uint32_t>::max ())
|
||||
|
@ -975,7 +978,7 @@ bool nano::node::collect_ledger_pruning_targets (std::deque<nano::block_hash> &
|
|||
{
|
||||
uint64_t read_operations (0);
|
||||
bool finish_transaction (false);
|
||||
auto const transaction = ledger.tx_begin_read ();
|
||||
auto transaction = ledger.tx_begin_read ();
|
||||
for (auto i (store.confirmation_height.begin (transaction, last_account_a)), n (store.confirmation_height.end ()); i != n && !finish_transaction;)
|
||||
{
|
||||
++read_operations;
|
||||
|
@ -1003,6 +1006,7 @@ bool nano::node::collect_ledger_pruning_targets (std::deque<nano::block_hash> &
|
|||
}
|
||||
if (++depth % batch_read_size_a == 0)
|
||||
{
|
||||
// FIXME: This is triggering an assertion where the iterator is still used after transaction is refreshed
|
||||
transaction.refresh ();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -68,7 +68,7 @@ bool nano::scheduler::hinted::predicate () const
|
|||
return active.vacancy (nano::election_behavior::hinted) > 0;
|
||||
}
|
||||
|
||||
void nano::scheduler::hinted::activate (secure::read_transaction const & transaction, nano::block_hash const & hash, bool check_dependents)
|
||||
void nano::scheduler::hinted::activate (secure::read_transaction & transaction, nano::block_hash const & hash, bool check_dependents)
|
||||
{
|
||||
const int max_iterations = 64;
|
||||
|
||||
|
|
|
@ -71,7 +71,7 @@ private:
|
|||
bool predicate () const;
|
||||
void run ();
|
||||
void run_iterative ();
|
||||
void activate (secure::read_transaction const &, nano::block_hash const & hash, bool check_dependents);
|
||||
void activate (secure::read_transaction &, nano::block_hash const & hash, bool check_dependents);
|
||||
|
||||
nano::uint128_t tally_threshold () const;
|
||||
nano::uint128_t final_tally_threshold () const;
|
||||
|
|
|
@ -79,6 +79,11 @@ public:
|
|||
return false;
|
||||
}
|
||||
|
||||
auto timestamp () const
|
||||
{
|
||||
return txn.timestamp ();
|
||||
}
|
||||
|
||||
// Conversion operator to const nano::store::transaction&
|
||||
operator const nano::store::transaction & () const override
|
||||
{
|
||||
|
@ -108,16 +113,21 @@ public:
|
|||
return txn;
|
||||
}
|
||||
|
||||
void refresh () const
|
||||
void refresh ()
|
||||
{
|
||||
txn.refresh ();
|
||||
}
|
||||
|
||||
void refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 }) const
|
||||
void refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 })
|
||||
{
|
||||
txn.refresh_if_needed (max_age);
|
||||
}
|
||||
|
||||
auto timestamp () const
|
||||
{
|
||||
return txn.timestamp ();
|
||||
}
|
||||
|
||||
// Conversion operator to const nano::store::transaction&
|
||||
operator const nano::store::transaction & () const override
|
||||
{
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
#pragma once
|
||||
|
||||
#include <nano/lib/utility.hpp>
|
||||
#include <nano/store/transaction.hpp>
|
||||
|
||||
#include <utility>
|
||||
|
||||
namespace nano::store
|
||||
|
@ -8,7 +11,16 @@ template <typename T, typename U>
|
|||
class iterator_impl
|
||||
{
|
||||
public:
|
||||
virtual ~iterator_impl () = default;
|
||||
explicit iterator_impl (nano::store::transaction const & transaction_a) :
|
||||
transaction{ transaction_a },
|
||||
transaction_epoch{ transaction_a.epoch () }
|
||||
{
|
||||
}
|
||||
virtual ~iterator_impl ()
|
||||
{
|
||||
debug_assert (transaction_epoch == transaction.epoch (), "invalid iterator-transaction lifetime detected");
|
||||
}
|
||||
|
||||
virtual iterator_impl<T, U> & operator++ () = 0;
|
||||
virtual iterator_impl<T, U> & operator-- () = 0;
|
||||
virtual bool operator== (iterator_impl<T, U> const & other_a) const = 0;
|
||||
|
@ -23,5 +35,9 @@ public:
|
|||
{
|
||||
return !(*this == other_a);
|
||||
}
|
||||
|
||||
protected:
|
||||
nano::store::transaction const & transaction;
|
||||
nano::store::transaction::epoch_t const transaction_epoch;
|
||||
};
|
||||
} // namespace nano::store
|
||||
}
|
||||
|
|
|
@ -14,7 +14,8 @@ template <typename T, typename U>
|
|||
class iterator : public iterator_impl<T, U>
|
||||
{
|
||||
public:
|
||||
iterator (store::transaction const & transaction_a, env const & env_a, MDB_dbi db_a, MDB_val const & val_a = MDB_val{}, bool const direction_asc = true)
|
||||
iterator (store::transaction const & transaction_a, env const & env_a, MDB_dbi db_a, MDB_val const & val_a = MDB_val{}, bool const direction_asc = true) :
|
||||
nano::store::iterator_impl<T, U> (transaction_a)
|
||||
{
|
||||
auto status (mdb_cursor_open (env_a.tx (transaction_a), db_a, &cursor));
|
||||
release_assert (status == 0);
|
||||
|
|
|
@ -209,10 +209,10 @@ void nano::store::lmdb::component::open_databases (bool & error_a, store::transa
|
|||
error_a |= mdb_dbi_open (env.tx (transaction_a), "rep_weights", flags, &rep_weight_store.rep_weights_handle) != 0;
|
||||
}
|
||||
|
||||
bool nano::store::lmdb::component::do_upgrades (store::write_transaction & transaction_a, nano::ledger_constants & constants, bool & needs_vacuuming)
|
||||
bool nano::store::lmdb::component::do_upgrades (store::write_transaction & transaction, nano::ledger_constants & constants, bool & needs_vacuuming)
|
||||
{
|
||||
auto error (false);
|
||||
auto version_l = version.get (transaction_a);
|
||||
auto version_l = version.get (transaction);
|
||||
if (version_l < version_minimum)
|
||||
{
|
||||
logger.critical (nano::log::type::lmdb, "The version of the ledger ({}) is lower than the minimum ({}) which is supported for upgrades. Either upgrade a node first or delete the ledger.", version_l, version_minimum);
|
||||
|
@ -221,13 +221,13 @@ bool nano::store::lmdb::component::do_upgrades (store::write_transaction & trans
|
|||
switch (version_l)
|
||||
{
|
||||
case 21:
|
||||
upgrade_v21_to_v22 (transaction_a);
|
||||
upgrade_v21_to_v22 (transaction);
|
||||
[[fallthrough]];
|
||||
case 22:
|
||||
upgrade_v22_to_v23 (transaction_a);
|
||||
upgrade_v22_to_v23 (transaction);
|
||||
[[fallthrough]];
|
||||
case 23:
|
||||
upgrade_v23_to_v24 (transaction_a);
|
||||
upgrade_v23_to_v24 (transaction);
|
||||
[[fallthrough]];
|
||||
case 24:
|
||||
break;
|
||||
|
@ -239,59 +239,85 @@ bool nano::store::lmdb::component::do_upgrades (store::write_transaction & trans
|
|||
return error;
|
||||
}
|
||||
|
||||
void nano::store::lmdb::component::upgrade_v21_to_v22 (store::write_transaction const & transaction_a)
|
||||
void nano::store::lmdb::component::upgrade_v21_to_v22 (store::write_transaction & transaction)
|
||||
{
|
||||
logger.info (nano::log::type::lmdb, "Upgrading database from v21 to v22...");
|
||||
|
||||
MDB_dbi unchecked_handle{ 0 };
|
||||
release_assert (!mdb_dbi_open (env.tx (transaction_a), "unchecked", MDB_CREATE, &unchecked_handle));
|
||||
release_assert (!mdb_drop (env.tx (transaction_a), unchecked_handle, 1)); // del = 1, to delete it from the environment and close the DB handle.
|
||||
version.put (transaction_a, 22);
|
||||
release_assert (!mdb_dbi_open (env.tx (transaction), "unchecked", MDB_CREATE, &unchecked_handle));
|
||||
release_assert (!mdb_drop (env.tx (transaction), unchecked_handle, 1)); // del = 1, to delete it from the environment and close the DB handle.
|
||||
version.put (transaction, 22);
|
||||
|
||||
logger.info (nano::log::type::lmdb, "Upgrading database from v21 to v22 completed");
|
||||
}
|
||||
|
||||
// Fill rep_weights table with all existing representatives and their vote weight
|
||||
void nano::store::lmdb::component::upgrade_v22_to_v23 (store::write_transaction const & transaction_a)
|
||||
void nano::store::lmdb::component::upgrade_v22_to_v23 (store::write_transaction & transaction)
|
||||
{
|
||||
logger.info (nano::log::type::lmdb, "Upgrading database from v22 to v23...");
|
||||
auto i{ make_iterator<nano::account, nano::account_info_v22> (transaction_a, tables::accounts) };
|
||||
auto end{ store::iterator<nano::account, nano::account_info_v22> (nullptr) };
|
||||
uint64_t processed_accounts = 0;
|
||||
for (; i != end; ++i)
|
||||
{
|
||||
if (!i->second.balance.is_zero ())
|
||||
|
||||
drop (transaction, tables::rep_weights);
|
||||
transaction.refresh ();
|
||||
|
||||
release_assert (rep_weight.begin (tx_begin_read ()) == rep_weight.end (), "rep weights table must be empty before upgrading to v23");
|
||||
|
||||
auto iterate_accounts = [this] (auto && func) {
|
||||
auto transaction = tx_begin_read ();
|
||||
|
||||
// Manually create v22 compatible iterator to read accounts
|
||||
auto it = make_iterator<nano::account, nano::account_info_v22> (transaction, tables::accounts);
|
||||
auto const end = store::iterator<nano::account, nano::account_info_v22> (nullptr);
|
||||
|
||||
for (; it != end; ++it)
|
||||
{
|
||||
auto const & account = it->first;
|
||||
auto const & account_info = it->second;
|
||||
|
||||
func (account, account_info);
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: Make this smaller in dev builds
|
||||
const size_t batch_size = 250000;
|
||||
|
||||
size_t processed = 0;
|
||||
iterate_accounts ([this, &transaction, &processed] (nano::account const & account, nano::account_info_v22 const & account_info) {
|
||||
if (!account_info.balance.is_zero ())
|
||||
{
|
||||
nano::uint128_t total{ 0 };
|
||||
nano::store::lmdb::db_val value;
|
||||
auto status = get (transaction_a, tables::rep_weights, i->second.representative, value);
|
||||
auto status = get (transaction, tables::rep_weights, account_info.representative, value);
|
||||
if (success (status))
|
||||
{
|
||||
total = nano::amount{ value }.number ();
|
||||
}
|
||||
total += i->second.balance.number ();
|
||||
status = put (transaction_a, tables::rep_weights, i->second.representative, nano::amount{ total });
|
||||
total += account_info.balance.number ();
|
||||
status = put (transaction, tables::rep_weights, account_info.representative, nano::amount{ total });
|
||||
release_assert_success (status);
|
||||
}
|
||||
processed_accounts++;
|
||||
if (processed_accounts % 250000 == 0)
|
||||
|
||||
processed++;
|
||||
if (processed % batch_size == 0)
|
||||
{
|
||||
logger.info (nano::log::type::lmdb, "Processed {} accounts", processed_accounts);
|
||||
logger.info (nano::log::type::lmdb, "Processed {} accounts", processed);
|
||||
transaction.refresh (); // Refresh to prevent excessive memory usage
|
||||
}
|
||||
}
|
||||
logger.info (nano::log::type::lmdb, "Processed {} accounts", processed_accounts);
|
||||
version.put (transaction_a, 23);
|
||||
});
|
||||
|
||||
logger.info (nano::log::type::lmdb, "Done processing {} accounts", processed);
|
||||
version.put (transaction, 23);
|
||||
|
||||
logger.info (nano::log::type::lmdb, "Upgrading database from v22 to v23 completed");
|
||||
}
|
||||
|
||||
void nano::store::lmdb::component::upgrade_v23_to_v24 (store::write_transaction const & transaction_a)
|
||||
void nano::store::lmdb::component::upgrade_v23_to_v24 (store::write_transaction & transaction)
|
||||
{
|
||||
logger.info (nano::log::type::lmdb, "Upgrading database from v23 to v24...");
|
||||
|
||||
MDB_dbi frontiers_handle{ 0 };
|
||||
release_assert (!mdb_dbi_open (env.tx (transaction_a), "frontiers", MDB_CREATE, &frontiers_handle));
|
||||
release_assert (!mdb_drop (env.tx (transaction_a), frontiers_handle, 1)); // del = 1, to delete it from the environment and close the DB handle.
|
||||
version.put (transaction_a, 24);
|
||||
release_assert (!mdb_dbi_open (env.tx (transaction), "frontiers", MDB_CREATE, &frontiers_handle));
|
||||
release_assert (!mdb_drop (env.tx (transaction), frontiers_handle, 1)); // del = 1, to delete it from the environment and close the DB handle.
|
||||
version.put (transaction, 24);
|
||||
logger.info (nano::log::type::lmdb, "Upgrading database from v23 to v24 completed");
|
||||
}
|
||||
|
||||
|
|
|
@ -112,9 +112,9 @@ public:
|
|||
|
||||
private:
|
||||
bool do_upgrades (store::write_transaction &, nano::ledger_constants & constants, bool &);
|
||||
void upgrade_v21_to_v22 (store::write_transaction const &);
|
||||
void upgrade_v22_to_v23 (store::write_transaction const &);
|
||||
void upgrade_v23_to_v24 (store::write_transaction const &);
|
||||
void upgrade_v21_to_v22 (store::write_transaction &);
|
||||
void upgrade_v22_to_v23 (store::write_transaction &);
|
||||
void upgrade_v23_to_v24 (store::write_transaction &);
|
||||
|
||||
void open_databases (bool &, store::transaction const &, unsigned);
|
||||
|
||||
|
|
|
@ -33,7 +33,8 @@ class iterator : public iterator_impl<T, U>
|
|||
public:
|
||||
iterator () = default;
|
||||
|
||||
iterator (::rocksdb::DB * db, store::transaction const & transaction_a, ::rocksdb::ColumnFamilyHandle * handle_a, db_val const * val_a, bool const direction_asc)
|
||||
iterator (::rocksdb::DB * db, store::transaction const & transaction_a, ::rocksdb::ColumnFamilyHandle * handle_a, db_val const * val_a, bool const direction_asc) :
|
||||
nano::store::iterator_impl<T, U> (transaction_a)
|
||||
{
|
||||
// Don't fill the block cache for any blocks read as a result of an iterator
|
||||
if (is_read (transaction_a))
|
||||
|
|
|
@ -210,10 +210,10 @@ void nano::store::rocksdb::component::open (bool & error_a, std::filesystem::pat
|
|||
error_a |= !s.ok ();
|
||||
}
|
||||
|
||||
bool nano::store::rocksdb::component::do_upgrades (store::write_transaction const & transaction_a)
|
||||
bool nano::store::rocksdb::component::do_upgrades (store::write_transaction & transaction)
|
||||
{
|
||||
bool error_l{ false };
|
||||
auto version_l = version.get (transaction_a);
|
||||
auto version_l = version.get (transaction);
|
||||
switch (version_l)
|
||||
{
|
||||
case 1:
|
||||
|
@ -240,13 +240,13 @@ bool nano::store::rocksdb::component::do_upgrades (store::write_transaction cons
|
|||
case 19:
|
||||
case 20:
|
||||
case 21:
|
||||
upgrade_v21_to_v22 (transaction_a);
|
||||
upgrade_v21_to_v22 (transaction);
|
||||
[[fallthrough]];
|
||||
case 22:
|
||||
upgrade_v22_to_v23 (transaction_a);
|
||||
upgrade_v22_to_v23 (transaction);
|
||||
[[fallthrough]];
|
||||
case 23:
|
||||
upgrade_v23_to_v24 (transaction_a);
|
||||
upgrade_v23_to_v24 (transaction);
|
||||
[[fallthrough]];
|
||||
case 24:
|
||||
break;
|
||||
|
@ -258,7 +258,7 @@ bool nano::store::rocksdb::component::do_upgrades (store::write_transaction cons
|
|||
return error_l;
|
||||
}
|
||||
|
||||
void nano::store::rocksdb::component::upgrade_v21_to_v22 (store::write_transaction const & transaction_a)
|
||||
void nano::store::rocksdb::component::upgrade_v21_to_v22 (store::write_transaction & transaction)
|
||||
{
|
||||
logger.info (nano::log::type::rocksdb, "Upgrading database from v21 to v22...");
|
||||
|
||||
|
@ -279,64 +279,106 @@ void nano::store::rocksdb::component::upgrade_v21_to_v22 (store::write_transacti
|
|||
logger.debug (nano::log::type::rocksdb, "Finished removing unchecked table");
|
||||
}
|
||||
|
||||
version.put (transaction_a, 22);
|
||||
version.put (transaction, 22);
|
||||
|
||||
logger.info (nano::log::type::rocksdb, "Upgrading database from v21 to v22 completed");
|
||||
}
|
||||
|
||||
// Fill rep_weights table with all existing representatives and their vote weight
|
||||
void nano::store::rocksdb::component::upgrade_v22_to_v23 (store::write_transaction const & transaction_a)
|
||||
void nano::store::rocksdb::component::upgrade_v22_to_v23 (store::write_transaction & transaction)
|
||||
{
|
||||
logger.info (nano::log::type::rocksdb, "Upgrading database from v22 to v23...");
|
||||
|
||||
if (!column_family_exists ("rep_weights"))
|
||||
if (column_family_exists ("rep_weights"))
|
||||
{
|
||||
logger.info (nano::log::type::rocksdb, "Dropping existing rep_weights table");
|
||||
auto const rep_weights_handle = get_column_family ("rep_weights");
|
||||
db->DropColumnFamily (rep_weights_handle);
|
||||
db->DestroyColumnFamilyHandle (rep_weights_handle);
|
||||
std::erase_if (handles, [rep_weights_handle] (auto & handle) {
|
||||
if (handle.get () == rep_weights_handle)
|
||||
{
|
||||
// The handle resource is deleted by RocksDB.
|
||||
[[maybe_unused]] auto ptr = handle.release ();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
transaction.refresh ();
|
||||
}
|
||||
|
||||
{
|
||||
logger.info (nano::log::type::rocksdb, "Creating table rep_weights");
|
||||
::rocksdb::ColumnFamilyOptions new_cf_options;
|
||||
::rocksdb::ColumnFamilyHandle * new_cf_handle;
|
||||
::rocksdb::Status status = db->CreateColumnFamily (new_cf_options, "rep_weights", &new_cf_handle);
|
||||
release_assert (success (status.code ()));
|
||||
handles.emplace_back (new_cf_handle);
|
||||
transaction.refresh ();
|
||||
}
|
||||
auto i{ make_iterator<nano::account, nano::account_info_v22> (transaction_a, tables::accounts) };
|
||||
auto end{ store::iterator<nano::account, nano::account_info_v22> (nullptr) };
|
||||
uint64_t processed_accounts = 0;
|
||||
for (; i != end; ++i)
|
||||
{
|
||||
if (!i->second.balance.is_zero ())
|
||||
|
||||
release_assert (rep_weight.begin (tx_begin_read ()) == rep_weight.end (), "rep weights table must be empty before upgrading to v23");
|
||||
|
||||
auto iterate_accounts = [this] (auto && func) {
|
||||
auto transaction = tx_begin_read ();
|
||||
|
||||
// Manually create v22 compatible iterator to read accounts
|
||||
auto it = make_iterator<nano::account, nano::account_info_v22> (transaction, tables::accounts);
|
||||
auto const end = store::iterator<nano::account, nano::account_info_v22> (nullptr);
|
||||
|
||||
for (; it != end; ++it)
|
||||
{
|
||||
auto const & account = it->first;
|
||||
auto const & account_info = it->second;
|
||||
|
||||
func (account, account_info);
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: Make this smaller in dev builds
|
||||
const size_t batch_size = 250000;
|
||||
|
||||
size_t processed = 0;
|
||||
iterate_accounts ([this, &transaction, &processed] (nano::account const & account, nano::account_info_v22 const & account_info) {
|
||||
if (!account_info.balance.is_zero ())
|
||||
{
|
||||
nano::uint128_t total{ 0 };
|
||||
nano::store::rocksdb::db_val value;
|
||||
auto status = get (transaction_a, tables::rep_weights, i->second.representative, value);
|
||||
auto status = get (transaction, tables::rep_weights, account_info.representative, value);
|
||||
if (success (status))
|
||||
{
|
||||
total = nano::amount{ value }.number ();
|
||||
}
|
||||
total += i->second.balance.number ();
|
||||
status = put (transaction_a, tables::rep_weights, i->second.representative, nano::amount{ total });
|
||||
total += account_info.balance.number ();
|
||||
status = put (transaction, tables::rep_weights, account_info.representative, nano::amount{ total });
|
||||
release_assert_success (status);
|
||||
}
|
||||
processed_accounts++;
|
||||
if (processed_accounts % 250000 == 0)
|
||||
|
||||
processed++;
|
||||
if (processed % batch_size == 0)
|
||||
{
|
||||
logger.info (nano::log::type::rocksdb, "Processed {} accounts", processed_accounts);
|
||||
logger.info (nano::log::type::rocksdb, "Processed {} accounts", processed);
|
||||
transaction.refresh (); // Refresh to prevent excessive memory usage
|
||||
}
|
||||
}
|
||||
logger.info (nano::log::type::rocksdb, "Processed {} accounts", processed_accounts);
|
||||
version.put (transaction_a, 23);
|
||||
});
|
||||
|
||||
logger.info (nano::log::type::rocksdb, "Done processing {} accounts", processed);
|
||||
version.put (transaction, 23);
|
||||
|
||||
logger.info (nano::log::type::rocksdb, "Upgrading database from v22 to v23 completed");
|
||||
}
|
||||
|
||||
void nano::store::rocksdb::component::upgrade_v23_to_v24 (store::write_transaction const & transaction_a)
|
||||
void nano::store::rocksdb::component::upgrade_v23_to_v24 (store::write_transaction & transaction)
|
||||
{
|
||||
logger.info (nano::log::type::rocksdb, "Upgrading database from v23 to v24...");
|
||||
|
||||
if (column_family_exists ("frontiers"))
|
||||
{
|
||||
auto const unchecked_handle = get_column_family ("frontiers");
|
||||
db->DropColumnFamily (unchecked_handle);
|
||||
db->DestroyColumnFamilyHandle (unchecked_handle);
|
||||
std::erase_if (handles, [unchecked_handle] (auto & handle) {
|
||||
if (handle.get () == unchecked_handle)
|
||||
auto const frontiers_handle = get_column_family ("frontiers");
|
||||
db->DropColumnFamily (frontiers_handle);
|
||||
db->DestroyColumnFamilyHandle (frontiers_handle);
|
||||
std::erase_if (handles, [frontiers_handle] (auto & handle) {
|
||||
if (handle.get () == frontiers_handle)
|
||||
{
|
||||
// The handle resource is deleted by RocksDB.
|
||||
[[maybe_unused]] auto ptr = handle.release ();
|
||||
|
@ -347,7 +389,7 @@ void nano::store::rocksdb::component::upgrade_v23_to_v24 (store::write_transacti
|
|||
logger.debug (nano::log::type::rocksdb, "Finished removing frontiers table");
|
||||
}
|
||||
|
||||
version.put (transaction_a, 24);
|
||||
version.put (transaction, 24);
|
||||
logger.info (nano::log::type::rocksdb, "Upgrading database from v23 to v24 completed");
|
||||
}
|
||||
|
||||
|
|
|
@ -148,10 +148,10 @@ private:
|
|||
|
||||
void open (bool & error_a, std::filesystem::path const & path_a, bool open_read_only_a, ::rocksdb::Options const & options_a, std::vector<::rocksdb::ColumnFamilyDescriptor> column_families);
|
||||
|
||||
bool do_upgrades (store::write_transaction const &);
|
||||
void upgrade_v21_to_v22 (store::write_transaction const &);
|
||||
void upgrade_v22_to_v23 (store::write_transaction const &);
|
||||
void upgrade_v23_to_v24 (store::write_transaction const &);
|
||||
bool do_upgrades (store::write_transaction &);
|
||||
void upgrade_v21_to_v22 (store::write_transaction &);
|
||||
void upgrade_v22_to_v23 (store::write_transaction &);
|
||||
void upgrade_v23_to_v24 (store::write_transaction &);
|
||||
|
||||
void construct_column_family_mutexes ();
|
||||
::rocksdb::Options get_db_options ();
|
||||
|
|
|
@ -29,6 +29,20 @@ nano::store::write_transaction_impl::write_transaction_impl (nano::id_dispenser:
|
|||
{
|
||||
}
|
||||
|
||||
/*
|
||||
* transaction
|
||||
*/
|
||||
|
||||
auto nano::store::transaction::epoch () const -> epoch_t
|
||||
{
|
||||
return current_epoch;
|
||||
}
|
||||
|
||||
std::chrono::steady_clock::time_point nano::store::transaction::timestamp () const
|
||||
{
|
||||
return start;
|
||||
}
|
||||
|
||||
/*
|
||||
* read_transaction
|
||||
*/
|
||||
|
@ -49,24 +63,26 @@ nano::id_dispenser::id_t nano::store::read_transaction::store_id () const
|
|||
return impl->store_id;
|
||||
}
|
||||
|
||||
void nano::store::read_transaction::reset () const
|
||||
void nano::store::read_transaction::reset ()
|
||||
{
|
||||
++current_epoch;
|
||||
impl->reset ();
|
||||
}
|
||||
|
||||
void nano::store::read_transaction::renew () const
|
||||
void nano::store::read_transaction::renew ()
|
||||
{
|
||||
++current_epoch;
|
||||
impl->renew ();
|
||||
start = std::chrono::steady_clock::now ();
|
||||
}
|
||||
|
||||
void nano::store::read_transaction::refresh () const
|
||||
void nano::store::read_transaction::refresh ()
|
||||
{
|
||||
reset ();
|
||||
renew ();
|
||||
}
|
||||
|
||||
void nano::store::read_transaction::refresh_if_needed (std::chrono::milliseconds max_age) const
|
||||
void nano::store::read_transaction::refresh_if_needed (std::chrono::milliseconds max_age)
|
||||
{
|
||||
auto now = std::chrono::steady_clock::now ();
|
||||
if (now - start > max_age)
|
||||
|
@ -102,11 +118,13 @@ nano::id_dispenser::id_t nano::store::write_transaction::store_id () const
|
|||
|
||||
void nano::store::write_transaction::commit ()
|
||||
{
|
||||
++current_epoch;
|
||||
impl->commit ();
|
||||
}
|
||||
|
||||
void nano::store::write_transaction::renew ()
|
||||
{
|
||||
++current_epoch;
|
||||
impl->renew ();
|
||||
start = std::chrono::steady_clock::now ();
|
||||
}
|
||||
|
|
|
@ -36,10 +36,20 @@ public:
|
|||
|
||||
class transaction
|
||||
{
|
||||
public:
|
||||
using epoch_t = size_t;
|
||||
|
||||
public:
|
||||
virtual ~transaction () = default;
|
||||
virtual void * get_handle () const = 0;
|
||||
virtual nano::id_dispenser::id_t store_id () const = 0;
|
||||
|
||||
epoch_t epoch () const;
|
||||
std::chrono::steady_clock::time_point timestamp () const;
|
||||
|
||||
protected:
|
||||
epoch_t current_epoch{ 0 };
|
||||
std::chrono::steady_clock::time_point start{};
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -53,14 +63,13 @@ public:
|
|||
void * get_handle () const override;
|
||||
nano::id_dispenser::id_t store_id () const override;
|
||||
|
||||
void reset () const;
|
||||
void renew () const;
|
||||
void refresh () const;
|
||||
void refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 }) const;
|
||||
void reset ();
|
||||
void renew ();
|
||||
void refresh ();
|
||||
void refresh_if_needed (std::chrono::milliseconds max_age = std::chrono::milliseconds{ 500 });
|
||||
|
||||
private:
|
||||
std::unique_ptr<read_transaction_impl> impl;
|
||||
mutable std::chrono::steady_clock::time_point start;
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -82,6 +91,5 @@ public:
|
|||
|
||||
private:
|
||||
std::unique_ptr<write_transaction_impl> impl;
|
||||
std::chrono::steady_clock::time_point start;
|
||||
};
|
||||
} // namespace nano::store
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue