Add confirming_set class which will replace confirmation_height_processor

This commit is contained in:
Colin LeMahieu 2024-03-22 18:06:27 +00:00
commit ae31275d0d
No known key found for this signature in database
GPG key ID: 43708520C8DFB938
5 changed files with 248 additions and 0 deletions

View file

@ -14,6 +14,7 @@ add_executable(
cli.cpp
confirmation_height.cpp
confirmation_solicitor.cpp
confirming_set.cpp
conflicts.cpp
difficulty.cpp
distributed_work.cpp

View file

@ -0,0 +1,68 @@
#include <nano/lib/blocks.hpp>
#include <nano/lib/logging.hpp>
#include <nano/node/active_transactions.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/node/election.hpp>
#include <nano/node/make_store.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/test_common/ledger.hpp>
#include <nano/test_common/system.hpp>
#include <nano/test_common/testutil.hpp>
#include <gtest/gtest.h>
#include <latch>
using namespace std::chrono_literals;
TEST (confirming_set, construction)
{
auto ctx = nano::test::context::ledger_empty ();
nano::write_database_queue write_queue{ false };
nano::confirming_set confirming_set (ctx.ledger (), write_queue);
}
TEST (confirming_set, add_exists)
{
auto ctx = nano::test::context::ledger_send_receive ();
nano::write_database_queue write_queue{ false };
nano::confirming_set confirming_set (ctx.ledger (), write_queue);
auto send = ctx.blocks ()[0];
confirming_set.add (send->hash ());
ASSERT_TRUE (confirming_set.exists (send->hash ()));
}
TEST (confirming_set, process_one)
{
auto ctx = nano::test::context::ledger_send_receive ();
nano::write_database_queue write_queue{ false };
nano::confirming_set confirming_set (ctx.ledger (), write_queue);
std::atomic<int> count = 0;
std::mutex mutex;
std::condition_variable condition;
confirming_set.cemented_observers.add ([&] (auto const &) { ++count; condition.notify_all (); });
confirming_set.add (ctx.blocks ()[0]->hash ());
nano::test::start_stop_guard guard{ confirming_set };
std::unique_lock lock{ mutex };
ASSERT_TRUE (condition.wait_for (lock, 5s, [&] () { return count == 1; }));
ASSERT_EQ (1, ctx.stats ().count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (2, ctx.ledger ().cache.cemented_count);
}
TEST (confirming_set, process_multiple)
{
auto ctx = nano::test::context::ledger_send_receive ();
nano::write_database_queue write_queue{ false };
nano::confirming_set confirming_set (ctx.ledger (), write_queue);
std::atomic<int> count = 0;
std::mutex mutex;
std::condition_variable condition;
confirming_set.cemented_observers.add ([&] (auto const &) { ++count; condition.notify_all (); });
confirming_set.add (ctx.blocks ()[0]->hash ());
confirming_set.add (ctx.blocks ()[1]->hash ());
nano::test::start_stop_guard guard{ confirming_set };
std::unique_lock lock{ mutex };
ASSERT_TRUE (condition.wait_for (lock, 5s, [&] () { return count == 2; }));
ASSERT_EQ (2, ctx.stats ().count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (3, ctx.ledger ().cache.cemented_count);
}

View file

@ -59,6 +59,8 @@ add_library(
cli.cpp
common.hpp
common.cpp
confirming_set.hpp
confirming_set.cpp
confirmation_height_bounded.hpp
confirmation_height_bounded.cpp
confirmation_height_processor.hpp

View file

@ -0,0 +1,120 @@
#include <nano/lib/thread_roles.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/node/write_database_queue.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/store/component.hpp>
nano::confirming_set::confirming_set (nano::ledger & ledger, nano::write_database_queue & write_queue, std::chrono::milliseconds batch_time) :
ledger{ ledger },
write_queue{ write_queue },
batch_time{ batch_time }
{
}
nano::confirming_set::~confirming_set ()
{
debug_assert (!thread.joinable ());
}
void nano::confirming_set::add (nano::block_hash const & hash)
{
std::lock_guard lock{ mutex };
set.insert (hash);
condition.notify_all ();
}
void nano::confirming_set::start ()
{
thread = std::thread{ [this] () { run (); } };
}
void nano::confirming_set::stop ()
{
{
std::lock_guard lock{ mutex };
stopped = true;
condition.notify_all ();
}
if (thread.joinable ())
{
thread.join ();
}
}
bool nano::confirming_set::exists (nano::block_hash const & hash) const
{
std::lock_guard lock{ mutex };
return set.count (hash) != 0 || processing.count (hash) != 0;
}
std::size_t nano::confirming_set::size () const
{
std::lock_guard lock{ mutex };
return set.size () + processing.size ();
}
void nano::confirming_set::run ()
{
nano::thread_role::set (nano::thread_role::name::confirmation_height_processing);
std::unique_lock lock{ mutex };
// Run the confirmation loop until stopped
while (!stopped)
{
condition.wait (lock, [&] () { return !set.empty () || stopped; });
// Loop if there are items to process
if (!stopped && !set.empty ())
{
std::deque<std::shared_ptr<nano::block>> cemented;
std::deque<nano::block_hash> already;
// Move items in to back buffer and release lock so more items can be added to the front buffer
processing = std::move (this->set);
// Process all items in the back buffer
for (auto i = processing.begin (), n = processing.end (); !stopped && i != n;)
{
lock.unlock (); // Waiting for db write is potentially slow
auto guard = write_queue.wait (nano::writer::confirmation_height);
auto tx = ledger.store.tx_begin_write ({ nano::tables::confirmation_height });
lock.lock ();
// Process items in the back buffer within a single transaction for a limited amount of time
for (auto timeout = std::chrono::steady_clock::now () + batch_time; !stopped && std::chrono::steady_clock::now () < timeout && i != n; ++i)
{
auto item = *i;
lock.unlock ();
auto added = ledger.confirm (tx, item);
if (!added.empty ())
{
// Confirming this block may implicitly confirm more
cemented.insert (cemented.end (), added.begin (), added.end ());
}
else
{
already.push_back (item);
}
lock.lock ();
}
}
lock.unlock ();
for (auto const & i : cemented)
{
cemented_observers.notify (i);
}
for (auto const & i : already)
{
block_already_cemented_observers.notify (i);
}
lock.lock ();
// Clear and free back buffer by re-initializing
processing = decltype (processing){};
}
}
}
std::unique_ptr<nano::container_info_component> nano::confirming_set::collect_container_info (std::string const & name) const
{
std::lock_guard guard{ mutex };
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "set", set.size (), sizeof (typename decltype (set)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "processing", processing.size (), sizeof (typename decltype (processing)::value_type) }));
return composite;
}

View file

@ -0,0 +1,57 @@
#pragma once
#include <nano/lib/numbers.hpp>
#include <nano/lib/observer_set.hpp>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <unordered_set>
namespace nano
{
class block;
class ledger;
class write_database_queue;
}
namespace nano
{
/**
* Set of blocks to be durably confirmed
*/
class confirming_set final
{
friend class confirmation_heightDeathTest_missing_block_Test;
friend class confirmation_height_pruned_source_Test;
public:
confirming_set (nano::ledger & ledger, nano::write_database_queue & write_queue, std::chrono::milliseconds batch_time = std::chrono::milliseconds{ 500 });
~confirming_set ();
// Adds a block to the set of blocks to be confirmed
void add (nano::block_hash const & hash);
void start ();
void stop ();
// Added blocks will remain in this set until after ledger has them marked as confirmed.
bool exists (nano::block_hash const & hash) const;
std::size_t size () const;
std::unique_ptr<container_info_component> collect_container_info (std::string const & name) const;
// Observers will be called once ledger has blocks marked as confirmed
nano::observer_set<std::shared_ptr<nano::block>> cemented_observers;
nano::observer_set<nano::block_hash const &> block_already_cemented_observers;
private:
void run ();
nano::ledger & ledger;
nano::write_database_queue & write_queue;
std::chrono::milliseconds batch_time;
std::unordered_set<nano::block_hash> set;
std::unordered_set<nano::block_hash> processing;
bool stopped{ false };
mutable std::mutex mutex;
std::condition_variable condition;
std::thread thread;
};
}