Reducing IO a bit during bootstrapping by caching an amount of blocks in memory before committing to disk.

This commit is contained in:
clemahieu 2017-07-26 00:45:55 -05:00
commit b374f45f82
5 changed files with 99 additions and 41 deletions

View file

@ -180,16 +180,16 @@ TEST (bootstrap, simple)
bool init (false);
rai::block_store store (init, rai::unique_path ());
ASSERT_TRUE (!init);
rai::send_block block1 (0, 1, 2, rai::keypair ().prv, 4, 5);
auto block1 (std::make_shared <rai::send_block> (0, 1, 2, rai::keypair ().prv, 4, 5));
rai::transaction transaction (store.environment, nullptr, true);
auto block2 (store.unchecked_get (transaction, block1.previous ()));
auto block2 (store.unchecked_get (transaction, block1->previous ()));
ASSERT_TRUE (block2.empty ());
store.unchecked_put (transaction, block1.previous (), block1);
auto block3 (store.unchecked_get (transaction, block1.previous ()));
store.unchecked_put (transaction, block1->previous (), block1);
auto block3 (store.unchecked_get (transaction, block1->previous ()));
ASSERT_FALSE (block3.empty ());
ASSERT_EQ (block1, *block3 [0]);
store.unchecked_del (transaction, block1.previous (), block1);
auto block4 (store.unchecked_get (transaction, block1.previous ()));
ASSERT_EQ (*block1, *block3 [0]);
store.unchecked_del (transaction, block1->previous (), *block1);
auto block4 (store.unchecked_get (transaction, block1->previous ()));
ASSERT_TRUE (block4.empty ());
}
@ -249,16 +249,17 @@ TEST (block_store, one_bootstrap)
bool init (false);
rai::block_store store (init, rai::unique_path ());
ASSERT_TRUE (!init);
rai::send_block block1 (0, 1, 2, rai::keypair ().prv, 4, 5);
auto block1 (std::make_shared <rai::send_block> (0, 1, 2, rai::keypair ().prv, 4, 5));
rai::transaction transaction (store.environment, nullptr, true);
store.unchecked_put (transaction, block1.hash (), block1);
store.unchecked_put (transaction, block1->hash (), block1);
store.unchecked_cache_flush (transaction);
auto begin (store.unchecked_begin (transaction));
auto end (store.unchecked_end ());
ASSERT_NE (end, begin);
auto hash1 (begin->first);
ASSERT_EQ (block1.hash (), hash1);
ASSERT_EQ (block1->hash (), hash1);
auto block2 (rai::deserialize_block (begin->second));
ASSERT_EQ (block1, *block2);
ASSERT_EQ (*block1, *block2);
++begin;
ASSERT_EQ (end, begin);
}
@ -771,8 +772,9 @@ TEST (block_store, upgrade_v6_v7)
rai::genesis genesis;;
genesis.initialize (transaction, store);
store.version_put (transaction, 6);
rai::send_block send1 (0, 0, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0);
store.unchecked_put (transaction, send1.hash (), send1);
auto send1 (std::make_shared <rai::send_block> (0, 0, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
store.unchecked_put (transaction, send1->hash (), send1);
store.unchecked_cache_flush (transaction);
ASSERT_NE (store.unchecked_end (), store.unchecked_begin (transaction));
}
bool init (false);
@ -791,11 +793,12 @@ TEST (block_store, change_dupsort)
rai::transaction transaction (store.environment, nullptr, true);
ASSERT_EQ (0, mdb_drop (transaction, store.unchecked, 1));
ASSERT_EQ (0, mdb_dbi_open (transaction, "unchecked", MDB_CREATE, &store.unchecked));
rai::send_block send1 (0, 0, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0);
rai::send_block send2 (1, 0, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0);
ASSERT_NE (send1.hash (), send2.hash ());
store.unchecked_put (transaction, send1.hash (), send1);
store.unchecked_put (transaction, send1.hash (), send2);
auto send1 (std::make_shared <rai::send_block> (0, 0, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
auto send2 (std::make_shared <rai::send_block> (1, 0, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
ASSERT_NE (send1->hash (), send2->hash ());
store.unchecked_put (transaction, send1->hash (), send1);
store.unchecked_put (transaction, send1->hash (), send2);
store.unchecked_cache_flush (transaction);
{
auto iterator1 (store.unchecked_begin (transaction));
++iterator1;
@ -804,8 +807,9 @@ TEST (block_store, change_dupsort)
ASSERT_EQ (0, mdb_drop (transaction, store.unchecked, 0));
mdb_dbi_close (store.environment, store.unchecked);
ASSERT_EQ (0, mdb_dbi_open (transaction, "unchecked", MDB_CREATE | MDB_DUPSORT, &store.unchecked));
store.unchecked_put (transaction, send1.hash (), send1);
store.unchecked_put (transaction, send1.hash (), send2);
store.unchecked_put (transaction, send1->hash (), send1);
store.unchecked_put (transaction, send1->hash (), send2);
store.unchecked_cache_flush (transaction);
{
auto iterator1 (store.unchecked_begin (transaction));
++iterator1;
@ -813,8 +817,9 @@ TEST (block_store, change_dupsort)
}
ASSERT_EQ (0, mdb_drop (transaction, store.unchecked, 1));
ASSERT_EQ (0, mdb_dbi_open (transaction, "unchecked", MDB_CREATE | MDB_DUPSORT, &store.unchecked));
store.unchecked_put (transaction, send1.hash (), send1);
store.unchecked_put (transaction, send1.hash (), send2);
store.unchecked_put (transaction, send1->hash (), send1);
store.unchecked_put (transaction, send1->hash (), send2);
store.unchecked_cache_flush (transaction);
{
auto iterator1 (store.unchecked_begin (transaction));
++iterator1;
@ -839,10 +844,11 @@ TEST (block_store, upgrade_v7_v8)
rai::block_store store (init, path);
ASSERT_FALSE (init);
rai::transaction transaction (store.environment, nullptr, true);
rai::send_block send1 (0, 0, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0);
rai::send_block send2 (1, 0, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0);
store.unchecked_put (transaction, send1.hash (), send1);
store.unchecked_put (transaction, send1.hash (), send2);
auto send1 (std::make_shared <rai::send_block> (0, 0, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
auto send2 (std::make_shared <rai::send_block> (1, 0, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
store.unchecked_put (transaction, send1->hash (), send1);
store.unchecked_put (transaction, send1->hash (), send2);
store.unchecked_cache_flush (transaction);
{
auto iterator1 (store.unchecked_begin (transaction));
++iterator1;

View file

@ -205,7 +205,7 @@ void rai::network::republish_block (std::shared_ptr <rai::block> block)
{
rebroadcast_reps (block);
auto hash (block->hash ());
auto list (node.peers.list_sqrt ());
auto list (node.peers.list_sqrt ());
// If we're a representative, broadcast a signed confirm, otherwise an unsigned publish
if (!confirm_block (node, list, block))
{
@ -1443,7 +1443,7 @@ rai::process_return rai::node::process_receive_one (MDB_txn * transaction_a, std
{
BOOST_LOG (log) << boost::str (boost::format ("Gap previous for: %1%") % block_a->hash ().to_string ());
}
store.unchecked_put (transaction_a, block_a->previous (), *block_a);
store.unchecked_put (transaction_a, block_a->previous (), block_a);
gap_cache.add (transaction_a, block_a);
break;
}
@ -1453,7 +1453,7 @@ rai::process_return rai::node::process_receive_one (MDB_txn * transaction_a, std
{
BOOST_LOG (log) << boost::str (boost::format ("Gap source for: %1%") % block_a->hash ().to_string ());
}
store.unchecked_put (transaction_a, block_a->source (), *block_a);
store.unchecked_put (transaction_a, block_a->source (), block_a);
gap_cache.add (transaction_a, block_a);
break;
}

View file

@ -2322,20 +2322,37 @@ void rai::block_store::unchecked_clear (MDB_txn * transaction_a)
assert (status == 0);
}
void rai::block_store::unchecked_put (MDB_txn * transaction_a, rai::block_hash const & hash_a, rai::block const & block_a)
void rai::block_store::unchecked_put (MDB_txn * transaction_a, rai::block_hash const & hash_a, std::shared_ptr <rai::block> const & block_a)
{
std::vector <uint8_t> vector;
{
rai::vectorstream stream (vector);
rai::serialize_block (stream, block_a);
}
auto status (mdb_put (transaction_a, unchecked, hash_a.val (), rai::mdb_val (vector.size (), vector.data ()), 0));
assert (status == 0);
unchecked_cache.insert (std::make_pair (hash_a, block_a));
if (unchecked_cache.size () > unchecked_cache_max)
{
unchecked_cache_flush (transaction_a);
}
}
std::vector <std::unique_ptr <rai::block>> rai::block_store::unchecked_get (MDB_txn * transaction_a, rai::block_hash const & hash_a)
void rai::block_store::unchecked_cache_flush (MDB_txn * transaction_a)
{
std::vector <std::unique_ptr <rai::block>> result;
for (auto &i: unchecked_cache)
{
std::vector <uint8_t> vector;
{
rai::vectorstream stream (vector);
rai::serialize_block (stream, *i.second);
}
auto status (mdb_put (transaction_a, unchecked, i.first.val (), rai::mdb_val (vector.size (), vector.data ()), 0));
assert (status == 0);
}
unchecked_cache.clear ();
}
std::vector <std::shared_ptr <rai::block>> rai::block_store::unchecked_get (MDB_txn * transaction_a, rai::block_hash const & hash_a)
{
std::vector <std::shared_ptr <rai::block>> result;
for (auto i (unchecked_cache.begin ()), n (unchecked_cache.end ()); i != n && i->first == hash_a; ++i)
{
result.push_back (i->second);
}
for (auto i (unchecked_begin (transaction_a, hash_a)), n (unchecked_end ()); i != n && rai::block_hash (i->first) == hash_a; i.next_dup ())
{
rai::bufferstream stream (reinterpret_cast <uint8_t const *> (i->second.mv_data), i->second.mv_size);
@ -2346,6 +2363,17 @@ std::vector <std::unique_ptr <rai::block>> rai::block_store::unchecked_get (MDB_
void rai::block_store::unchecked_del (MDB_txn * transaction_a, rai::block_hash const & hash_a, rai::block const & block_a)
{
for (auto i (unchecked_cache.begin ()), n (unchecked_cache.end ()); i != n && i->first == hash_a;)
{
if (*i->second == block_a)
{
i = unchecked_cache.erase (i);
}
else
{
++i;
}
}
std::vector <uint8_t> vector;
{
rai::vectorstream stream (vector);

View file

@ -360,13 +360,22 @@ public:
rai::store_iterator representation_end ();
void unchecked_clear (MDB_txn *);
void unchecked_put (MDB_txn *, rai::block_hash const &, rai::block const &);
std::vector <std::unique_ptr <rai::block>> unchecked_get (MDB_txn *, rai::block_hash const &);
void unchecked_put (MDB_txn *, rai::block_hash const &, std::shared_ptr <rai::block> const &);
std::vector <std::shared_ptr <rai::block>> unchecked_get (MDB_txn *, rai::block_hash const &);
void unchecked_del (MDB_txn *, rai::block_hash const &, rai::block const &);
rai::store_iterator unchecked_begin (MDB_txn *);
rai::store_iterator unchecked_begin (MDB_txn *, rai::block_hash const &);
rai::store_iterator unchecked_end ();
size_t unchecked_count (MDB_txn *);
void unchecked_cache_flush (MDB_txn *);
std::unordered_multimap <rai::block_hash, std::shared_ptr <rai::block>> unchecked_cache;
static size_t const unchecked_cache_max = 256;
// IO per unchecked_cache_max profiled with store.unchecked_load
// 1 - 3,600,000
// 16 - 339,000
// 128 - 34,000
// 256 - 16,000
// 1024 - 4,327
void unsynced_put (MDB_txn *, rai::block_hash const &);
void unsynced_del (MDB_txn *, rai::block_hash const &);

View file

@ -379,3 +379,18 @@ TEST (peer_container, random_set)
auto old_ms (std::chrono::duration_cast <std::chrono::milliseconds> (current - old));
auto new_ms (std::chrono::duration_cast <std::chrono::milliseconds> (end - current));
}
TEST (store, unchecked_load)
{
rai::system system (24000, 1);
auto & node (*system.nodes [0]);
auto block (std::make_shared <rai::send_block> (0, 0, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
for (auto i (0); i < 1000000; ++i)
{
rai::transaction transaction (node.store.environment, nullptr, true);
node.store.unchecked_put (transaction, i, block);
}
rai::transaction transaction (node.store.environment, nullptr, false);
auto count (node.store.unchecked_count(transaction));
std::cerr << count << std::endl;
}