Improve ledger cache initialization with multiple threads (#2876)

* Speedup ledger initialization with multiple threads

More complex approach splitting each table for up to 12 threads

* Use nano locks

* std::atomic<T>::value_type is not widely supported

* Use up to 40 threads and scale with 10x hardware threads

* Move parallel traversal methods to blockstore_partial, allowing them to be used in other locations

* Follow LMDB guidelines, a read transaction cannot be used concurrently from multiple threads (Serg review)

* Remove unused template params

* Pass action by const ref

* Add cache consistency test (Serg review)

* Mutable mutex, pass by const ref
This commit is contained in:
Guilherme Lawless 2020-09-14 10:18:34 +01:00 committed by GitHub
commit e2ec2fbf48
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 221 additions and 25 deletions

View file

@ -3283,3 +3283,104 @@ TEST (ledger, block_confirmed)
ledger.store.confirmation_height_put (transaction, nano::genesis_account, height);
ASSERT_TRUE (ledger.block_confirmed (transaction, send1->hash ()));
}
TEST (ledger, cache)
{
nano::logger_mt logger;
auto store = nano::make_store (logger, nano::unique_path ());
ASSERT_TRUE (!store->init_error ());
nano::stat stats;
nano::ledger ledger (*store, stats);
nano::genesis genesis;
store->initialize (store->tx_begin_write (), genesis, ledger.cache);
nano::work_pool pool (std::numeric_limits<unsigned>::max ());
nano::block_builder builder;
size_t const total = 100;
// Check existing ledger (incremental cache update) and reload on a new ledger
for (size_t i (0); i < total; ++i)
{
auto account_count = 1 + i;
auto block_count = 1 + 2 * (i + 1) - 2;
auto cemented_count = 1 + 2 * (i + 1) - 2;
auto genesis_weight = nano::genesis_amount - i;
auto cache_check = [&, i](nano::ledger_cache const & cache_a) {
ASSERT_EQ (account_count, cache_a.account_count);
ASSERT_EQ (block_count, cache_a.block_count);
ASSERT_EQ (cemented_count, cache_a.cemented_count);
ASSERT_EQ (genesis_weight, cache_a.rep_weights.representation_get (nano::genesis_account));
};
nano::keypair key;
auto const latest = ledger.latest (store->tx_begin_read (), nano::genesis_account);
auto send = builder.state ()
.account (nano::genesis_account)
.previous (latest)
.representative (nano::genesis_account)
.balance (nano::genesis_amount - (i + 1))
.link (key.pub)
.sign (nano::dev_genesis_key.prv, nano::dev_genesis_key.pub)
.work (*pool.generate (latest))
.build ();
auto open = builder.state ()
.account (key.pub)
.previous (0)
.representative (key.pub)
.balance (1)
.link (send->hash ())
.sign (key.prv, key.pub)
.work (*pool.generate (key.pub))
.build ();
{
auto transaction (store->tx_begin_write ());
ASSERT_EQ (nano::process_result::progress, ledger.process (transaction, *send).code);
}
++block_count;
--genesis_weight;
cache_check (ledger.cache);
cache_check (nano::ledger (*store, stats).cache);
{
auto transaction (store->tx_begin_write ());
ASSERT_EQ (nano::process_result::progress, ledger.process (transaction, *open).code);
}
++block_count;
++account_count;
cache_check (ledger.cache);
cache_check (nano::ledger (*store, stats).cache);
{
auto transaction (store->tx_begin_write ());
nano::confirmation_height_info height;
ASSERT_FALSE (ledger.store.confirmation_height_get (transaction, nano::genesis_account, height));
++height.height;
height.frontier = send->hash ();
ledger.store.confirmation_height_put (transaction, nano::genesis_account, height);
ASSERT_TRUE (ledger.block_confirmed (transaction, send->hash ()));
++ledger.cache.cemented_count;
}
++cemented_count;
cache_check (ledger.cache);
cache_check (nano::ledger (*store, stats).cache);
{
auto transaction (store->tx_begin_write ());
nano::confirmation_height_info height;
ledger.store.confirmation_height_get (transaction, key.pub, height);
height.height += 1;
height.frontier = open->hash ();
ledger.store.confirmation_height_put (transaction, key.pub, height);
ASSERT_TRUE (ledger.block_confirmed (transaction, open->hash ()));
++ledger.cache.cemented_count;
}
++cemented_count;
cache_check (ledger.cache);
cache_check (nano::ledger (*store, stats).cache);
}
}

View file

@ -30,19 +30,30 @@ void nano::rep_weights::representation_put (nano::account const & account_a, nan
put (account_a, representation_a);
}
nano::uint128_t nano::rep_weights::representation_get (nano::account const & account_a)
nano::uint128_t nano::rep_weights::representation_get (nano::account const & account_a) const
{
nano::lock_guard<std::mutex> lk (mutex);
return get (account_a);
}
/** Makes a copy */
std::unordered_map<nano::account, nano::uint128_t> nano::rep_weights::get_rep_amounts ()
std::unordered_map<nano::account, nano::uint128_t> nano::rep_weights::get_rep_amounts () const
{
nano::lock_guard<std::mutex> guard (mutex);
return rep_amounts;
}
void nano::rep_weights::copy_from (nano::rep_weights & other_a)
{
nano::lock_guard<std::mutex> guard_this (mutex);
nano::lock_guard<std::mutex> guard_other (other_a.mutex);
for (auto const & entry : other_a.rep_amounts)
{
auto prev_amount (get (entry.first));
put (entry.first, prev_amount + entry.second);
}
}
void nano::rep_weights::put (nano::account const & account_a, nano::uint128_union const & representation_a)
{
auto it = rep_amounts.find (account_a);
@ -57,7 +68,7 @@ void nano::rep_weights::put (nano::account const & account_a, nano::uint128_unio
}
}
nano::uint128_t nano::rep_weights::get (nano::account const & account_a)
nano::uint128_t nano::rep_weights::get (nano::account const & account_a) const
{
auto it = rep_amounts.find (account_a);
if (it != rep_amounts.end ())
@ -70,7 +81,7 @@ nano::uint128_t nano::rep_weights::get (nano::account const & account_a)
}
}
std::unique_ptr<nano::container_info_component> nano::collect_container_info (nano::rep_weights & rep_weights, const std::string & name)
std::unique_ptr<nano::container_info_component> nano::collect_container_info (nano::rep_weights const & rep_weights, const std::string & name)
{
size_t rep_amounts_count;

View file

@ -17,18 +17,19 @@ class rep_weights
public:
void representation_add (nano::account const & source_rep_a, nano::uint128_t const & amount_a);
void representation_add_dual (nano::account const & source_rep_1, nano::uint128_t const & amount_1, nano::account const & source_rep_2, nano::uint128_t const & amount_2);
nano::uint128_t representation_get (nano::account const & account_a);
nano::uint128_t representation_get (nano::account const & account_a) const;
void representation_put (nano::account const & account_a, nano::uint128_union const & representation_a);
std::unordered_map<nano::account, nano::uint128_t> get_rep_amounts ();
std::unordered_map<nano::account, nano::uint128_t> get_rep_amounts () const;
void copy_from (rep_weights & other_a);
private:
std::mutex mutex;
mutable std::mutex mutex;
std::unordered_map<nano::account, nano::uint128_t> rep_amounts;
void put (nano::account const & account_a, nano::uint128_union const & representation_a);
nano::uint128_t get (nano::account const & account_a);
nano::uint128_t get (nano::account const & account_a) const;
friend std::unique_ptr<container_info_component> collect_container_info (rep_weights &, const std::string &);
friend std::unique_ptr<container_info_component> collect_container_info (rep_weights const &, const std::string &);
};
std::unique_ptr<container_info_component> collect_container_info (rep_weights &, const std::string &);
std::unique_ptr<container_info_component> collect_container_info (rep_weights const &, const std::string &);
}

View file

@ -682,6 +682,9 @@ public:
virtual nano::store_iterator<nano::block_hash, std::shared_ptr<nano::block>> blocks_begin (nano::transaction const & transaction_a) const = 0;
virtual nano::store_iterator<nano::block_hash, std::shared_ptr<nano::block>> blocks_end () const = 0;
virtual void latest_for_each_par (std::function<void(nano::store_iterator<nano::account, nano::account_info>, nano::store_iterator<nano::account, nano::account_info>)> const &) = 0;
virtual void confirmation_height_for_each_par (std::function<void(nano::store_iterator<nano::account, nano::confirmation_height_info>, nano::store_iterator<nano::account, nano::confirmation_height_info>)> const &) = 0;
virtual uint64_t block_account_height (nano::transaction const & transaction_a, nano::block_hash const & hash_a) const = 0;
virtual std::mutex & get_cache_mutex () = 0;

View file

@ -6,6 +6,14 @@
#include <crypto/cryptopp/words.h>
#include <thread>
namespace
{
template <typename T>
void parallel_traversal (std::function<void(T const &, T const &, bool const)> const & action);
}
namespace nano
{
template <typename Val, typename Derived_Store>
@ -701,6 +709,24 @@ public:
return count (transaction_a, tables::unchecked);
}
void latest_for_each_par (std::function<void(nano::store_iterator<nano::account, nano::account_info>, nano::store_iterator<nano::account, nano::account_info>)> const & action_a) override
{
parallel_traversal<nano::uint256_t> (
[&action_a, this](nano::uint256_t const & start, nano::uint256_t const & end, bool const is_last) {
auto transaction (this->tx_begin_read ());
action_a (this->latest_begin (transaction, start), !is_last ? this->latest_begin (transaction, end) : this->latest_end ());
});
}
void confirmation_height_for_each_par (std::function<void(nano::store_iterator<nano::account, nano::confirmation_height_info>, nano::store_iterator<nano::account, nano::confirmation_height_info>)> const & action_a) override
{
parallel_traversal<nano::uint256_t> (
[&action_a, this](nano::uint256_t const & start, nano::uint256_t const & end, bool const is_last) {
auto transaction (this->tx_begin_read ());
action_a (this->confirmation_height_begin (transaction, start), !is_last ? this->confirmation_height_begin (transaction, end) : this->confirmation_height_end ());
});
}
int const minimum_version{ 14 };
protected:
@ -822,3 +848,31 @@ public:
nano::block_store_partial<Val, Derived_Store> & store;
};
}
namespace
{
template <typename T>
void parallel_traversal (std::function<void(T const &, T const &, bool const)> const & action)
{
// Between 10 and 40 threads, scales well even in low power systems as long as actions are I/O bound
unsigned const thread_count = std::max (10u, std::min (40u, 10 * std::thread::hardware_concurrency ()));
T const value_max{ std::numeric_limits<T>::max () };
T const split = value_max / thread_count;
std::vector<std::thread> threads;
threads.reserve (thread_count);
for (unsigned thread (0); thread < thread_count; ++thread)
{
T const start = thread * split;
T const end = (thread + 1) * split;
bool const is_last = thread == thread_count - 1;
threads.emplace_back ([&action, start, end, is_last] {
action (start, end, is_last);
});
}
for (auto & thread : threads)
{
thread.join ();
}
}
}

View file

@ -741,28 +741,51 @@ epoch_2_started_cb (epoch_2_started_cb_a)
{
if (!store.init_error ())
{
auto transaction = store.tx_begin_read ();
if (generate_cache_a.reps || generate_cache_a.account_count || generate_cache_a.epoch_2 || generate_cache_a.block_count)
{
initialize (generate_cache_a);
}
}
void nano::ledger::initialize (nano::generate_cache const & generate_cache_a)
{
auto transaction = store.tx_begin_read ();
if (generate_cache_a.reps || generate_cache_a.account_count || generate_cache_a.epoch_2 || generate_cache_a.block_count)
{
store.latest_for_each_par (
[this](nano::store_iterator<nano::account, nano::account_info> i, nano::store_iterator<nano::account, nano::account_info> n) {
uint64_t block_count_l{ 0 };
uint64_t account_count_l{ 0 };
decltype (this->cache.rep_weights) rep_weights_l;
bool epoch_2_started_l{ false };
for (auto i (store.latest_begin (transaction)), n (store.latest_end ()); i != n; ++i)
for (; i != n; ++i)
{
nano::account_info const & info (i->second);
cache.rep_weights.representation_add (info.representative, info.balance.number ());
++cache.account_count;
cache.block_count += info.block_count;
block_count_l += info.block_count;
++account_count_l;
rep_weights_l.representation_add (info.representative, info.balance.number ());
epoch_2_started_l = epoch_2_started_l || info.epoch () == nano::epoch::epoch_2;
}
cache.epoch_2_started.store (epoch_2_started_l);
}
if (generate_cache_a.cemented_count)
{
for (auto i (store.confirmation_height_begin (transaction)), n (store.confirmation_height_end ()); i != n; ++i)
if (epoch_2_started_l)
{
cache.cemented_count += i->second.height;
this->cache.epoch_2_started.store (true);
}
}
this->cache.block_count += block_count_l;
this->cache.account_count += account_count_l;
this->cache.rep_weights.copy_from (rep_weights_l);
});
}
if (generate_cache_a.cemented_count)
{
store.confirmation_height_for_each_par (
[this](nano::store_iterator<nano::account, nano::confirmation_height_info> i, nano::store_iterator<nano::account, nano::confirmation_height_info> n) {
uint64_t cemented_count_l (0);
for (; i != n; ++i)
{
cemented_count_l += i->second.height;
}
this->cache.cemented_count += cemented_count_l;
});
}
}

View file

@ -68,6 +68,9 @@ public:
uint64_t bootstrap_weight_max_blocks{ 1 };
std::atomic<bool> check_bootstrap_weights;
std::function<void()> epoch_2_started_cb;
private:
void initialize (nano::generate_cache const &);
};
std::unique_ptr<container_info_component> collect_container_info (ledger & ledger, const std::string & name);