[RocksDB] Manually flush memtables if there are a large number of tombstones (#2860)
* [RocksDB] Manually flush memtable if there is a large number of tombstones * Use a map (Gui review comment) * Incrementing twice, add test, replace arrays with map (Gui comments) * Fix tests on CI * Update function name from generate to create * Assert size (gui comment)
This commit is contained in:
parent
f72893dee3
commit
f4147c0352
7 changed files with 196 additions and 71 deletions
|
@ -4,6 +4,7 @@
|
|||
#include <nano/lib/utility.hpp>
|
||||
#include <nano/lib/work.hpp>
|
||||
#include <nano/node/common.hpp>
|
||||
#include <nano/node/testing.hpp>
|
||||
#include <nano/secure/ledger.hpp>
|
||||
#include <nano/secure/utility.hpp>
|
||||
#include <nano/secure/versioning.hpp>
|
||||
|
@ -373,17 +374,17 @@ TEST (bootstrap, simple)
|
|||
TEST (unchecked, multiple)
|
||||
{
|
||||
nano::logger_mt logger;
|
||||
nano::mdb_store store (logger, nano::unique_path ());
|
||||
ASSERT_TRUE (!store.init_error ());
|
||||
auto store = nano::make_store (logger, nano::unique_path ());
|
||||
ASSERT_TRUE (!store->init_error ());
|
||||
auto block1 (std::make_shared<nano::send_block> (4, 1, 2, nano::keypair ().prv, 4, 5));
|
||||
auto transaction (store.tx_begin_write ());
|
||||
auto block2 (store.unchecked_get (transaction, block1->previous ()));
|
||||
auto transaction (store->tx_begin_write ());
|
||||
auto block2 (store->unchecked_get (transaction, block1->previous ()));
|
||||
ASSERT_TRUE (block2.empty ());
|
||||
store.unchecked_put (transaction, block1->previous (), block1);
|
||||
store.unchecked_put (transaction, block1->source (), block1);
|
||||
auto block3 (store.unchecked_get (transaction, block1->previous ()));
|
||||
store->unchecked_put (transaction, block1->previous (), block1);
|
||||
store->unchecked_put (transaction, block1->source (), block1);
|
||||
auto block3 (store->unchecked_get (transaction, block1->previous ()));
|
||||
ASSERT_FALSE (block3.empty ());
|
||||
auto block4 (store.unchecked_get (transaction, block1->source ()));
|
||||
auto block4 (store->unchecked_get (transaction, block1->source ()));
|
||||
ASSERT_FALSE (block4.empty ());
|
||||
}
|
||||
|
||||
|
@ -1747,7 +1748,7 @@ TEST (block_store, confirmation_height)
|
|||
{
|
||||
auto path (nano::unique_path ());
|
||||
nano::logger_mt logger;
|
||||
nano::mdb_store store (logger, path);
|
||||
auto store = nano::make_store (logger, path);
|
||||
|
||||
nano::account account1 (0);
|
||||
nano::account account2 (1);
|
||||
|
@ -1756,35 +1757,35 @@ TEST (block_store, confirmation_height)
|
|||
nano::block_hash cemented_frontier2 (4);
|
||||
nano::block_hash cemented_frontier3 (5);
|
||||
{
|
||||
auto transaction (store.tx_begin_write ());
|
||||
store.confirmation_height_put (transaction, account1, { 500, cemented_frontier1 });
|
||||
store.confirmation_height_put (transaction, account2, { std::numeric_limits<uint64_t>::max (), cemented_frontier2 });
|
||||
store.confirmation_height_put (transaction, account3, { 10, cemented_frontier3 });
|
||||
auto transaction (store->tx_begin_write ());
|
||||
store->confirmation_height_put (transaction, account1, { 500, cemented_frontier1 });
|
||||
store->confirmation_height_put (transaction, account2, { std::numeric_limits<uint64_t>::max (), cemented_frontier2 });
|
||||
store->confirmation_height_put (transaction, account3, { 10, cemented_frontier3 });
|
||||
|
||||
nano::confirmation_height_info confirmation_height_info;
|
||||
ASSERT_FALSE (store.confirmation_height_get (transaction, account1, confirmation_height_info));
|
||||
ASSERT_FALSE (store->confirmation_height_get (transaction, account1, confirmation_height_info));
|
||||
ASSERT_EQ (confirmation_height_info.height, 500);
|
||||
ASSERT_EQ (confirmation_height_info.frontier, cemented_frontier1);
|
||||
ASSERT_FALSE (store.confirmation_height_get (transaction, account2, confirmation_height_info));
|
||||
ASSERT_FALSE (store->confirmation_height_get (transaction, account2, confirmation_height_info));
|
||||
ASSERT_EQ (confirmation_height_info.height, std::numeric_limits<uint64_t>::max ());
|
||||
ASSERT_EQ (confirmation_height_info.frontier, cemented_frontier2);
|
||||
ASSERT_FALSE (store.confirmation_height_get (transaction, account3, confirmation_height_info));
|
||||
ASSERT_FALSE (store->confirmation_height_get (transaction, account3, confirmation_height_info));
|
||||
ASSERT_EQ (confirmation_height_info.height, 10);
|
||||
ASSERT_EQ (confirmation_height_info.frontier, cemented_frontier3);
|
||||
|
||||
// Check cleaning of confirmation heights
|
||||
store.confirmation_height_clear (transaction);
|
||||
store->confirmation_height_clear (transaction);
|
||||
}
|
||||
auto transaction (store.tx_begin_read ());
|
||||
ASSERT_EQ (store.confirmation_height_count (transaction), 3);
|
||||
auto transaction (store->tx_begin_read ());
|
||||
ASSERT_EQ (store->confirmation_height_count (transaction), 3);
|
||||
nano::confirmation_height_info confirmation_height_info;
|
||||
ASSERT_FALSE (store.confirmation_height_get (transaction, account1, confirmation_height_info));
|
||||
ASSERT_FALSE (store->confirmation_height_get (transaction, account1, confirmation_height_info));
|
||||
ASSERT_EQ (confirmation_height_info.height, 0);
|
||||
ASSERT_EQ (confirmation_height_info.frontier, nano::block_hash (0));
|
||||
ASSERT_FALSE (store.confirmation_height_get (transaction, account2, confirmation_height_info));
|
||||
ASSERT_FALSE (store->confirmation_height_get (transaction, account2, confirmation_height_info));
|
||||
ASSERT_EQ (confirmation_height_info.height, 0);
|
||||
ASSERT_EQ (confirmation_height_info.frontier, nano::block_hash (0));
|
||||
ASSERT_FALSE (store.confirmation_height_get (transaction, account3, confirmation_height_info));
|
||||
ASSERT_FALSE (store->confirmation_height_get (transaction, account3, confirmation_height_info));
|
||||
ASSERT_EQ (confirmation_height_info.height, 0);
|
||||
ASSERT_EQ (confirmation_height_info.frontier, nano::block_hash (0));
|
||||
}
|
||||
|
@ -1873,6 +1874,25 @@ TEST (block_store, rocksdb_force_test_env_variable)
|
|||
#endif
|
||||
}
|
||||
|
||||
namespace nano
|
||||
{
|
||||
TEST (rocksdb_block_store, tombstone_count)
|
||||
{
|
||||
if (nano::using_rocksdb_in_tests ())
|
||||
{
|
||||
nano::logger_mt logger;
|
||||
auto store = std::make_unique<nano::rocksdb_store> (logger, nano::unique_path ());
|
||||
ASSERT_TRUE (!store->init_error ());
|
||||
auto transaction = store->tx_begin_write ();
|
||||
auto block1 (std::make_shared<nano::send_block> (0, 1, 2, nano::keypair ().prv, 4, 5));
|
||||
store->unchecked_put (transaction, block1->previous (), block1);
|
||||
ASSERT_EQ (store->tombstone_map.at (nano::tables::unchecked).num_since_last_flush.load (), 0);
|
||||
store->unchecked_del (transaction, nano::unchecked_key (block1->previous (), block1->hash ()));
|
||||
ASSERT_EQ (store->tombstone_map.at (nano::tables::unchecked).num_since_last_flush.load (), 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
void write_sideband_v14 (nano::mdb_store & store_a, nano::transaction & transaction_a, nano::block const & block_a, MDB_dbi db_a)
|
||||
|
|
|
@ -737,18 +737,18 @@ TEST (confirmation_heightDeathTest, rollback_added_block)
|
|||
{
|
||||
nano::logger_mt logger;
|
||||
auto path (nano::unique_path ());
|
||||
nano::mdb_store store (logger, path);
|
||||
ASSERT_TRUE (!store.init_error ());
|
||||
auto store = nano::make_store (logger, path);
|
||||
ASSERT_TRUE (!store->init_error ());
|
||||
nano::genesis genesis;
|
||||
nano::stat stats;
|
||||
nano::ledger ledger (store, stats);
|
||||
nano::ledger ledger (*store, stats);
|
||||
nano::write_database_queue write_database_queue (false);
|
||||
nano::work_pool pool (std::numeric_limits<unsigned>::max ());
|
||||
nano::keypair key1;
|
||||
auto send = std::make_shared<nano::send_block> (genesis.hash (), key1.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *pool.generate (genesis.hash ()));
|
||||
{
|
||||
auto transaction (store.tx_begin_write ());
|
||||
store.initialize (transaction, genesis, ledger.cache);
|
||||
auto transaction (store->tx_begin_write ());
|
||||
store->initialize (transaction, genesis, ledger.cache);
|
||||
}
|
||||
|
||||
auto block_hash_being_processed (send->hash ());
|
||||
|
@ -809,18 +809,18 @@ TEST (confirmation_heightDeathTest, modified_chain)
|
|||
{
|
||||
nano::logger_mt logger;
|
||||
auto path (nano::unique_path ());
|
||||
nano::mdb_store store (logger, path);
|
||||
ASSERT_TRUE (!store.init_error ());
|
||||
auto store = nano::make_store (logger, path);
|
||||
ASSERT_TRUE (!store->init_error ());
|
||||
nano::genesis genesis;
|
||||
nano::stat stats;
|
||||
nano::ledger ledger (store, stats);
|
||||
nano::ledger ledger (*store, stats);
|
||||
nano::write_database_queue write_database_queue (false);
|
||||
nano::work_pool pool (std::numeric_limits<unsigned>::max ());
|
||||
nano::keypair key1;
|
||||
auto send = std::make_shared<nano::send_block> (nano::genesis_hash, key1.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *pool.generate (nano::genesis_hash));
|
||||
{
|
||||
auto transaction (store.tx_begin_write ());
|
||||
store.initialize (transaction, genesis, ledger.cache);
|
||||
auto transaction (store->tx_begin_write ());
|
||||
store->initialize (transaction, genesis, ledger.cache);
|
||||
ASSERT_EQ (nano::process_result::progress, ledger.process (transaction, *send).code);
|
||||
}
|
||||
|
||||
|
@ -837,14 +837,14 @@ TEST (confirmation_heightDeathTest, modified_chain)
|
|||
}
|
||||
|
||||
// Rollback the block and now try to write, the block no longer exists so should bail
|
||||
ledger.rollback (store.tx_begin_write (), send->hash ());
|
||||
ledger.rollback (store->tx_begin_write (), send->hash ());
|
||||
{
|
||||
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
|
||||
ASSERT_DEATH_IF_SUPPORTED (bounded_processor.cement_blocks (scoped_write_guard), "");
|
||||
}
|
||||
|
||||
ASSERT_EQ (nano::process_result::progress, ledger.process (store.tx_begin_write (), *send).code);
|
||||
store.confirmation_height_put (store.tx_begin_write (), nano::genesis_account, { 1, nano::genesis_hash });
|
||||
ASSERT_EQ (nano::process_result::progress, ledger.process (store->tx_begin_write (), *send).code);
|
||||
store->confirmation_height_put (store->tx_begin_write (), nano::genesis_account, { 1, nano::genesis_hash });
|
||||
|
||||
nano::confirmation_height_unbounded unbounded_processor (
|
||||
ledger, write_database_queue, 10ms, logger, stopped, block_hash_being_processed, batch_write_size, [](auto const &) {}, [](auto const &) {}, []() { return 0; });
|
||||
|
@ -856,7 +856,7 @@ TEST (confirmation_heightDeathTest, modified_chain)
|
|||
}
|
||||
|
||||
// Rollback the block and now try to write, the block no longer exists so should bail
|
||||
ledger.rollback (store.tx_begin_write (), send->hash ());
|
||||
ledger.rollback (store->tx_begin_write (), send->hash ());
|
||||
{
|
||||
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
|
||||
ASSERT_DEATH_IF_SUPPORTED (unbounded_processor.cement_blocks (scoped_write_guard), "");
|
||||
|
@ -875,19 +875,19 @@ TEST (confirmation_heightDeathTest, modified_chain_account_removed)
|
|||
{
|
||||
nano::logger_mt logger;
|
||||
auto path (nano::unique_path ());
|
||||
nano::mdb_store store (logger, path);
|
||||
ASSERT_TRUE (!store.init_error ());
|
||||
auto store = nano::make_store (logger, path);
|
||||
ASSERT_TRUE (!store->init_error ());
|
||||
nano::genesis genesis;
|
||||
nano::stat stats;
|
||||
nano::ledger ledger (store, stats);
|
||||
nano::ledger ledger (*store, stats);
|
||||
nano::write_database_queue write_database_queue (false);
|
||||
nano::work_pool pool (std::numeric_limits<unsigned>::max ());
|
||||
nano::keypair key1;
|
||||
auto send = std::make_shared<nano::send_block> (nano::genesis_hash, key1.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *pool.generate (nano::genesis_hash));
|
||||
auto open = std::make_shared<nano::state_block> (key1.pub, 0, 0, nano::Gxrb_ratio, send->hash (), key1.prv, key1.pub, *pool.generate (key1.pub));
|
||||
{
|
||||
auto transaction (store.tx_begin_write ());
|
||||
store.initialize (transaction, genesis, ledger.cache);
|
||||
auto transaction (store->tx_begin_write ());
|
||||
store->initialize (transaction, genesis, ledger.cache);
|
||||
ASSERT_EQ (nano::process_result::progress, ledger.process (transaction, *send).code);
|
||||
ASSERT_EQ (nano::process_result::progress, ledger.process (transaction, *open).code);
|
||||
}
|
||||
|
@ -905,15 +905,15 @@ TEST (confirmation_heightDeathTest, modified_chain_account_removed)
|
|||
}
|
||||
|
||||
// Rollback the block and now try to write, the send should be cemented but the account which the open block belongs no longer exists so should bail
|
||||
ledger.rollback (store.tx_begin_write (), open->hash ());
|
||||
ledger.rollback (store->tx_begin_write (), open->hash ());
|
||||
{
|
||||
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
|
||||
ASSERT_DEATH_IF_SUPPORTED (unbounded_processor.cement_blocks (scoped_write_guard), "");
|
||||
}
|
||||
|
||||
// Reset conditions and test with the bounded processor
|
||||
ASSERT_EQ (nano::process_result::progress, ledger.process (store.tx_begin_write (), *open).code);
|
||||
store.confirmation_height_put (store.tx_begin_write (), nano::genesis_account, { 1, nano::genesis_hash });
|
||||
ASSERT_EQ (nano::process_result::progress, ledger.process (store->tx_begin_write (), *open).code);
|
||||
store->confirmation_height_put (store->tx_begin_write (), nano::genesis_account, { 1, nano::genesis_hash });
|
||||
|
||||
nano::confirmation_height_bounded bounded_processor (
|
||||
ledger, write_database_queue, 10ms, logger, stopped, block_hash_being_processed, batch_write_size, [](auto const &) {}, [](auto const &) {}, []() { return 0; });
|
||||
|
@ -925,7 +925,7 @@ TEST (confirmation_heightDeathTest, modified_chain_account_removed)
|
|||
}
|
||||
|
||||
// Rollback the block and now try to write, the send should be cemented but the account which the open block belongs no longer exists so should bail
|
||||
ledger.rollback (store.tx_begin_write (), open->hash ());
|
||||
ledger.rollback (store->tx_begin_write (), open->hash ());
|
||||
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
|
||||
ASSERT_DEATH_IF_SUPPORTED (bounded_processor.cement_blocks (scoped_write_guard), "");
|
||||
}
|
||||
|
@ -1385,11 +1385,11 @@ TEST (confirmation_height, unbounded_block_cache_iteration)
|
|||
{
|
||||
nano::logger_mt logger;
|
||||
auto path (nano::unique_path ());
|
||||
nano::mdb_store store (logger, path);
|
||||
ASSERT_TRUE (!store.init_error ());
|
||||
auto store = nano::make_store (logger, path);
|
||||
ASSERT_TRUE (!store->init_error ());
|
||||
nano::genesis genesis;
|
||||
nano::stat stats;
|
||||
nano::ledger ledger (store, stats);
|
||||
nano::ledger ledger (*store, stats);
|
||||
nano::write_database_queue write_database_queue (false);
|
||||
boost::latch initialized_latch{ 0 };
|
||||
nano::work_pool pool (std::numeric_limits<unsigned>::max ());
|
||||
|
@ -1397,8 +1397,8 @@ TEST (confirmation_height, unbounded_block_cache_iteration)
|
|||
auto send = std::make_shared<nano::send_block> (genesis.hash (), key1.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *pool.generate (genesis.hash ()));
|
||||
auto send1 = std::make_shared<nano::send_block> (send->hash (), key1.pub, nano::genesis_amount - nano::Gxrb_ratio * 2, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *pool.generate (send->hash ()));
|
||||
{
|
||||
auto transaction (store.tx_begin_write ());
|
||||
store.initialize (transaction, genesis, ledger.cache);
|
||||
auto transaction (store->tx_begin_write ());
|
||||
store->initialize (transaction, genesis, ledger.cache);
|
||||
ASSERT_EQ (nano::process_result::progress, ledger.process (transaction, *send).code);
|
||||
ASSERT_EQ (nano::process_result::progress, ledger.process (transaction, *send1).code);
|
||||
}
|
||||
|
|
|
@ -15,6 +15,26 @@
|
|||
#include <rocksdb/utilities/transaction.h>
|
||||
#include <rocksdb/utilities/transaction_db.h>
|
||||
|
||||
namespace
|
||||
{
|
||||
class event_listener : public rocksdb::EventListener
|
||||
{
|
||||
public:
|
||||
event_listener (std::function<void(rocksdb::FlushJobInfo const &)> const & flush_completed_cb_a) :
|
||||
flush_completed_cb (flush_completed_cb_a)
|
||||
{
|
||||
}
|
||||
|
||||
void OnFlushCompleted (rocksdb::DB * /* db_a */, rocksdb::FlushJobInfo const & flush_info_a) override
|
||||
{
|
||||
flush_completed_cb (flush_info_a);
|
||||
}
|
||||
|
||||
private:
|
||||
std::function<void(rocksdb::FlushJobInfo const &)> flush_completed_cb;
|
||||
};
|
||||
}
|
||||
|
||||
namespace nano
|
||||
{
|
||||
template <>
|
||||
|
@ -44,7 +64,8 @@ void rocksdb_val::convert_buffer_to_value ()
|
|||
|
||||
nano::rocksdb_store::rocksdb_store (nano::logger_mt & logger_a, boost::filesystem::path const & path_a, nano::rocksdb_config const & rocksdb_config_a, bool open_read_only_a) :
|
||||
logger (logger_a),
|
||||
rocksdb_config (rocksdb_config_a)
|
||||
rocksdb_config (rocksdb_config_a),
|
||||
cf_name_table_map (create_cf_name_table_map ())
|
||||
{
|
||||
boost::system::error_code error_mkdir, error_chmod;
|
||||
boost::filesystem::create_directories (path_a, error_mkdir);
|
||||
|
@ -53,8 +74,8 @@ rocksdb_config (rocksdb_config_a)
|
|||
|
||||
if (!error)
|
||||
{
|
||||
auto table_options = get_table_options ();
|
||||
table_factory.reset (rocksdb::NewBlockBasedTableFactory (table_options));
|
||||
generate_tombstone_map ();
|
||||
table_factory.reset (rocksdb::NewBlockBasedTableFactory (get_table_options ()));
|
||||
if (!open_read_only_a)
|
||||
{
|
||||
construct_column_family_mutexes ();
|
||||
|
@ -63,11 +84,28 @@ rocksdb_config (rocksdb_config_a)
|
|||
}
|
||||
}
|
||||
|
||||
std::unordered_map<const char *, nano::tables> nano::rocksdb_store::create_cf_name_table_map () const
|
||||
{
|
||||
std::unordered_map<const char *, nano::tables> map{ { rocksdb::kDefaultColumnFamilyName.c_str (), tables::default_unused },
|
||||
{ "frontiers", tables::frontiers },
|
||||
{ "accounts", tables::accounts },
|
||||
{ "blocks", tables::blocks },
|
||||
{ "pending", tables::pending },
|
||||
{ "unchecked", tables::unchecked },
|
||||
{ "vote", tables::vote },
|
||||
{ "online_weight", tables::online_weight },
|
||||
{ "meta", tables::meta },
|
||||
{ "peers", tables::peers },
|
||||
{ "confirmation_height", tables::confirmation_height } };
|
||||
|
||||
debug_assert (map.size () == all_tables ().size ());
|
||||
return map;
|
||||
}
|
||||
|
||||
void nano::rocksdb_store::open (bool & error_a, boost::filesystem::path const & path_a, bool open_read_only_a)
|
||||
{
|
||||
std::initializer_list<const char *> names{ rocksdb::kDefaultColumnFamilyName.c_str (), "frontiers", "accounts", "blocks", "pending", "unchecked", "vote", "online_weight", "meta", "peers", "confirmation_height" };
|
||||
std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
|
||||
for (const auto & cf_name : names)
|
||||
for (auto & [cf_name, table] : cf_name_table_map)
|
||||
{
|
||||
column_families.emplace_back (cf_name, get_cf_options ());
|
||||
}
|
||||
|
@ -112,6 +150,14 @@ void nano::rocksdb_store::open (bool & error_a, boost::filesystem::path const &
|
|||
}
|
||||
}
|
||||
|
||||
void nano::rocksdb_store::generate_tombstone_map ()
|
||||
{
|
||||
tombstone_map.emplace (std::piecewise_construct, std::forward_as_tuple (nano::tables::unchecked), std::forward_as_tuple (0, 50000));
|
||||
tombstone_map.emplace (std::piecewise_construct, std::forward_as_tuple (nano::tables::blocks), std::forward_as_tuple (0, 25000));
|
||||
tombstone_map.emplace (std::piecewise_construct, std::forward_as_tuple (nano::tables::accounts), std::forward_as_tuple (0, 25000));
|
||||
tombstone_map.emplace (std::piecewise_construct, std::forward_as_tuple (nano::tables::pending), std::forward_as_tuple (0, 25000));
|
||||
}
|
||||
|
||||
nano::write_transaction nano::rocksdb_store::tx_begin_write (std::vector<nano::tables> const & tables_requiring_locks_a, std::vector<nano::tables> const & tables_no_locks_a)
|
||||
{
|
||||
std::unique_ptr<nano::write_rocksdb_txn> txn;
|
||||
|
@ -204,9 +250,30 @@ int nano::rocksdb_store::del (nano::write_transaction const & transaction_a, tab
|
|||
debug_assert (transaction_a.contains (table_a));
|
||||
// RocksDB does not report not_found status, it is a pre-condition that the key exists
|
||||
debug_assert (exists (transaction_a, table_a, key_a));
|
||||
flush_tombstones_check (table_a);
|
||||
return tx (transaction_a)->Delete (table_to_column_family (table_a), key_a).code ();
|
||||
}
|
||||
|
||||
void nano::rocksdb_store::flush_tombstones_check (tables table_a)
|
||||
{
|
||||
// Update the number of deletes for some tables, and force a flush if there are too many tombstones
|
||||
// as it can affect read performance.
|
||||
if (auto it = tombstone_map.find (table_a); it != tombstone_map.end ())
|
||||
{
|
||||
auto & tombstone_info = it->second;
|
||||
if (++tombstone_info.num_since_last_flush > tombstone_info.max)
|
||||
{
|
||||
tombstone_info.num_since_last_flush = 0;
|
||||
flush_table (table_a);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void nano::rocksdb_store::flush_table (nano::tables table_a)
|
||||
{
|
||||
db->Flush (rocksdb::FlushOptions{}, table_to_column_family (table_a));
|
||||
}
|
||||
|
||||
void nano::rocksdb_store::version_put (nano::write_transaction const & transaction_a, int version_a)
|
||||
{
|
||||
debug_assert (transaction_a.contains (tables::meta));
|
||||
|
@ -369,7 +436,7 @@ void nano::rocksdb_store::construct_column_family_mutexes ()
|
|||
}
|
||||
}
|
||||
|
||||
rocksdb::Options nano::rocksdb_store::get_db_options () const
|
||||
rocksdb::Options nano::rocksdb_store::get_db_options ()
|
||||
{
|
||||
rocksdb::Options db_options;
|
||||
db_options.create_if_missing = true;
|
||||
|
@ -388,6 +455,9 @@ rocksdb::Options nano::rocksdb_store::get_db_options () const
|
|||
// Adds a separate write queue for memtable/WAL
|
||||
db_options.enable_pipelined_write = true;
|
||||
|
||||
auto event_listener_l = new event_listener ([this](rocksdb::FlushJobInfo const & flush_job_info_a) { this->on_flush (flush_job_info_a); });
|
||||
db_options.listeners.emplace_back (event_listener_l);
|
||||
|
||||
return db_options;
|
||||
}
|
||||
|
||||
|
@ -443,6 +513,15 @@ rocksdb::ColumnFamilyOptions nano::rocksdb_store::get_cf_options () const
|
|||
return cf_options;
|
||||
}
|
||||
|
||||
void nano::rocksdb_store::on_flush (rocksdb::FlushJobInfo const & flush_job_info_a)
|
||||
{
|
||||
// Reset appropriate tombstone counters
|
||||
if (auto it = tombstone_map.find (cf_name_table_map[flush_job_info_a.cf_name.c_str ()]); it != tombstone_map.end ())
|
||||
{
|
||||
it->second.num_since_last_flush = 0;
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<nano::tables> nano::rocksdb_store::all_tables () const
|
||||
{
|
||||
return std::vector<nano::tables>{ tables::accounts, tables::blocks, tables::confirmation_height, tables::frontiers, tables::meta, tables::online_weight, tables::peers, tables::pending, tables::unchecked, tables::vote };
|
||||
|
@ -575,5 +654,11 @@ void nano::rocksdb_store::serialize_memory_stats (boost::property_tree::ptree &
|
|||
json.put ("block-cache-usage", val);
|
||||
}
|
||||
|
||||
nano::rocksdb_store::tombstone_info::tombstone_info (uint64_t num_since_last_flush_a, uint64_t const max_a) :
|
||||
num_since_last_flush (num_since_last_flush_a),
|
||||
max (max_a)
|
||||
{
|
||||
}
|
||||
|
||||
// Explicitly instantiate
|
||||
template class nano::block_store_partial<rocksdb::Slice, nano::rocksdb_store>;
|
||||
|
|
|
@ -68,6 +68,18 @@ private:
|
|||
std::vector<std::unique_ptr<rocksdb::ColumnFamilyHandle>> handles;
|
||||
std::shared_ptr<rocksdb::TableFactory> table_factory;
|
||||
std::unordered_map<nano::tables, std::mutex> write_lock_mutexes;
|
||||
nano::rocksdb_config rocksdb_config;
|
||||
|
||||
class tombstone_info
|
||||
{
|
||||
public:
|
||||
tombstone_info (uint64_t, uint64_t const);
|
||||
std::atomic<uint64_t> num_since_last_flush;
|
||||
uint64_t const max;
|
||||
};
|
||||
|
||||
std::unordered_map<nano::tables, tombstone_info> tombstone_map;
|
||||
std::unordered_map<const char *, nano::tables> cf_name_table_map;
|
||||
|
||||
rocksdb::Transaction * tx (nano::transaction const & transaction_a) const;
|
||||
std::vector<nano::tables> all_tables () const;
|
||||
|
@ -84,12 +96,19 @@ private:
|
|||
|
||||
rocksdb::ColumnFamilyOptions get_cf_options () const;
|
||||
void construct_column_family_mutexes ();
|
||||
rocksdb::Options get_db_options () const;
|
||||
rocksdb::Options get_db_options ();
|
||||
rocksdb::BlockBasedTableOptions get_table_options () const;
|
||||
nano::rocksdb_config rocksdb_config;
|
||||
|
||||
void on_flush (rocksdb::FlushJobInfo const &);
|
||||
void flush_table (nano::tables table_a);
|
||||
void flush_tombstones_check (nano::tables table_a);
|
||||
void generate_tombstone_map ();
|
||||
std::unordered_map<const char *, nano::tables> create_cf_name_table_map () const;
|
||||
|
||||
constexpr static int base_memtable_size = 16;
|
||||
constexpr static int base_block_cache_size = 16;
|
||||
|
||||
friend class rocksdb_block_store_tombstone_count_Test;
|
||||
};
|
||||
|
||||
extern template class block_store_partial<rocksdb::Slice, rocksdb_store>;
|
||||
|
|
|
@ -475,13 +475,13 @@ TEST (history, short_text)
|
|||
account = system.account (transaction, 0);
|
||||
}
|
||||
auto wallet (std::make_shared<nano_qt::wallet> (*test_application, processor, *system.nodes[0], system.wallet (0), account));
|
||||
nano::mdb_store store (system.nodes[0]->logger, nano::unique_path ());
|
||||
ASSERT_TRUE (!store.init_error ());
|
||||
auto store = nano::make_store (system.nodes[0]->logger, nano::unique_path ());
|
||||
ASSERT_TRUE (!store->init_error ());
|
||||
nano::genesis genesis;
|
||||
nano::ledger ledger (store, system.nodes[0]->stats);
|
||||
nano::ledger ledger (*store, system.nodes[0]->stats);
|
||||
{
|
||||
auto transaction (store.tx_begin_write ());
|
||||
store.initialize (transaction, genesis, ledger.cache);
|
||||
auto transaction (store->tx_begin_write ());
|
||||
store->initialize (transaction, genesis, ledger.cache);
|
||||
nano::keypair key;
|
||||
auto latest (ledger.latest (transaction, nano::dev_genesis_key.pub));
|
||||
nano::send_block send (latest, nano::dev_genesis_key.pub, 0, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (latest));
|
||||
|
|
|
@ -504,6 +504,7 @@ enum class tables
|
|||
accounts,
|
||||
blocks,
|
||||
confirmation_height,
|
||||
default_unused, // RocksDB only
|
||||
frontiers,
|
||||
meta,
|
||||
online_weight,
|
||||
|
|
|
@ -916,11 +916,11 @@ TEST (confirmation_height, many_accounts_send_receive_self_no_elections)
|
|||
{
|
||||
nano::logger_mt logger;
|
||||
auto path (nano::unique_path ());
|
||||
nano::mdb_store store (logger, path);
|
||||
ASSERT_TRUE (!store.init_error ());
|
||||
auto store = nano::make_store (logger, path);
|
||||
ASSERT_TRUE (!store->init_error ());
|
||||
nano::genesis genesis;
|
||||
nano::stat stats;
|
||||
nano::ledger ledger (store, stats);
|
||||
nano::ledger ledger (*store, stats);
|
||||
nano::write_database_queue write_database_queue (false);
|
||||
nano::work_pool pool (std::numeric_limits<unsigned>::max ());
|
||||
std::atomic<bool> stopped{ false };
|
||||
|
@ -938,8 +938,8 @@ TEST (confirmation_height, many_accounts_send_receive_self_no_elections)
|
|||
nano::system system;
|
||||
|
||||
{
|
||||
auto transaction (store.tx_begin_write ());
|
||||
store.initialize (transaction, genesis, ledger.cache);
|
||||
auto transaction (store->tx_begin_write ());
|
||||
store->initialize (transaction, genesis, ledger.cache);
|
||||
|
||||
// Send from genesis account to all other accounts and create open block for them
|
||||
for (auto i = 0; i < num_accounts; ++i)
|
||||
|
@ -972,7 +972,7 @@ TEST (confirmation_height, many_accounts_send_receive_self_no_elections)
|
|||
|
||||
// Now add all send/receive blocks
|
||||
{
|
||||
auto transaction (store.tx_begin_write ());
|
||||
auto transaction (store->tx_begin_write ());
|
||||
for (int i = 0; i < open_blocks.size (); ++i)
|
||||
{
|
||||
auto open_block = open_blocks[i];
|
||||
|
@ -1011,9 +1011,9 @@ TEST (confirmation_height, many_accounts_send_receive_self_no_elections)
|
|||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
|
||||
auto transaction = store.tx_begin_read ();
|
||||
auto transaction = store->tx_begin_read ();
|
||||
auto cemented_count = 0;
|
||||
for (auto i (ledger.store.confirmation_height_begin (transaction)), n (ledger.store.confirmation_height_end ()); i != n; ++i)
|
||||
for (auto i (store->confirmation_height_begin (transaction)), n (store->confirmation_height_end ()); i != n; ++i)
|
||||
{
|
||||
cemented_count += i->second.height;
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue