Merge branch 'timed_uncheck_flush'

This commit is contained in:
clemahieu 2017-12-12 03:03:08 -06:00
commit cdb87051b0
6 changed files with 49 additions and 57 deletions

View file

@ -269,7 +269,7 @@ TEST (block_store, one_bootstrap)
auto block1 (std::make_shared <rai::send_block> (0, 1, 2, rai::keypair ().prv, 4, 5));
rai::transaction transaction (store.environment, nullptr, true);
store.unchecked_put (transaction, block1->hash (), block1);
store.unchecked_cache_flush (transaction);
store.flush (transaction);
auto begin (store.unchecked_begin (transaction));
auto end (store.unchecked_end ());
ASSERT_NE (end, begin);
@ -801,7 +801,7 @@ TEST (block_store, upgrade_v6_v7)
store.version_put (transaction, 6);
auto send1 (std::make_shared <rai::send_block> (0, 0, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
store.unchecked_put (transaction, send1->hash (), send1);
store.unchecked_cache_flush (transaction);
store.flush (transaction);
ASSERT_NE (store.unchecked_end (), store.unchecked_begin (transaction));
}
bool init (false);
@ -825,7 +825,7 @@ TEST (block_store, change_dupsort)
ASSERT_NE (send1->hash (), send2->hash ());
store.unchecked_put (transaction, send1->hash (), send1);
store.unchecked_put (transaction, send1->hash (), send2);
store.unchecked_cache_flush (transaction);
store.flush (transaction);
{
auto iterator1 (store.unchecked_begin (transaction));
++iterator1;
@ -836,7 +836,7 @@ TEST (block_store, change_dupsort)
ASSERT_EQ (0, mdb_dbi_open (transaction, "unchecked", MDB_CREATE | MDB_DUPSORT, &store.unchecked));
store.unchecked_put (transaction, send1->hash (), send1);
store.unchecked_put (transaction, send1->hash (), send2);
store.unchecked_cache_flush (transaction);
store.flush (transaction);
{
auto iterator1 (store.unchecked_begin (transaction));
++iterator1;
@ -846,7 +846,7 @@ TEST (block_store, change_dupsort)
ASSERT_EQ (0, mdb_dbi_open (transaction, "unchecked", MDB_CREATE | MDB_DUPSORT, &store.unchecked));
store.unchecked_put (transaction, send1->hash (), send1);
store.unchecked_put (transaction, send1->hash (), send2);
store.unchecked_cache_flush (transaction);
store.flush (transaction);
{
auto iterator1 (store.unchecked_begin (transaction));
++iterator1;
@ -875,7 +875,7 @@ TEST (block_store, upgrade_v7_v8)
auto send2 (std::make_shared <rai::send_block> (1, 0, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
store.unchecked_put (transaction, send1->hash (), send1);
store.unchecked_put (transaction, send1->hash (), send2);
store.unchecked_cache_flush (transaction);
store.flush (transaction);
{
auto iterator1 (store.unchecked_begin (transaction));
++iterator1;
@ -897,7 +897,7 @@ TEST (block_store, sequence_flush)
auto vote1 (store.vote_generate (transaction, key1.pub, key1.prv, send1));
auto seq2 (store.vote_get (transaction, vote1->account));
ASSERT_EQ (nullptr, seq2);
store.vote_flush (transaction);
store.flush (transaction);
auto seq3 (store.vote_get (transaction, vote1->account));
ASSERT_EQ (*seq3, *vote1);
}

View file

@ -1406,7 +1406,7 @@ TEST (node, vote_replay)
}
{
rai::transaction transaction (system.nodes [0]->store.environment, nullptr, false);
std::lock_guard <std::mutex> lock (system.nodes [0]->store.vote_mutex);
std::lock_guard <std::mutex> lock (system.nodes [0]->store.cache_mutex);
auto vote (system.nodes [0]->store.vote_current (transaction, rai::test_genesis_key.pub));
ASSERT_EQ (nullptr, vote);
}
@ -1419,7 +1419,7 @@ TEST (node, vote_replay)
{
system.poll ();
rai::transaction transaction (system.nodes [0]->store.environment, nullptr, false);
std::lock_guard <std::mutex> lock (system.nodes [0]->store.vote_mutex);
std::lock_guard <std::mutex> lock (system.nodes [0]->store.cache_mutex);
auto vote (system.nodes [0]->store.vote_current (transaction, rai::test_genesis_key.pub));
done = vote && (vote->sequence >= 10000);
++iterations;

View file

@ -1753,7 +1753,7 @@ void rai::node::start ()
network.receive ();
ongoing_keepalive ();
ongoing_bootstrap ();
ongoing_vote_flush ();
ongoing_store_flush ();
ongoing_rep_crawl ();
bootstrap.start ();
backup_wallet ();
@ -1894,18 +1894,18 @@ void rai::node::ongoing_bootstrap ()
});
}
void rai::node::ongoing_vote_flush ()
void rai::node::ongoing_store_flush ()
{
{
rai::transaction transaction (store.environment, nullptr, true);
store.vote_flush (transaction);
store.flush (transaction);
}
std::weak_ptr <rai::node> node_w (shared_from_this ());
alarm.add (std::chrono::system_clock::now () + std::chrono::seconds (5), [node_w] ()
{
if (auto node_l = node_w.lock ())
{
node_l->ongoing_vote_flush ();
node_l->ongoing_store_flush ();
}
});
}

View file

@ -483,7 +483,7 @@ public:
void ongoing_keepalive ();
void ongoing_rep_crawl ();
void ongoing_bootstrap ();
void ongoing_vote_flush ();
void ongoing_store_flush ();
void backup_wallet ();
int price (rai::uint128_t const &, int);
void generate_work (rai::block &);

View file

@ -2488,34 +2488,19 @@ void rai::block_store::unchecked_clear (MDB_txn * transaction_a)
void rai::block_store::unchecked_put (MDB_txn * transaction_a, rai::block_hash const & hash_a, std::shared_ptr <rai::block> const & block_a)
{
std::lock_guard <std::mutex> lock (cache_mutex);
unchecked_cache.insert (std::make_pair (hash_a, block_a));
if (unchecked_cache.size () > unchecked_cache_max)
{
unchecked_cache_flush (transaction_a);
}
}
void rai::block_store::unchecked_cache_flush (MDB_txn * transaction_a)
{
for (auto &i: unchecked_cache)
{
std::vector <uint8_t> vector;
{
rai::vectorstream stream (vector);
rai::serialize_block (stream, *i.second);
}
auto status (mdb_put (transaction_a, unchecked, i.first.val (), rai::mdb_val (vector.size (), vector.data ()), 0));
assert (status == 0);
}
unchecked_cache.clear ();
}
std::vector <std::shared_ptr <rai::block>> rai::block_store::unchecked_get (MDB_txn * transaction_a, rai::block_hash const & hash_a)
{
std::vector <std::shared_ptr <rai::block>> result;
for (auto i (unchecked_cache.find (hash_a)), n (unchecked_cache.end ()); i != n && i->first == hash_a; ++i)
{
result.push_back (i->second);
std::lock_guard <std::mutex> lock (cache_mutex);
for (auto i (unchecked_cache.find (hash_a)), n (unchecked_cache.end ()); i != n && i->first == hash_a; ++i)
{
result.push_back (i->second);
}
}
for (auto i (unchecked_begin (transaction_a, hash_a)), n (unchecked_end ()); i != n && rai::block_hash (i->first) == hash_a; i.next_dup ())
{
@ -2527,15 +2512,18 @@ std::vector <std::shared_ptr <rai::block>> rai::block_store::unchecked_get (MDB_
void rai::block_store::unchecked_del (MDB_txn * transaction_a, rai::block_hash const & hash_a, rai::block const & block_a)
{
for (auto i (unchecked_cache.find (hash_a)), n (unchecked_cache.end ()); i != n && i->first == hash_a;)
{
if (*i->second == block_a)
std::lock_guard <std::mutex> lock (cache_mutex);
for (auto i (unchecked_cache.find (hash_a)), n (unchecked_cache.end ()); i != n && i->first == hash_a;)
{
i = unchecked_cache.erase (i);
}
else
{
++i;
if (*i->second == block_a)
{
i = unchecked_cache.erase (i);
}
else
{
++i;
}
}
}
std::vector <uint8_t> vector;
@ -2645,12 +2633,24 @@ void rai::block_store::checksum_del (MDB_txn * transaction_a, uint64_t prefix, u
assert (status == 0);
}
void rai::block_store::vote_flush (MDB_txn * transaction_a)
void rai::block_store::flush (MDB_txn * transaction_a)
{
std::unordered_map <rai::account, std::shared_ptr <rai::vote>> sequence_cache_l;
std::unordered_multimap <rai::block_hash, std::shared_ptr <rai::block>> unchecked_cache_l;
{
std::lock_guard <std::mutex> lock (vote_mutex);
std::lock_guard <std::mutex> lock (cache_mutex);
sequence_cache_l.swap (vote_cache);
unchecked_cache_l.swap (unchecked_cache);
}
for (auto &i: unchecked_cache_l)
{
std::vector <uint8_t> vector;
{
rai::vectorstream stream (vector);
rai::serialize_block (stream, *i.second);
}
auto status (mdb_put (transaction_a, unchecked, i.first.val (), rai::mdb_val (vector.size (), vector.data ()), 0));
assert (status == 0);
}
for (auto i (sequence_cache_l.begin ()), n (sequence_cache_l.end ()); i != n; ++i)
{
@ -2690,7 +2690,7 @@ std::shared_ptr <rai::vote> rai::block_store::vote_get (MDB_txn * transaction_a,
std::shared_ptr <rai::vote> rai::block_store::vote_current (MDB_txn * transaction_a, rai::account const & account_a)
{
assert (!vote_mutex.try_lock ());
assert (!cache_mutex.try_lock ());
std::shared_ptr <rai::vote> result;
auto existing (vote_cache.find (account_a));
if (existing != vote_cache.end ())
@ -2709,7 +2709,7 @@ std::shared_ptr <rai::vote> rai::block_store::vote_generate (MDB_txn * transacti
std::shared_ptr <rai::vote> result;
uint64_t sequence = 0;
{
std::lock_guard <std::mutex> lock (vote_mutex);
std::lock_guard <std::mutex> lock (cache_mutex);
result = vote_current (transaction_a, account_a);
sequence = (result ? result->sequence : 0) + 1;
}
@ -2720,7 +2720,7 @@ std::shared_ptr <rai::vote> rai::block_store::vote_generate (MDB_txn * transacti
std::shared_ptr <rai::vote> rai::block_store::vote_max (MDB_txn * transaction_a, std::shared_ptr <rai::vote> vote_a)
{
std::lock_guard <std::mutex> lock (vote_mutex);
std::lock_guard <std::mutex> lock (cache_mutex);
auto current (vote_current (transaction_a, vote_a->account));
auto result (vote_a);
if (current != nullptr)

View file

@ -425,15 +425,7 @@ public:
rai::store_iterator unchecked_begin (MDB_txn *, rai::block_hash const &);
rai::store_iterator unchecked_end ();
size_t unchecked_count (MDB_txn *);
void unchecked_cache_flush (MDB_txn *);
std::unordered_multimap <rai::block_hash, std::shared_ptr <rai::block>> unchecked_cache;
static size_t const unchecked_cache_max = 256;
// IO per unchecked_cache_max profiled with store.unchecked_load
// 1 - 3,600,000
// 16 - 339,000
// 128 - 34,000
// 256 - 16,000
// 1024 - 4,327
void unsynced_put (MDB_txn *, rai::block_hash const &);
void unsynced_del (MDB_txn *, rai::block_hash const &);
@ -455,10 +447,10 @@ public:
std::shared_ptr <rai::vote> vote_max (MDB_txn *, std::shared_ptr <rai::vote>);
// Return latest vote for an account considering the vote cache
std::shared_ptr <rai::vote> vote_current (MDB_txn *, rai::account const &);
void vote_flush (MDB_txn *);
void flush (MDB_txn *);
rai::store_iterator vote_begin (MDB_txn *);
rai::store_iterator vote_end ();
std::mutex vote_mutex;
std::mutex cache_mutex;
std::unordered_map <rai::account, std::shared_ptr <rai::vote>> vote_cache;
void version_put (MDB_txn *, int);