From ec8f324a3f144723eb2a3c5ba4c5b814a50b1363 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 20 May 2024 18:39:45 +0200 Subject: [PATCH] Confirming set background --- nano/core_test/confirming_set.cpp | 8 +- nano/lib/stats_enums.hpp | 6 ++ nano/lib/thread_pool.hpp | 2 +- nano/lib/thread_roles.cpp | 3 + nano/lib/thread_roles.hpp | 1 + nano/node/confirming_set.cpp | 153 ++++++++++++++++++++---------- nano/node/confirming_set.hpp | 15 ++- nano/node/node.cpp | 2 +- 8 files changed, 130 insertions(+), 60 deletions(-) diff --git a/nano/core_test/confirming_set.cpp b/nano/core_test/confirming_set.cpp index 7b0df16c0..f2fe62106 100644 --- a/nano/core_test/confirming_set.cpp +++ b/nano/core_test/confirming_set.cpp @@ -19,13 +19,13 @@ using namespace std::chrono_literals; TEST (confirming_set, construction) { auto ctx = nano::test::context::ledger_empty (); - nano::confirming_set confirming_set (ctx.ledger ()); + nano::confirming_set confirming_set (ctx.ledger (), ctx.stats ()); } TEST (confirming_set, add_exists) { auto ctx = nano::test::context::ledger_send_receive (); - nano::confirming_set confirming_set (ctx.ledger ()); + nano::confirming_set confirming_set (ctx.ledger (), ctx.stats ()); auto send = ctx.blocks ()[0]; confirming_set.add (send->hash ()); ASSERT_TRUE (confirming_set.exists (send->hash ())); @@ -34,7 +34,7 @@ TEST (confirming_set, add_exists) TEST (confirming_set, process_one) { auto ctx = nano::test::context::ledger_send_receive (); - nano::confirming_set confirming_set (ctx.ledger ()); + nano::confirming_set confirming_set (ctx.ledger (), ctx.stats ()); std::atomic count = 0; std::mutex mutex; std::condition_variable condition; @@ -50,7 +50,7 @@ TEST (confirming_set, process_one) TEST (confirming_set, process_multiple) { auto ctx = nano::test::context::ledger_send_receive (); - nano::confirming_set confirming_set (ctx.ledger ()); + nano::confirming_set confirming_set (ctx.ledger (), ctx.stats ()); std::atomic count = 0; std::mutex mutex; std::condition_variable condition; diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 27eb16bce..ab491ab92 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -40,6 +40,7 @@ enum class type socket, confirmation_height, confirmation_observer, + confirming_set, drop, aggregator, requests, @@ -114,6 +115,8 @@ enum class detail rebroadcast, queue_overflow, triggered, + notify, + duplicate, // processing queue queue, @@ -440,6 +443,9 @@ enum class detail tier_2, tier_3, + // confirming_set + confirmed, + _last // Must be the last enum }; diff --git a/nano/lib/thread_pool.hpp b/nano/lib/thread_pool.hpp index f56a6271e..b9f3607ed 100644 --- a/nano/lib/thread_pool.hpp +++ b/nano/lib/thread_pool.hpp @@ -19,7 +19,7 @@ namespace nano class thread_pool final { public: - explicit thread_pool (unsigned, nano::thread_role::name); + explicit thread_pool (unsigned num_threads, nano::thread_role::name); ~thread_pool (); /** This will run when there is an available thread for execution */ diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 76211682e..98dc3b64e 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -61,6 +61,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::confirmation_height_processing: thread_role_name_string = "Conf height"; break; + case nano::thread_role::name::confirmation_height_notifications: + thread_role_name_string = "Conf notif"; + break; case nano::thread_role::name::worker: thread_role_name_string = "Worker"; break; diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index d6d78b694..b07026ab7 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -25,6 +25,7 @@ enum class name rpc_request_processor, rpc_process_container, confirmation_height_processing, + confirmation_height_notifications, worker, bootstrap_worker, request_aggregator, diff --git a/nano/node/confirming_set.cpp b/nano/node/confirming_set.cpp index d39cf5791..9aa101fe9 100644 --- a/nano/node/confirming_set.cpp +++ b/nano/node/confirming_set.cpp @@ -1,3 +1,5 @@ +#include "node.hpp" + #include #include #include @@ -5,9 +7,11 @@ #include #include -nano::confirming_set::confirming_set (nano::ledger & ledger, std::chrono::milliseconds batch_time) : +nano::confirming_set::confirming_set (nano::ledger & ledger, nano::stats & stats, std::chrono::milliseconds batch_time) : ledger{ ledger }, - batch_time{ batch_time } + stats{ stats }, + batch_time{ batch_time }, + workers{ 1, nano::thread_role::name::confirmation_height_notifications } { } @@ -18,14 +22,29 @@ nano::confirming_set::~confirming_set () void nano::confirming_set::add (nano::block_hash const & hash) { - std::lock_guard lock{ mutex }; - set.insert (hash); - condition.notify_all (); + bool added = false; + { + std::lock_guard lock{ mutex }; + auto [it, inserted] = set.insert (hash); + added = inserted; + } + if (added) + { + condition.notify_all (); + stats.inc (nano::stat::type::confirming_set, nano::stat::detail::insert); + } + else + { + stats.inc (nano::stat::type::confirming_set, nano::stat::detail::duplicate); + } } void nano::confirming_set::start () { - thread = std::thread{ [this] () { run (); } }; + thread = std::thread{ [this] () { + nano::thread_role::set (nano::thread_role::name::confirmation_height_processing); + run (); + } }; } void nano::confirming_set::stop () @@ -39,6 +58,7 @@ void nano::confirming_set::stop () { thread.join (); } + workers.stop (); } bool nano::confirming_set::exists (nano::block_hash const & hash) const @@ -55,60 +75,88 @@ std::size_t nano::confirming_set::size () const void nano::confirming_set::run () { - nano::thread_role::set (nano::thread_role::name::confirmation_height_processing); std::unique_lock lock{ mutex }; - // Run the confirmation loop until stopped while (!stopped) { - condition.wait (lock, [&] () { return !set.empty () || stopped; }); - // Loop if there are items to process - if (!stopped && !set.empty ()) + stats.inc (nano::stat::type::confirming_set, nano::stat::detail::loop); + + if (!set.empty ()) { - std::deque> cemented; - std::deque already; - // Move items in to back buffer and release lock so more items can be added to the front buffer - processing = std::move (this->set); - // Process all items in the back buffer - for (auto i = processing.begin (), n = processing.end (); !stopped && i != n;) - { - lock.unlock (); // Waiting for db write is potentially slow - auto guard = ledger.store.write_queue.wait (nano::store::writer::confirmation_height); - auto tx = ledger.tx_begin_write ({ nano::tables::confirmation_height }); - lock.lock (); - // Process items in the back buffer within a single transaction for a limited amount of time - for (auto timeout = std::chrono::steady_clock::now () + batch_time; !stopped && std::chrono::steady_clock::now () < timeout && i != n; ++i) - { - auto item = *i; - lock.unlock (); - auto added = ledger.confirm (tx, item); - if (!added.empty ()) - { - // Confirming this block may implicitly confirm more - cemented.insert (cemented.end (), added.begin (), added.end ()); - } - else - { - already.push_back (item); - } - lock.lock (); - } - } - lock.unlock (); - for (auto const & i : cemented) - { - cemented_observers.notify (i); - } - for (auto const & i : already) - { - block_already_cemented_observers.notify (i); - } - lock.lock (); - // Clear and free back buffer by re-initializing - processing = decltype (processing){}; + run_batch (lock); + debug_assert (lock.owns_lock ()); + } + else + { + condition.wait (lock, [&] () { return !set.empty () || stopped; }); } } } +void nano::confirming_set::run_batch (std::unique_lock & lock) +{ + debug_assert (lock.owns_lock ()); + debug_assert (!mutex.try_lock ()); + debug_assert (!set.empty ()); + + std::deque> cemented; + std::deque already; + + // Move items in to back buffer and release lock so more items can be added to the front buffer + release_assert (processing.empty ()); + swap (set, processing); + + // Process all items in the back buffer + for (auto i = processing.begin (), n = processing.end (); !stopped && i != n;) + { + lock.unlock (); // Waiting for db write is potentially slow + + auto guard = ledger.store.write_queue.wait (nano::store::writer::confirmation_height); + auto tx = ledger.tx_begin_write ({ nano::tables::confirmation_height }); + + lock.lock (); + // Process items in the back buffer within a single transaction for a limited amount of time + for (auto timeout = std::chrono::steady_clock::now () + batch_time; !stopped && std::chrono::steady_clock::now () < timeout && i != n; ++i) + { + auto item = *i; + lock.unlock (); + + auto added = ledger.confirm (tx, item); + if (!added.empty ()) + { + // Confirming this block may implicitly confirm more + cemented.insert (cemented.end (), added.begin (), added.end ()); + stats.add (nano::stat::type::confirming_set, nano::stat::detail::confirmed, added.size ()); + } + else + { + already.push_back (item); + stats.inc (nano::stat::type::confirming_set, nano::stat::detail::already_confirmed); + } + + lock.lock (); + } + } + + lock.unlock (); + + workers.push_task ([this, cemented = std::move (cemented), already = std::move (already)] () { + stats.inc (nano::stat::type::confirming_set, nano::stat::detail::notify); + + for (auto const & i : cemented) + { + cemented_observers.notify (i); + } + for (auto const & i : already) + { + block_already_cemented_observers.notify (i); + } + }); + + lock.lock (); + + processing.clear (); +} + std::unique_ptr nano::confirming_set::collect_container_info (std::string const & name) const { std::lock_guard guard{ mutex }; @@ -116,5 +164,6 @@ std::unique_ptr nano::confirming_set::collect_co auto composite = std::make_unique (name); composite->add_component (std::make_unique (container_info{ "set", set.size (), sizeof (typename decltype (set)::value_type) })); composite->add_component (std::make_unique (container_info{ "processing", processing.size (), sizeof (typename decltype (processing)::value_type) })); + composite->add_component (std::make_unique (container_info{ "notifications", workers.num_queued_tasks (), sizeof (std::function) })); return composite; } diff --git a/nano/node/confirming_set.hpp b/nano/node/confirming_set.hpp index 06feb52e1..c991d156e 100644 --- a/nano/node/confirming_set.hpp +++ b/nano/node/confirming_set.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -11,8 +12,10 @@ namespace nano { +class node; class block; class ledger; +class stats; } namespace nano @@ -26,8 +29,9 @@ class confirming_set final friend class confirmation_height_pruned_source_Test; public: - confirming_set (nano::ledger & ledger, std::chrono::milliseconds batch_time = std::chrono::milliseconds{ 500 }); + confirming_set (nano::ledger &, nano::stats &, std::chrono::milliseconds batch_time = std::chrono::milliseconds{ 500 }); ~confirming_set (); + // Adds a block to the set of blocks to be confirmed void add (nano::block_hash const & hash); void start (); @@ -43,10 +47,17 @@ public: private: void run (); + void run_batch (std::unique_lock &); + nano::ledger & ledger; - std::chrono::milliseconds batch_time; + nano::stats & stats; + + std::chrono::milliseconds const batch_time; std::unordered_set set; std::unordered_set processing; + + nano::thread_pool workers; + bool stopped{ false }; mutable std::mutex mutex; std::condition_variable condition; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 239127481..9c220cacf 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -184,7 +184,7 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy application_path (application_path_a), port_mapping (*this), block_processor (*this), - confirming_set_impl{ std::make_unique (ledger, config.confirming_set_batch_time) }, + confirming_set_impl{ std::make_unique (ledger, stats, config.confirming_set_batch_time) }, confirming_set{ *confirming_set_impl }, active_impl{ std::make_unique (*this, confirming_set, block_processor) }, active{ *active_impl },