Cache votes in memory to decrease number of IO writes.

This commit is contained in:
clemahieu 2017-07-30 14:59:41 -05:00
commit ae4c82dd70
3 changed files with 74 additions and 25 deletions

View file

@ -1508,6 +1508,7 @@ size_t rai::block_counts::sum ()
}
rai::block_store::block_store (bool & error_a, boost::filesystem::path const & path_a) :
sequence_cache_count (0),
environment (error_a, path_a),
frontiers (0),
accounts (0),
@ -2480,40 +2481,67 @@ void rai::block_store::checksum_del (MDB_txn * transaction_a, uint64_t prefix, u
auto status (mdb_del (transaction_a, checksum, rai::mdb_val (sizeof (key), &key), nullptr));
assert (status == 0);
}
void rai::block_store::sequence_flush (MDB_txn * transaction_a)
{
for (auto i (sequence_cache.begin ()), n (sequence_cache.end ()); i != n; ++i)
{
auto status1 (mdb_put (transaction_a, sequence, i->first.val (), rai::mdb_val (sizeof (i->second), &i->second), 0));
assert (status1 == 0);
}
sequence_cache_count = 0;
sequence_cache.clear ();
}
uint64_t rai::block_store::sequence_current (MDB_txn * transaction_a, rai::account const & account_a)
{
uint64_t result (0);
auto existing (sequence_cache.find (account_a));
if (existing != sequence_cache.end ())
{
result = existing->second;
}
else
{
MDB_val value;
auto status (mdb_get (transaction_a, sequence, account_a.val (), &value));
assert (status == 0 || status == MDB_NOTFOUND);
if (status == 0)
{
rai::bufferstream stream (reinterpret_cast <uint8_t const *> (value.mv_data), value.mv_size);
auto error (rai::read (stream, result));
assert (!error);
}
}
return result;
}
uint64_t rai::block_store::sequence_atomic_inc (MDB_txn * transaction_a, rai::account const & account_a)
{
uint64_t result (0);
MDB_val value;
auto status (mdb_get (transaction_a, sequence, account_a.val (), &value));
assert (status == 0 || status == MDB_NOTFOUND);
if (status == 0)
{
rai::bufferstream stream (reinterpret_cast <uint8_t const *> (value.mv_data), value.mv_size);
auto error (rai::read (stream, result));
assert (!error);
}
auto result (sequence_current (transaction_a, account_a));
result += 1;
auto status1 (mdb_put (transaction_a, sequence, account_a.val (), rai::mdb_val (sizeof (result), &result), 0));
assert (status1 == 0);
sequence_cache [account_a] = result;
++sequence_cache_count;
if (sequence_cache_count > sequence_cache_max)
{
sequence_flush (transaction_a);
}
return result;
}
uint64_t rai::block_store::sequence_atomic_observe (MDB_txn * transaction_a, rai::account const & account_a, uint64_t sequence_a)
{
uint64_t result (0);
MDB_val value;
auto status (mdb_get (transaction_a, sequence, account_a.val (), &value));
assert (status == 0 || status == MDB_NOTFOUND);
if (status == 0)
auto current (sequence_current (transaction_a, account_a));
auto result (std::max (current, sequence_a));
if (sequence_a > current)
{
rai::bufferstream stream (reinterpret_cast <uint8_t const *> (value.mv_data), value.mv_size);
auto error (rai::read (stream, result));
assert (!error);
sequence_cache [account_a] = sequence_a;
++sequence_cache_count;
if (sequence_cache_count > sequence_cache_max)
{
sequence_flush (transaction_a);
}
}
result = std::max (result, sequence_a);
auto status1 (mdb_put (transaction_a, sequence, account_a.val (), rai::mdb_val (sizeof (result), &result), 0));
assert (status1 == 0);
return result;
}

View file

@ -390,6 +390,17 @@ public:
uint64_t sequence_atomic_inc (MDB_txn *, rai::account const &);
uint64_t sequence_atomic_observe (MDB_txn *, rai::account const &, uint64_t);
uint64_t sequence_current (MDB_txn *, rai::account const &);
void sequence_flush (MDB_txn *);
std::unordered_map <rai::account, uint64_t> sequence_cache;
size_t sequence_cache_count;
static size_t const sequence_cache_max = 256;
// IO per sequence_atomic_max profiled with store.vote_load
// 1 - 1,900,000
// 16 - 235,000
// 128 - 27,000
// 256 - 14,000
// 1024 - 3,200
void version_put (MDB_txn *, int);
int version_get (MDB_txn *);

View file

@ -391,6 +391,16 @@ TEST (store, unchecked_load)
node.store.unchecked_put (transaction, i, block);
}
rai::transaction transaction (node.store.environment, nullptr, false);
auto count (node.store.unchecked_count(transaction));
std::cerr << count << std::endl;
auto count (node.store.unchecked_count (transaction));
}
TEST (store, vote_load)
{
rai::system system (24000, 1);
auto & node (*system.nodes [0]);
for (auto i (0); i < 1000000; ++i)
{
rai::transaction transaction (node.store.environment, nullptr, true);
node.store.sequence_atomic_observe (transaction, 0, i);
}
}