Merge pull request #4780 from pwojcikdev/fix-write-guard
Fix database write guard
This commit is contained in:
commit
2fef78394b
4 changed files with 60 additions and 18 deletions
|
|
@ -5861,3 +5861,38 @@ TEST (ledger_transaction, write_wait_order)
|
|||
// Signal to continue and drop the third transaction
|
||||
latch3.count_down ();
|
||||
}
|
||||
|
||||
TEST (ledger_transaction, multithreaded_interleaving)
|
||||
{
|
||||
nano::test::system system;
|
||||
|
||||
auto ctx = nano::test::ledger_empty ();
|
||||
|
||||
int constexpr num_threads = 2;
|
||||
int constexpr num_iterations = 10;
|
||||
int constexpr num_blocks = 10;
|
||||
|
||||
std::deque<std::thread> threads;
|
||||
for (int i = 0; i < num_threads; ++i)
|
||||
{
|
||||
threads.emplace_back ([&] {
|
||||
for (int n = 0; n < num_iterations; ++n)
|
||||
{
|
||||
auto tx = ctx.ledger ().tx_begin_write (nano::store::writer::testing);
|
||||
for (unsigned k = 0; k < num_blocks; ++k)
|
||||
{
|
||||
ctx.store ().account.put (tx, nano::account{ k }, nano::account_info{});
|
||||
}
|
||||
for (unsigned k = 0; k < num_blocks; ++k)
|
||||
{
|
||||
ctx.store ().account.del (tx, nano::account{ k });
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
for (auto & thread : threads)
|
||||
{
|
||||
thread.join ();
|
||||
}
|
||||
}
|
||||
|
|
@ -29,17 +29,18 @@ public:
|
|||
virtual operator const nano::store::transaction & () const = 0;
|
||||
};
|
||||
|
||||
class write_transaction : public transaction
|
||||
class write_transaction final : public transaction
|
||||
{
|
||||
nano::store::write_guard guard; // Guard should be released after the transaction
|
||||
nano::store::write_transaction txn;
|
||||
nano::store::write_guard guard;
|
||||
std::chrono::steady_clock::time_point start;
|
||||
|
||||
public:
|
||||
explicit write_transaction (nano::store::write_transaction && txn, nano::store::write_guard && guard) noexcept :
|
||||
txn{ std::move (txn) },
|
||||
guard{ std::move (guard) }
|
||||
explicit write_transaction (nano::store::write_transaction && txn_a, nano::store::write_guard && guard_a) noexcept :
|
||||
guard{ std::move (guard_a) },
|
||||
txn{ std::move (txn_a) }
|
||||
{
|
||||
debug_assert (guard.is_owned ());
|
||||
start = std::chrono::steady_clock::now ();
|
||||
}
|
||||
|
||||
|
|
@ -97,7 +98,7 @@ public:
|
|||
}
|
||||
};
|
||||
|
||||
class read_transaction : public transaction
|
||||
class read_transaction final : public transaction
|
||||
{
|
||||
nano::store::read_transaction txn;
|
||||
|
||||
|
|
@ -140,4 +141,4 @@ public:
|
|||
return txn;
|
||||
}
|
||||
};
|
||||
} // namespace nano::secure
|
||||
}
|
||||
|
|
|
|||
|
|
@ -66,7 +66,9 @@ nano::store::write_guard nano::store::write_queue::wait (writer writer)
|
|||
bool nano::store::write_queue::contains (writer writer) const
|
||||
{
|
||||
nano::lock_guard<nano::mutex> guard{ mutex };
|
||||
return std::find (queue.cbegin (), queue.cend (), writer) != queue.cend ();
|
||||
return std::any_of (queue.cbegin (), queue.cend (), [writer] (auto const & item) {
|
||||
return item.first == writer;
|
||||
});
|
||||
}
|
||||
|
||||
void nano::store::write_queue::pop ()
|
||||
|
|
@ -83,17 +85,19 @@ void nano::store::write_queue::acquire (writer writer)
|
|||
{
|
||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||
|
||||
// There should be no duplicates in the queue
|
||||
debug_assert (std::none_of (queue.cbegin (), queue.cend (), [writer] (auto const & item) { return item == writer; }));
|
||||
// There should be no duplicates in the queue (exception is testing)
|
||||
debug_assert (std::none_of (queue.cbegin (), queue.cend (), [writer] (auto const & item) {
|
||||
return item.first == writer;
|
||||
})
|
||||
|| writer == writer::testing);
|
||||
|
||||
auto const id = next++;
|
||||
|
||||
// Add writer to the end of the queue if it's not already waiting
|
||||
auto exists = std::find (queue.cbegin (), queue.cend (), writer) != queue.cend ();
|
||||
if (!exists)
|
||||
{
|
||||
queue.push_back (writer);
|
||||
}
|
||||
queue.push_back ({ writer, id });
|
||||
|
||||
condition.wait (lock, [&] () { return queue.front () == writer; });
|
||||
// Wait until we are at the front of the queue
|
||||
condition.wait (lock, [&] () { return queue.front ().second == id; });
|
||||
}
|
||||
|
||||
void nano::store::write_queue::release (writer writer)
|
||||
|
|
@ -101,7 +105,7 @@ void nano::store::write_queue::release (writer writer)
|
|||
{
|
||||
nano::lock_guard<nano::mutex> guard{ mutex };
|
||||
release_assert (!queue.empty ());
|
||||
release_assert (queue.front () == writer);
|
||||
release_assert (queue.front ().first == writer);
|
||||
queue.pop_front ();
|
||||
}
|
||||
condition.notify_all ();
|
||||
|
|
|
|||
|
|
@ -70,7 +70,9 @@ private:
|
|||
void release (writer writer);
|
||||
|
||||
private:
|
||||
std::deque<writer> queue;
|
||||
uint64_t next{ 0 };
|
||||
using entry = std::pair<writer, uint64_t>; // uint64_t is a unique id for each write_guard
|
||||
std::deque<entry> queue;
|
||||
mutable nano::mutex mutex;
|
||||
nano::condition_variable condition;
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue