Moving write_database_queue in to nano::store namespace/library.

This commit is contained in:
Colin LeMahieu 2024-04-04 12:25:43 +02:00
commit 1fda9d2976
No known key found for this signature in database
GPG key ID: 43708520C8DFB938
14 changed files with 51 additions and 48 deletions

View file

@ -18,14 +18,14 @@ using namespace std::chrono_literals;
TEST (confirming_set, construction) TEST (confirming_set, construction)
{ {
auto ctx = nano::test::context::ledger_empty (); auto ctx = nano::test::context::ledger_empty ();
nano::write_database_queue write_queue{ false }; nano::store::write_database_queue write_queue{ false };
nano::confirming_set confirming_set (ctx.ledger (), write_queue); nano::confirming_set confirming_set (ctx.ledger (), write_queue);
} }
TEST (confirming_set, add_exists) TEST (confirming_set, add_exists)
{ {
auto ctx = nano::test::context::ledger_send_receive (); auto ctx = nano::test::context::ledger_send_receive ();
nano::write_database_queue write_queue{ false }; nano::store::write_database_queue write_queue{ false };
nano::confirming_set confirming_set (ctx.ledger (), write_queue); nano::confirming_set confirming_set (ctx.ledger (), write_queue);
auto send = ctx.blocks ()[0]; auto send = ctx.blocks ()[0];
confirming_set.add (send->hash ()); confirming_set.add (send->hash ());
@ -35,7 +35,7 @@ TEST (confirming_set, add_exists)
TEST (confirming_set, process_one) TEST (confirming_set, process_one)
{ {
auto ctx = nano::test::context::ledger_send_receive (); auto ctx = nano::test::context::ledger_send_receive ();
nano::write_database_queue write_queue{ false }; nano::store::write_database_queue write_queue{ false };
nano::confirming_set confirming_set (ctx.ledger (), write_queue); nano::confirming_set confirming_set (ctx.ledger (), write_queue);
std::atomic<int> count = 0; std::atomic<int> count = 0;
std::mutex mutex; std::mutex mutex;
@ -52,7 +52,7 @@ TEST (confirming_set, process_one)
TEST (confirming_set, process_multiple) TEST (confirming_set, process_multiple)
{ {
auto ctx = nano::test::context::ledger_send_receive (); auto ctx = nano::test::context::ledger_send_receive ();
nano::write_database_queue write_queue{ false }; nano::store::write_database_queue write_queue{ false };
nano::confirming_set confirming_set (ctx.ledger (), write_queue); nano::confirming_set confirming_set (ctx.ledger (), write_queue);
std::atomic<int> count = 0; std::atomic<int> count = 0;
std::mutex mutex; std::mutex mutex;
@ -155,7 +155,7 @@ TEST (confirmation_callback, confirmed_history)
ASSERT_TIMELY (5s, election = nano::test::start_election (system, *node, send1->hash ())); ASSERT_TIMELY (5s, election = nano::test::start_election (system, *node, send1->hash ()));
{ {
// The write guard prevents the confirmation height processor doing any writes // The write guard prevents the confirmation height processor doing any writes
auto write_guard = node->write_database_queue.wait (nano::writer::testing); auto write_guard = node->write_database_queue.wait (nano::store::writer::testing);
// Confirm send1 // Confirm send1
election->force_confirm (); election->force_confirm ();
@ -166,13 +166,13 @@ TEST (confirmation_callback, confirmed_history)
auto transaction = node->store.tx_begin_read (); auto transaction = node->store.tx_begin_read ();
ASSERT_FALSE (node->ledger.block_confirmed (transaction, send->hash ())); ASSERT_FALSE (node->ledger.block_confirmed (transaction, send->hash ()));
ASSERT_TIMELY (10s, node->write_database_queue.contains (nano::writer::confirmation_height)); ASSERT_TIMELY (10s, node->write_database_queue.contains (nano::store::writer::confirmation_height));
// Confirm that no inactive callbacks have been called when the confirmation height processor has already iterated over it, waiting to write // Confirm that no inactive callbacks have been called when the confirmation height processor has already iterated over it, waiting to write
ASSERT_EQ (0, node->stats.count (nano::stat::type::confirmation_observer, nano::stat::detail::inactive_conf_height, nano::stat::dir::out)); ASSERT_EQ (0, node->stats.count (nano::stat::type::confirmation_observer, nano::stat::detail::inactive_conf_height, nano::stat::dir::out));
} }
ASSERT_TIMELY (10s, !node->write_database_queue.contains (nano::writer::confirmation_height)); ASSERT_TIMELY (10s, !node->write_database_queue.contains (nano::store::writer::confirmation_height));
auto transaction = node->store.tx_begin_read (); auto transaction = node->store.tx_begin_read ();
ASSERT_TRUE (node->ledger.block_confirmed (transaction, send->hash ())); ASSERT_TRUE (node->ledger.block_confirmed (transaction, send->hash ()));

View file

@ -784,7 +784,7 @@ TEST (ledger_confirm, pruned_source)
nano::stats stats; nano::stats stats;
nano::ledger ledger (*store, stats, nano::dev::constants); nano::ledger ledger (*store, stats, nano::dev::constants);
ledger.pruning = true; ledger.pruning = true;
nano::write_database_queue write_database_queue (false); nano::store::write_database_queue write_database_queue (false);
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () }; nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::keypair key1, key2; nano::keypair key1, key2;
nano::block_builder builder; nano::block_builder builder;
@ -868,7 +868,7 @@ TEST (ledger_confirmDeathTest, rollback_added_block)
ASSERT_TRUE (!store->init_error ()); ASSERT_TRUE (!store->init_error ());
nano::stats stats; nano::stats stats;
nano::ledger ledger (*store, stats, nano::dev::constants); nano::ledger ledger (*store, stats, nano::dev::constants);
nano::write_database_queue write_database_queue (false); nano::store::write_database_queue write_database_queue (false);
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () }; nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::keypair key1; nano::keypair key1;
nano::block_builder builder; nano::block_builder builder;

View file

@ -2740,7 +2740,7 @@ TEST (node, block_processor_half_full)
.work (*node.work_generate_blocking (send2->hash ())) .work (*node.work_generate_blocking (send2->hash ()))
.build (); .build ();
// The write guard prevents block processor doing any writes // The write guard prevents block processor doing any writes
auto write_guard = node.write_database_queue.wait (nano::writer::testing); auto write_guard = node.write_database_queue.wait (nano::store::writer::testing);
node.block_processor.add (send1); node.block_processor.add (send1);
ASSERT_FALSE (node.block_processor.half_full ()); ASSERT_FALSE (node.block_processor.half_full ());
node.block_processor.add (send2); node.block_processor.add (send2);
@ -3097,7 +3097,7 @@ TEST (node, rollback_vote_self)
{ {
// The write guard prevents the block processor from performing the rollback // The write guard prevents the block processor from performing the rollback
auto write_guard = node.write_database_queue.wait (nano::writer::testing); auto write_guard = node.write_database_queue.wait (nano::store::writer::testing);
ASSERT_EQ (0, election->votes_with_weight ().size ()); ASSERT_EQ (0, election->votes_with_weight ().size ());
// Vote with key to switch the winner // Vote with key to switch the winner

View file

@ -180,8 +180,6 @@ add_library(
websocketconfig.cpp websocketconfig.cpp
websocket_stream.hpp websocket_stream.hpp
websocket_stream.cpp websocket_stream.cpp
write_database_queue.hpp
write_database_queue.cpp
messages.hpp messages.hpp
messages.cpp messages.cpp
xorshift.hpp) xorshift.hpp)

View file

@ -37,7 +37,7 @@ void nano::block_processor::context::set_result (result_t const & result)
* block_processor * block_processor
*/ */
nano::block_processor::block_processor (nano::node & node_a, nano::write_database_queue & write_database_queue_a) : nano::block_processor::block_processor (nano::node & node_a, nano::store::write_database_queue & write_database_queue_a) :
config{ node_a.config.block_processor }, config{ node_a.config.block_processor },
node (node_a), node (node_a),
write_database_queue (write_database_queue_a), write_database_queue (write_database_queue_a),
@ -300,7 +300,7 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
{ {
processed_batch_t processed; processed_batch_t processed;
auto scoped_write_guard = write_database_queue.wait (nano::writer::process_batch); auto scoped_write_guard = write_database_queue.wait (nano::store::writer::process_batch);
auto transaction (node.store.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights })); auto transaction (node.store.tx_begin_write ({ tables::accounts, tables::blocks, tables::pending, tables::rep_weights }));
nano::timer<std::chrono::milliseconds> timer_l; nano::timer<std::chrono::milliseconds> timer_l;
@ -509,4 +509,4 @@ nano::error nano::block_processor_config::deserialize (nano::tomlconfig & toml)
toml.get ("priority_local", priority_local); toml.get ("priority_local", priority_local);
return toml.get_error (); return toml.get_error ();
} }

View file

@ -14,12 +14,12 @@ namespace nano
{ {
class block; class block;
class node; class node;
class write_database_queue;
} }
namespace nano::store namespace nano::store
{ {
class write_transaction; class write_transaction;
class write_database_queue;
} }
namespace nano namespace nano
@ -86,7 +86,7 @@ public: // Context
}; };
public: public:
block_processor (nano::node &, nano::write_database_queue &); block_processor (nano::node &, nano::store::write_database_queue &);
~block_processor (); ~block_processor ();
void start (); void start ();
@ -127,7 +127,7 @@ private:
private: // Dependencies private: // Dependencies
block_processor_config const & config; block_processor_config const & config;
nano::node & node; nano::node & node;
nano::write_database_queue & write_database_queue; nano::store::write_database_queue & write_database_queue;
private: private:
nano::fair_queue<context, block_source> queue; nano::fair_queue<context, block_source> queue;

View file

@ -1,10 +1,10 @@
#include <nano/lib/thread_roles.hpp> #include <nano/lib/thread_roles.hpp>
#include <nano/node/confirming_set.hpp> #include <nano/node/confirming_set.hpp>
#include <nano/node/write_database_queue.hpp>
#include <nano/secure/ledger.hpp> #include <nano/secure/ledger.hpp>
#include <nano/store/component.hpp> #include <nano/store/component.hpp>
#include <nano/store/write_database_queue.hpp>
nano::confirming_set::confirming_set (nano::ledger & ledger, nano::write_database_queue & write_queue, std::chrono::milliseconds batch_time) : nano::confirming_set::confirming_set (nano::ledger & ledger, nano::store::write_database_queue & write_queue, std::chrono::milliseconds batch_time) :
ledger{ ledger }, ledger{ ledger },
write_queue{ write_queue }, write_queue{ write_queue },
batch_time{ batch_time } batch_time{ batch_time }
@ -72,7 +72,7 @@ void nano::confirming_set::run ()
for (auto i = processing.begin (), n = processing.end (); !stopped && i != n;) for (auto i = processing.begin (), n = processing.end (); !stopped && i != n;)
{ {
lock.unlock (); // Waiting for db write is potentially slow lock.unlock (); // Waiting for db write is potentially slow
auto guard = write_queue.wait (nano::writer::confirmation_height); auto guard = write_queue.wait (nano::store::writer::confirmation_height);
auto tx = ledger.store.tx_begin_write ({ nano::tables::confirmation_height }); auto tx = ledger.store.tx_begin_write ({ nano::tables::confirmation_height });
lock.lock (); lock.lock ();
// Process items in the back buffer within a single transaction for a limited amount of time // Process items in the back buffer within a single transaction for a limited amount of time

View file

@ -13,6 +13,9 @@ namespace nano
{ {
class block; class block;
class ledger; class ledger;
}
namespace nano::store
{
class write_database_queue; class write_database_queue;
} }
@ -27,7 +30,7 @@ class confirming_set final
friend class confirmation_height_pruned_source_Test; friend class confirmation_height_pruned_source_Test;
public: public:
confirming_set (nano::ledger & ledger, nano::write_database_queue & write_queue, std::chrono::milliseconds batch_time = std::chrono::milliseconds{ 500 }); confirming_set (nano::ledger & ledger, nano::store::write_database_queue & write_queue, std::chrono::milliseconds batch_time = std::chrono::milliseconds{ 500 });
~confirming_set (); ~confirming_set ();
// Adds a block to the set of blocks to be confirmed // Adds a block to the set of blocks to be confirmed
void add (nano::block_hash const & hash); void add (nano::block_hash const & hash);
@ -45,7 +48,7 @@ public:
private: private:
void run (); void run ();
nano::ledger & ledger; nano::ledger & ledger;
nano::write_database_queue & write_queue; nano::store::write_database_queue & write_queue;
std::chrono::milliseconds batch_time; std::chrono::milliseconds batch_time;
std::unordered_set<nano::block_hash> set; std::unordered_set<nano::block_hash> set;
std::unordered_set<nano::block_hash> processing; std::unordered_set<nano::block_hash> processing;

View file

@ -1005,7 +1005,7 @@ void nano::node::ledger_pruning (uint64_t const batch_size_a, bool bootstrap_wei
transaction_write_count = 0; transaction_write_count = 0;
if (!pruning_targets.empty () && !stopped) if (!pruning_targets.empty () && !stopped)
{ {
auto scoped_write_guard = write_database_queue.wait (nano::writer::pruning); auto scoped_write_guard = write_database_queue.wait (nano::store::writer::pruning);
auto write_transaction (store.tx_begin_write ({ tables::blocks, tables::pruned })); auto write_transaction (store.tx_begin_write ({ tables::blocks, tables::pruned }));
while (!pruning_targets.empty () && transaction_write_count < batch_size_a && !stopped) while (!pruning_targets.empty () && transaction_write_count < batch_size_a && !stopped)
{ {

View file

@ -32,8 +32,8 @@
#include <nano/node/vote_processor.hpp> #include <nano/node/vote_processor.hpp>
#include <nano/node/wallet.hpp> #include <nano/node/wallet.hpp>
#include <nano/node/websocket.hpp> #include <nano/node/websocket.hpp>
#include <nano/node/write_database_queue.hpp>
#include <nano/secure/utility.hpp> #include <nano/secure/utility.hpp>
#include <nano/store/write_database_queue.hpp>
#include <boost/program_options.hpp> #include <boost/program_options.hpp>
#include <boost/thread/latch.hpp> #include <boost/thread/latch.hpp>
@ -138,7 +138,7 @@ public:
public: public:
const nano::keypair node_id; const nano::keypair node_id;
nano::write_database_queue write_database_queue; nano::store::write_database_queue write_database_queue;
std::shared_ptr<boost::asio::io_context> io_ctx_shared; std::shared_ptr<boost::asio::io_context> io_ctx_shared;
boost::asio::io_context & io_ctx; boost::asio::io_context & io_ctx;
boost::latch node_initialized_latch; boost::latch node_initialized_latch;

View file

@ -1136,13 +1136,13 @@ TEST (confirmation_height, many_accounts_send_receive_self_no_elections)
ASSERT_TRUE (!store->init_error ()); ASSERT_TRUE (!store->init_error ());
nano::stats stats; nano::stats stats;
nano::ledger ledger (*store, stats, nano::dev::constants); nano::ledger ledger (*store, stats, nano::dev::constants);
nano::write_database_queue write_database_queue (false); nano::store::write_database_queue write_database_queue (false);
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () }; nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
std::atomic<bool> stopped{ false }; std::atomic<bool> stopped{ false };
boost::latch initialized_latch{ 0 }; boost::latch initialized_latch{ 0 };
nano::block_hash block_hash_being_processed{ 0 }; nano::block_hash block_hash_being_processed{ 0 };
nano::write_database_queue write_queue{ false }; nano::store::write_database_queue write_queue{ false };
nano::confirming_set confirming_set{ ledger, write_queue }; nano::confirming_set confirming_set{ ledger, write_queue };
auto const num_accounts = 100000; auto const num_accounts = 100000;

View file

@ -92,7 +92,9 @@ add_library(
rocksdb/version.cpp rocksdb/version.cpp
transaction.cpp transaction.cpp
version.cpp version.cpp
versioning.cpp) versioning.cpp
write_database_queue.hpp
write_database_queue.cpp)
target_link_libraries( target_link_libraries(
nano_store nano_store

View file

@ -1,15 +1,15 @@
#include <nano/lib/config.hpp> #include <nano/lib/config.hpp>
#include <nano/lib/utility.hpp> #include <nano/lib/utility.hpp>
#include <nano/node/write_database_queue.hpp> #include <nano/store/write_database_queue.hpp>
#include <algorithm> #include <algorithm>
nano::write_guard::write_guard (std::function<void ()> guard_finish_callback_a) : nano::store::write_guard::write_guard (std::function<void ()> guard_finish_callback_a) :
guard_finish_callback (guard_finish_callback_a) guard_finish_callback (guard_finish_callback_a)
{ {
} }
nano::write_guard::write_guard (nano::write_guard && write_guard_a) noexcept : nano::store::write_guard::write_guard (write_guard && write_guard_a) noexcept :
guard_finish_callback (std::move (write_guard_a.guard_finish_callback)), guard_finish_callback (std::move (write_guard_a.guard_finish_callback)),
owns (write_guard_a.owns) owns (write_guard_a.owns)
{ {
@ -17,7 +17,7 @@ nano::write_guard::write_guard (nano::write_guard && write_guard_a) noexcept :
write_guard_a.guard_finish_callback = nullptr; write_guard_a.guard_finish_callback = nullptr;
} }
nano::write_guard & nano::write_guard::operator= (nano::write_guard && write_guard_a) noexcept nano::store::write_guard & nano::store::write_guard::operator= (write_guard && write_guard_a) noexcept
{ {
owns = write_guard_a.owns; owns = write_guard_a.owns;
guard_finish_callback = std::move (write_guard_a.guard_finish_callback); guard_finish_callback = std::move (write_guard_a.guard_finish_callback);
@ -27,7 +27,7 @@ nano::write_guard & nano::write_guard::operator= (nano::write_guard && write_gua
return *this; return *this;
} }
nano::write_guard::~write_guard () nano::store::write_guard::~write_guard ()
{ {
if (owns) if (owns)
{ {
@ -35,12 +35,12 @@ nano::write_guard::~write_guard ()
} }
} }
bool nano::write_guard::is_owned () const bool nano::store::write_guard::is_owned () const
{ {
return owns; return owns;
} }
void nano::write_guard::release () void nano::store::write_guard::release ()
{ {
debug_assert (owns); debug_assert (owns);
if (owns) if (owns)
@ -50,7 +50,7 @@ void nano::write_guard::release ()
owns = false; owns = false;
} }
nano::write_database_queue::write_database_queue (bool use_noops_a) : nano::store::write_database_queue::write_database_queue (bool use_noops_a) :
guard_finish_callback ([use_noops_a, &queue = queue, &mutex = mutex, &cv = cv] () { guard_finish_callback ([use_noops_a, &queue = queue, &mutex = mutex, &cv = cv] () {
if (!use_noops_a) if (!use_noops_a)
{ {
@ -65,7 +65,7 @@ nano::write_database_queue::write_database_queue (bool use_noops_a) :
{ {
} }
nano::write_guard nano::write_database_queue::wait (nano::writer writer) nano::store::write_guard nano::store::write_database_queue::wait (writer writer)
{ {
if (use_noops) if (use_noops)
{ {
@ -88,14 +88,14 @@ nano::write_guard nano::write_database_queue::wait (nano::writer writer)
return write_guard (guard_finish_callback); return write_guard (guard_finish_callback);
} }
bool nano::write_database_queue::contains (nano::writer writer) bool nano::store::write_database_queue::contains (writer writer)
{ {
debug_assert (!use_noops); debug_assert (!use_noops);
nano::lock_guard<nano::mutex> guard (mutex); nano::lock_guard<nano::mutex> guard (mutex);
return std::find (queue.cbegin (), queue.cend (), writer) != queue.cend (); return std::find (queue.cbegin (), queue.cend (), writer) != queue.cend ();
} }
bool nano::write_database_queue::process (nano::writer writer) bool nano::store::write_database_queue::process (writer writer)
{ {
if (use_noops) if (use_noops)
{ {
@ -123,7 +123,7 @@ bool nano::write_database_queue::process (nano::writer writer)
return result; return result;
} }
nano::write_guard nano::write_database_queue::pop () nano::store::write_guard nano::store::write_database_queue::pop ()
{ {
return write_guard (guard_finish_callback); return write_guard (guard_finish_callback);
} }

View file

@ -6,7 +6,7 @@
#include <deque> #include <deque>
#include <functional> #include <functional>
namespace nano namespace nano::store
{ {
/** Distinct areas write locking is done, order is irrelevant */ /** Distinct areas write locking is done, order is irrelevant */
enum class writer enum class writer
@ -43,22 +43,22 @@ class write_database_queue final
public: public:
write_database_queue (bool use_noops_a); write_database_queue (bool use_noops_a);
/** Blocks until we are at the head of the queue and blocks other waiters until write_guard goes out of scope */ /** Blocks until we are at the head of the queue and blocks other waiters until write_guard goes out of scope */
[[nodiscard ("write_guard blocks other waiters")]] write_guard wait (nano::writer writer); [[nodiscard ("write_guard blocks other waiters")]] write_guard wait (writer writer);
/** Returns true if this writer is now at the front of the queue */ /** Returns true if this writer is now at the front of the queue */
bool process (nano::writer writer); bool process (writer writer);
/** Returns true if this writer is anywhere in the queue. Currently only used in tests */ /** Returns true if this writer is anywhere in the queue. Currently only used in tests */
bool contains (nano::writer writer); bool contains (writer writer);
/** Doesn't actually pop anything until the returned write_guard is out of scope */ /** Doesn't actually pop anything until the returned write_guard is out of scope */
write_guard pop (); write_guard pop ();
private: private:
std::deque<nano::writer> queue; std::deque<writer> queue;
nano::mutex mutex; nano::mutex mutex;
nano::condition_variable cv; nano::condition_variable cv;
std::function<void ()> guard_finish_callback; std::function<void ()> guard_finish_callback;
bool use_noops; bool use_noops;
}; };
} } // namespace nano::store