From c72d0749f094ef95a8c2fa09810a55911585ef73 Mon Sep 17 00:00:00 2001 From: clemahieu Date: Sun, 10 Dec 2017 15:47:35 -0600 Subject: [PATCH] Flushing unchecked blocks on a timed interval instead of a fixed number. --- rai/core_test/block_store.cpp | 14 ++++---- rai/core_test/node.cpp | 4 +-- rai/node/node.cpp | 8 ++--- rai/node/node.hpp | 2 +- rai/secure.cpp | 66 +++++++++++++++++------------------ rai/secure.hpp | 12 ++----- 6 files changed, 49 insertions(+), 57 deletions(-) diff --git a/rai/core_test/block_store.cpp b/rai/core_test/block_store.cpp index 00de66f2..821face7 100644 --- a/rai/core_test/block_store.cpp +++ b/rai/core_test/block_store.cpp @@ -269,7 +269,7 @@ TEST (block_store, one_bootstrap) auto block1 (std::make_shared (0, 1, 2, rai::keypair ().prv, 4, 5)); rai::transaction transaction (store.environment, nullptr, true); store.unchecked_put (transaction, block1->hash (), block1); - store.unchecked_cache_flush (transaction); + store.flush (transaction); auto begin (store.unchecked_begin (transaction)); auto end (store.unchecked_end ()); ASSERT_NE (end, begin); @@ -801,7 +801,7 @@ TEST (block_store, upgrade_v6_v7) store.version_put (transaction, 6); auto send1 (std::make_shared (0, 0, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0)); store.unchecked_put (transaction, send1->hash (), send1); - store.unchecked_cache_flush (transaction); + store.flush (transaction); ASSERT_NE (store.unchecked_end (), store.unchecked_begin (transaction)); } bool init (false); @@ -825,7 +825,7 @@ TEST (block_store, change_dupsort) ASSERT_NE (send1->hash (), send2->hash ()); store.unchecked_put (transaction, send1->hash (), send1); store.unchecked_put (transaction, send1->hash (), send2); - store.unchecked_cache_flush (transaction); + store.flush (transaction); { auto iterator1 (store.unchecked_begin (transaction)); ++iterator1; @@ -836,7 +836,7 @@ TEST (block_store, change_dupsort) ASSERT_EQ (0, mdb_dbi_open (transaction, "unchecked", MDB_CREATE | MDB_DUPSORT, &store.unchecked)); store.unchecked_put (transaction, send1->hash (), send1); store.unchecked_put (transaction, send1->hash (), send2); - store.unchecked_cache_flush (transaction); + store.flush (transaction); { auto iterator1 (store.unchecked_begin (transaction)); ++iterator1; @@ -846,7 +846,7 @@ TEST (block_store, change_dupsort) ASSERT_EQ (0, mdb_dbi_open (transaction, "unchecked", MDB_CREATE | MDB_DUPSORT, &store.unchecked)); store.unchecked_put (transaction, send1->hash (), send1); store.unchecked_put (transaction, send1->hash (), send2); - store.unchecked_cache_flush (transaction); + store.flush (transaction); { auto iterator1 (store.unchecked_begin (transaction)); ++iterator1; @@ -875,7 +875,7 @@ TEST (block_store, upgrade_v7_v8) auto send2 (std::make_shared (1, 0, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0)); store.unchecked_put (transaction, send1->hash (), send1); store.unchecked_put (transaction, send1->hash (), send2); - store.unchecked_cache_flush (transaction); + store.flush (transaction); { auto iterator1 (store.unchecked_begin (transaction)); ++iterator1; @@ -897,7 +897,7 @@ TEST (block_store, sequence_flush) auto vote1 (store.vote_generate (transaction, key1.pub, key1.prv, send1)); auto seq2 (store.vote_get (transaction, vote1->account)); ASSERT_EQ (nullptr, seq2); - store.vote_flush (transaction); + store.flush (transaction); auto seq3 (store.vote_get (transaction, vote1->account)); ASSERT_EQ (*seq3, *vote1); } diff --git a/rai/core_test/node.cpp b/rai/core_test/node.cpp index 514e80d3..97966955 100644 --- a/rai/core_test/node.cpp +++ b/rai/core_test/node.cpp @@ -1406,7 +1406,7 @@ TEST (node, vote_replay) } { rai::transaction transaction (system.nodes [0]->store.environment, nullptr, false); - std::lock_guard lock (system.nodes [0]->store.vote_mutex); + std::lock_guard lock (system.nodes [0]->store.cache_mutex); auto vote (system.nodes [0]->store.vote_current (transaction, rai::test_genesis_key.pub)); ASSERT_EQ (nullptr, vote); } @@ -1419,7 +1419,7 @@ TEST (node, vote_replay) { system.poll (); rai::transaction transaction (system.nodes [0]->store.environment, nullptr, false); - std::lock_guard lock (system.nodes [0]->store.vote_mutex); + std::lock_guard lock (system.nodes [0]->store.cache_mutex); auto vote (system.nodes [0]->store.vote_current (transaction, rai::test_genesis_key.pub)); done = vote && (vote->sequence >= 10000); ++iterations; diff --git a/rai/node/node.cpp b/rai/node/node.cpp index 41ba6943..b4aa0fd7 100755 --- a/rai/node/node.cpp +++ b/rai/node/node.cpp @@ -1753,7 +1753,7 @@ void rai::node::start () network.receive (); ongoing_keepalive (); ongoing_bootstrap (); - ongoing_vote_flush (); + ongoing_store_flush (); ongoing_rep_crawl (); bootstrap.start (); backup_wallet (); @@ -1894,18 +1894,18 @@ void rai::node::ongoing_bootstrap () }); } -void rai::node::ongoing_vote_flush () +void rai::node::ongoing_store_flush () { { rai::transaction transaction (store.environment, nullptr, true); - store.vote_flush (transaction); + store.flush (transaction); } std::weak_ptr node_w (shared_from_this ()); alarm.add (std::chrono::system_clock::now () + std::chrono::seconds (5), [node_w] () { if (auto node_l = node_w.lock ()) { - node_l->ongoing_vote_flush (); + node_l->ongoing_store_flush (); } }); } diff --git a/rai/node/node.hpp b/rai/node/node.hpp index f5d9efd1..1f671d21 100644 --- a/rai/node/node.hpp +++ b/rai/node/node.hpp @@ -483,7 +483,7 @@ public: void ongoing_keepalive (); void ongoing_rep_crawl (); void ongoing_bootstrap (); - void ongoing_vote_flush (); + void ongoing_store_flush (); void backup_wallet (); int price (rai::uint128_t const &, int); void generate_work (rai::block &); diff --git a/rai/secure.cpp b/rai/secure.cpp index e6f11812..1fa41de1 100755 --- a/rai/secure.cpp +++ b/rai/secure.cpp @@ -2488,34 +2488,19 @@ void rai::block_store::unchecked_clear (MDB_txn * transaction_a) void rai::block_store::unchecked_put (MDB_txn * transaction_a, rai::block_hash const & hash_a, std::shared_ptr const & block_a) { + std::lock_guard lock (cache_mutex); unchecked_cache.insert (std::make_pair (hash_a, block_a)); - if (unchecked_cache.size () > unchecked_cache_max) - { - unchecked_cache_flush (transaction_a); - } -} - -void rai::block_store::unchecked_cache_flush (MDB_txn * transaction_a) -{ - for (auto &i: unchecked_cache) - { - std::vector vector; - { - rai::vectorstream stream (vector); - rai::serialize_block (stream, *i.second); - } - auto status (mdb_put (transaction_a, unchecked, i.first.val (), rai::mdb_val (vector.size (), vector.data ()), 0)); - assert (status == 0); - } - unchecked_cache.clear (); } std::vector > rai::block_store::unchecked_get (MDB_txn * transaction_a, rai::block_hash const & hash_a) { std::vector > result; - for (auto i (unchecked_cache.find (hash_a)), n (unchecked_cache.end ()); i != n && i->first == hash_a; ++i) { - result.push_back (i->second); + std::lock_guard lock (cache_mutex); + for (auto i (unchecked_cache.find (hash_a)), n (unchecked_cache.end ()); i != n && i->first == hash_a; ++i) + { + result.push_back (i->second); + } } for (auto i (unchecked_begin (transaction_a, hash_a)), n (unchecked_end ()); i != n && rai::block_hash (i->first) == hash_a; i.next_dup ()) { @@ -2527,15 +2512,18 @@ std::vector > rai::block_store::unchecked_get (MDB_ void rai::block_store::unchecked_del (MDB_txn * transaction_a, rai::block_hash const & hash_a, rai::block const & block_a) { - for (auto i (unchecked_cache.find (hash_a)), n (unchecked_cache.end ()); i != n && i->first == hash_a;) { - if (*i->second == block_a) + std::lock_guard lock (cache_mutex); + for (auto i (unchecked_cache.find (hash_a)), n (unchecked_cache.end ()); i != n && i->first == hash_a;) { - i = unchecked_cache.erase (i); - } - else - { - ++i; + if (*i->second == block_a) + { + i = unchecked_cache.erase (i); + } + else + { + ++i; + } } } std::vector vector; @@ -2645,12 +2633,24 @@ void rai::block_store::checksum_del (MDB_txn * transaction_a, uint64_t prefix, u assert (status == 0); } -void rai::block_store::vote_flush (MDB_txn * transaction_a) +void rai::block_store::flush (MDB_txn * transaction_a) { std::unordered_map > sequence_cache_l; + std::unordered_multimap > unchecked_cache_l; { - std::lock_guard lock (vote_mutex); + std::lock_guard lock (cache_mutex); sequence_cache_l.swap (vote_cache); + unchecked_cache_l.swap (unchecked_cache); + } + for (auto &i: unchecked_cache_l) + { + std::vector vector; + { + rai::vectorstream stream (vector); + rai::serialize_block (stream, *i.second); + } + auto status (mdb_put (transaction_a, unchecked, i.first.val (), rai::mdb_val (vector.size (), vector.data ()), 0)); + assert (status == 0); } for (auto i (sequence_cache_l.begin ()), n (sequence_cache_l.end ()); i != n; ++i) { @@ -2690,7 +2690,7 @@ std::shared_ptr rai::block_store::vote_get (MDB_txn * transaction_a, std::shared_ptr rai::block_store::vote_current (MDB_txn * transaction_a, rai::account const & account_a) { - assert (!vote_mutex.try_lock ()); + assert (!cache_mutex.try_lock ()); std::shared_ptr result; auto existing (vote_cache.find (account_a)); if (existing != vote_cache.end ()) @@ -2709,7 +2709,7 @@ std::shared_ptr rai::block_store::vote_generate (MDB_txn * transacti std::shared_ptr result; uint64_t sequence = 0; { - std::lock_guard lock (vote_mutex); + std::lock_guard lock (cache_mutex); result = vote_current (transaction_a, account_a); sequence = (result ? result->sequence : 0) + 1; } @@ -2720,7 +2720,7 @@ std::shared_ptr rai::block_store::vote_generate (MDB_txn * transacti std::shared_ptr rai::block_store::vote_max (MDB_txn * transaction_a, std::shared_ptr vote_a) { - std::lock_guard lock (vote_mutex); + std::lock_guard lock (cache_mutex); auto current (vote_current (transaction_a, vote_a->account)); auto result (vote_a); if (current != nullptr) diff --git a/rai/secure.hpp b/rai/secure.hpp index 3bc251ac..cc378e65 100644 --- a/rai/secure.hpp +++ b/rai/secure.hpp @@ -425,15 +425,7 @@ public: rai::store_iterator unchecked_begin (MDB_txn *, rai::block_hash const &); rai::store_iterator unchecked_end (); size_t unchecked_count (MDB_txn *); - void unchecked_cache_flush (MDB_txn *); std::unordered_multimap > unchecked_cache; - static size_t const unchecked_cache_max = 256; - // IO per unchecked_cache_max profiled with store.unchecked_load - // 1 - 3,600,000 - // 16 - 339,000 - // 128 - 34,000 - // 256 - 16,000 - // 1024 - 4,327 void unsynced_put (MDB_txn *, rai::block_hash const &); void unsynced_del (MDB_txn *, rai::block_hash const &); @@ -455,10 +447,10 @@ public: std::shared_ptr vote_max (MDB_txn *, std::shared_ptr ); // Return latest vote for an account considering the vote cache std::shared_ptr vote_current (MDB_txn *, rai::account const &); - void vote_flush (MDB_txn *); + void flush (MDB_txn *); rai::store_iterator vote_begin (MDB_txn *); rai::store_iterator vote_end (); - std::mutex vote_mutex; + std::mutex cache_mutex; std::unordered_map > vote_cache; void version_put (MDB_txn *, int);