Confirming set background

This commit is contained in:
Piotr Wójcik 2024-05-20 18:39:45 +02:00
commit ec8f324a3f
8 changed files with 130 additions and 60 deletions

View file

@ -19,13 +19,13 @@ using namespace std::chrono_literals;
TEST (confirming_set, construction)
{
auto ctx = nano::test::context::ledger_empty ();
nano::confirming_set confirming_set (ctx.ledger ());
nano::confirming_set confirming_set (ctx.ledger (), ctx.stats ());
}
TEST (confirming_set, add_exists)
{
auto ctx = nano::test::context::ledger_send_receive ();
nano::confirming_set confirming_set (ctx.ledger ());
nano::confirming_set confirming_set (ctx.ledger (), ctx.stats ());
auto send = ctx.blocks ()[0];
confirming_set.add (send->hash ());
ASSERT_TRUE (confirming_set.exists (send->hash ()));
@ -34,7 +34,7 @@ TEST (confirming_set, add_exists)
TEST (confirming_set, process_one)
{
auto ctx = nano::test::context::ledger_send_receive ();
nano::confirming_set confirming_set (ctx.ledger ());
nano::confirming_set confirming_set (ctx.ledger (), ctx.stats ());
std::atomic<int> count = 0;
std::mutex mutex;
std::condition_variable condition;
@ -50,7 +50,7 @@ TEST (confirming_set, process_one)
TEST (confirming_set, process_multiple)
{
auto ctx = nano::test::context::ledger_send_receive ();
nano::confirming_set confirming_set (ctx.ledger ());
nano::confirming_set confirming_set (ctx.ledger (), ctx.stats ());
std::atomic<int> count = 0;
std::mutex mutex;
std::condition_variable condition;

View file

@ -40,6 +40,7 @@ enum class type
socket,
confirmation_height,
confirmation_observer,
confirming_set,
drop,
aggregator,
requests,
@ -114,6 +115,8 @@ enum class detail
rebroadcast,
queue_overflow,
triggered,
notify,
duplicate,
// processing queue
queue,
@ -440,6 +443,9 @@ enum class detail
tier_2,
tier_3,
// confirming_set
confirmed,
_last // Must be the last enum
};

View file

@ -19,7 +19,7 @@ namespace nano
class thread_pool final
{
public:
explicit thread_pool (unsigned, nano::thread_role::name);
explicit thread_pool (unsigned num_threads, nano::thread_role::name);
~thread_pool ();
/** This will run when there is an available thread for execution */

View file

@ -61,6 +61,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::confirmation_height_processing:
thread_role_name_string = "Conf height";
break;
case nano::thread_role::name::confirmation_height_notifications:
thread_role_name_string = "Conf notif";
break;
case nano::thread_role::name::worker:
thread_role_name_string = "Worker";
break;

View file

@ -25,6 +25,7 @@ enum class name
rpc_request_processor,
rpc_process_container,
confirmation_height_processing,
confirmation_height_notifications,
worker,
bootstrap_worker,
request_aggregator,

View file

@ -1,3 +1,5 @@
#include "node.hpp"
#include <nano/lib/thread_roles.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/secure/ledger.hpp>
@ -5,9 +7,11 @@
#include <nano/store/component.hpp>
#include <nano/store/write_queue.hpp>
nano::confirming_set::confirming_set (nano::ledger & ledger, std::chrono::milliseconds batch_time) :
nano::confirming_set::confirming_set (nano::ledger & ledger, nano::stats & stats, std::chrono::milliseconds batch_time) :
ledger{ ledger },
batch_time{ batch_time }
stats{ stats },
batch_time{ batch_time },
workers{ 1, nano::thread_role::name::confirmation_height_notifications }
{
}
@ -18,14 +22,29 @@ nano::confirming_set::~confirming_set ()
void nano::confirming_set::add (nano::block_hash const & hash)
{
std::lock_guard lock{ mutex };
set.insert (hash);
condition.notify_all ();
bool added = false;
{
std::lock_guard lock{ mutex };
auto [it, inserted] = set.insert (hash);
added = inserted;
}
if (added)
{
condition.notify_all ();
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::insert);
}
else
{
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::duplicate);
}
}
void nano::confirming_set::start ()
{
thread = std::thread{ [this] () { run (); } };
thread = std::thread{ [this] () {
nano::thread_role::set (nano::thread_role::name::confirmation_height_processing);
run ();
} };
}
void nano::confirming_set::stop ()
@ -39,6 +58,7 @@ void nano::confirming_set::stop ()
{
thread.join ();
}
workers.stop ();
}
bool nano::confirming_set::exists (nano::block_hash const & hash) const
@ -55,60 +75,88 @@ std::size_t nano::confirming_set::size () const
void nano::confirming_set::run ()
{
nano::thread_role::set (nano::thread_role::name::confirmation_height_processing);
std::unique_lock lock{ mutex };
// Run the confirmation loop until stopped
while (!stopped)
{
condition.wait (lock, [&] () { return !set.empty () || stopped; });
// Loop if there are items to process
if (!stopped && !set.empty ())
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::loop);
if (!set.empty ())
{
std::deque<std::shared_ptr<nano::block>> cemented;
std::deque<nano::block_hash> already;
// Move items in to back buffer and release lock so more items can be added to the front buffer
processing = std::move (this->set);
// Process all items in the back buffer
for (auto i = processing.begin (), n = processing.end (); !stopped && i != n;)
{
lock.unlock (); // Waiting for db write is potentially slow
auto guard = ledger.store.write_queue.wait (nano::store::writer::confirmation_height);
auto tx = ledger.tx_begin_write ({ nano::tables::confirmation_height });
lock.lock ();
// Process items in the back buffer within a single transaction for a limited amount of time
for (auto timeout = std::chrono::steady_clock::now () + batch_time; !stopped && std::chrono::steady_clock::now () < timeout && i != n; ++i)
{
auto item = *i;
lock.unlock ();
auto added = ledger.confirm (tx, item);
if (!added.empty ())
{
// Confirming this block may implicitly confirm more
cemented.insert (cemented.end (), added.begin (), added.end ());
}
else
{
already.push_back (item);
}
lock.lock ();
}
}
lock.unlock ();
for (auto const & i : cemented)
{
cemented_observers.notify (i);
}
for (auto const & i : already)
{
block_already_cemented_observers.notify (i);
}
lock.lock ();
// Clear and free back buffer by re-initializing
processing = decltype (processing){};
run_batch (lock);
debug_assert (lock.owns_lock ());
}
else
{
condition.wait (lock, [&] () { return !set.empty () || stopped; });
}
}
}
void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)
{
debug_assert (lock.owns_lock ());
debug_assert (!mutex.try_lock ());
debug_assert (!set.empty ());
std::deque<std::shared_ptr<nano::block>> cemented;
std::deque<nano::block_hash> already;
// Move items in to back buffer and release lock so more items can be added to the front buffer
release_assert (processing.empty ());
swap (set, processing);
// Process all items in the back buffer
for (auto i = processing.begin (), n = processing.end (); !stopped && i != n;)
{
lock.unlock (); // Waiting for db write is potentially slow
auto guard = ledger.store.write_queue.wait (nano::store::writer::confirmation_height);
auto tx = ledger.tx_begin_write ({ nano::tables::confirmation_height });
lock.lock ();
// Process items in the back buffer within a single transaction for a limited amount of time
for (auto timeout = std::chrono::steady_clock::now () + batch_time; !stopped && std::chrono::steady_clock::now () < timeout && i != n; ++i)
{
auto item = *i;
lock.unlock ();
auto added = ledger.confirm (tx, item);
if (!added.empty ())
{
// Confirming this block may implicitly confirm more
cemented.insert (cemented.end (), added.begin (), added.end ());
stats.add (nano::stat::type::confirming_set, nano::stat::detail::confirmed, added.size ());
}
else
{
already.push_back (item);
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::already_confirmed);
}
lock.lock ();
}
}
lock.unlock ();
workers.push_task ([this, cemented = std::move (cemented), already = std::move (already)] () {
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::notify);
for (auto const & i : cemented)
{
cemented_observers.notify (i);
}
for (auto const & i : already)
{
block_already_cemented_observers.notify (i);
}
});
lock.lock ();
processing.clear ();
}
std::unique_ptr<nano::container_info_component> nano::confirming_set::collect_container_info (std::string const & name) const
{
std::lock_guard guard{ mutex };
@ -116,5 +164,6 @@ std::unique_ptr<nano::container_info_component> nano::confirming_set::collect_co
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "set", set.size (), sizeof (typename decltype (set)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "processing", processing.size (), sizeof (typename decltype (processing)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "notifications", workers.num_queued_tasks (), sizeof (std::function<void ()>) }));
return composite;
}

View file

@ -2,6 +2,7 @@
#include <nano/lib/numbers.hpp>
#include <nano/lib/observer_set.hpp>
#include <nano/lib/thread_pool.hpp>
#include <condition_variable>
#include <deque>
@ -11,8 +12,10 @@
namespace nano
{
class node;
class block;
class ledger;
class stats;
}
namespace nano
@ -26,8 +29,9 @@ class confirming_set final
friend class confirmation_height_pruned_source_Test;
public:
confirming_set (nano::ledger & ledger, std::chrono::milliseconds batch_time = std::chrono::milliseconds{ 500 });
confirming_set (nano::ledger &, nano::stats &, std::chrono::milliseconds batch_time = std::chrono::milliseconds{ 500 });
~confirming_set ();
// Adds a block to the set of blocks to be confirmed
void add (nano::block_hash const & hash);
void start ();
@ -43,10 +47,17 @@ public:
private:
void run ();
void run_batch (std::unique_lock<std::mutex> &);
nano::ledger & ledger;
std::chrono::milliseconds batch_time;
nano::stats & stats;
std::chrono::milliseconds const batch_time;
std::unordered_set<nano::block_hash> set;
std::unordered_set<nano::block_hash> processing;
nano::thread_pool workers;
bool stopped{ false };
mutable std::mutex mutex;
std::condition_variable condition;

View file

@ -184,7 +184,7 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
application_path (application_path_a),
port_mapping (*this),
block_processor (*this),
confirming_set_impl{ std::make_unique<nano::confirming_set> (ledger, config.confirming_set_batch_time) },
confirming_set_impl{ std::make_unique<nano::confirming_set> (ledger, stats, config.confirming_set_batch_time) },
confirming_set{ *confirming_set_impl },
active_impl{ std::make_unique<nano::active_elections> (*this, confirming_set, block_processor) },
active{ *active_impl },