From 1cb607afde2911cc57765ab3d423c82a3319f5c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sun, 12 Nov 2023 15:19:42 +0100 Subject: [PATCH 01/13] Blockprocessor context --- nano/node/active_transactions.cpp | 2 +- nano/node/block_broadcast.cpp | 2 +- nano/node/blocking_observer.cpp | 2 +- nano/node/blockprocessor.cpp | 76 ++++++++++++++--------- nano/node/blockprocessor.hpp | 51 ++++++++++----- nano/node/bootstrap_ascending/service.cpp | 3 +- nano/node/node.cpp | 6 +- nano/node/process_live_dispatcher.cpp | 2 +- 8 files changed, 91 insertions(+), 53 deletions(-) diff --git a/nano/node/active_transactions.cpp b/nano/node/active_transactions.cpp index 01c75352..b32a3ace 100644 --- a/nano/node/active_transactions.cpp +++ b/nano/node/active_transactions.cpp @@ -34,7 +34,7 @@ nano::active_transactions::active_transactions (nano::node & node_a, nano::confi }); // Notify elections about alternative (forked) blocks - block_processor.processed.add ([this] (auto const & result, auto const & block) { + block_processor.processed.add ([this] (auto const & result, auto const & block, auto const & context) { switch (result.code) { case nano::process_result::fork: diff --git a/nano/node/block_broadcast.cpp b/nano/node/block_broadcast.cpp index fc996fc0..852f1736 100644 --- a/nano/node/block_broadcast.cpp +++ b/nano/node/block_broadcast.cpp @@ -16,7 +16,7 @@ void nano::block_broadcast::connect (nano::block_processor & block_processor) { return; } - block_processor.processed.add ([this] (auto const & result, auto const & block) { + block_processor.processed.add ([this] (auto const & result, auto const & block, auto const & context) { switch (result.code) { case nano::process_result::progress: diff --git a/nano/node/blocking_observer.cpp b/nano/node/blocking_observer.cpp index 339b09b8..6ef14f69 100644 --- a/nano/node/blocking_observer.cpp +++ b/nano/node/blocking_observer.cpp @@ -3,7 +3,7 @@ void nano::blocking_observer::connect (nano::block_processor & block_processor) { - block_processor.processed.add ([this] (auto const & result, auto const & block) { + block_processor.processed.add ([this] (auto const & result, auto const & block, auto const & context) { observe (result, block); }); } diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index c0477b4e..28ecd840 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -7,16 +7,15 @@ #include nano::block_processor::block_processor (nano::node & node_a, nano::write_database_queue & write_database_queue_a) : - next_log (std::chrono::steady_clock::now ()), node (node_a), - write_database_queue (write_database_queue_a) + write_database_queue (write_database_queue_a), + next_log (std::chrono::steady_clock::now ()) { batch_processed.add ([this] (auto const & items) { // For every batch item: notify the 'processed' observer. - for (auto const & item : items) + for (auto const & [result, block, context] : items) { - auto const & [result, block] = item; - processed.notify (result, block); + processed.notify (result, block, context); } }); blocking.connect (*this); @@ -53,7 +52,7 @@ bool nano::block_processor::half_full () return size () >= node.flags.block_processor_full_size / 2; } -void nano::block_processor::add (std::shared_ptr const & block) +void nano::block_processor::add (std::shared_ptr const & block, block_source const source) { if (full ()) { @@ -65,14 +64,14 @@ void nano::block_processor::add (std::shared_ptr const & block) node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::insufficient_work); return; } - add_impl (block); + add_impl (block, source); return; } -std::optional nano::block_processor::add_blocking (std::shared_ptr const & block) +std::optional nano::block_processor::add_blocking (std::shared_ptr const & block, block_source const source) { auto future = blocking.insert (block); - add_impl (block); + add_impl (block, source); condition.notify_all (); std::optional result; try @@ -130,7 +129,7 @@ void nano::block_processor::force (std::shared_ptr const & block_a) { { nano::lock_guard lock{ mutex }; - forced.push_back (block_a); + forced.emplace_back (block_a, context{ block_source::forced, std::chrono::steady_clock::now () }); } condition.notify_all (); } @@ -181,28 +180,52 @@ bool nano::block_processor::have_blocks () return have_blocks_ready (); } -void nano::block_processor::add_impl (std::shared_ptr block) +void nano::block_processor::add_impl (std::shared_ptr block, block_source const source) { { nano::lock_guard guard{ mutex }; - blocks.emplace_back (block); + blocks.emplace_back (block, context{ source, std::chrono::steady_clock::now () }); } condition.notify_all (); } +auto nano::block_processor::next_block () -> std::pair +{ + debug_assert (!mutex.try_lock ()); + + if (forced.empty ()) + { + release_assert (!blocks.empty ()); // Checked before calling this function + + auto entry = blocks.front (); + blocks.pop_front (); + return { entry, false }; // Not forced + } + else + { + auto entry = forced.front (); + forced.pop_front (); + return { entry, true }; // Forced + } +} + auto nano::block_processor::process_batch (nano::unique_lock & lock_a) -> std::deque { std::deque processed; + auto scoped_write_guard = write_database_queue.wait (nano::writer::process_batch); auto transaction (node.store.tx_begin_write ({ tables::accounts, tables::blocks, tables::frontiers, tables::pending })); nano::timer timer_l; + lock_a.lock (); + timer_l.start (); // Processing blocks unsigned number_of_blocks_processed (0), number_of_forced_processed (0); auto deadline_reached = [&timer_l, deadline = node.config.block_processor_batch_max_time] { return timer_l.after_deadline (deadline); }; auto processor_batch_reached = [&number_of_blocks_processed, max = node.flags.block_processor_batch_size] { return number_of_blocks_processed >= max; }; auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; }; + while (have_blocks_ready () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ()) { // TODO: Cleaner periodical logging @@ -211,33 +234,26 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock node.logger.debug (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", blocks.size (), forced.size ()); } - std::shared_ptr block; - nano::block_hash hash (0); - bool force (false); - if (forced.empty ()) - { - block = blocks.front (); - blocks.pop_front (); - hash = block->hash (); - } - else - { - block = forced.front (); - forced.pop_front (); - hash = block->hash (); - force = true; - number_of_forced_processed++; - } + auto [entry, force] = next_block (); + auto const & [block, context] = entry; + auto const hash = block->hash (); + lock_a.unlock (); + if (force) { + number_of_forced_processed++; rollback_competitor (transaction, *block); } + number_of_blocks_processed++; + auto result = process_one (transaction, block, force); - processed.emplace_back (result, block); + processed.emplace_back (result, block, context); + lock_a.lock (); } + lock_a.unlock (); if (number_of_blocks_processed != 0 && timer_l.stop () > std::chrono::milliseconds (100)) diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index a53fa807..df681fed 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -26,14 +26,35 @@ class write_database_queue; */ class block_processor final { +public: // Context + enum class block_source + { + unknown = 0, + live, + bootstrap, + unchecked, + local, + forced, + }; + + struct context + { + block_source source; + std::chrono::steady_clock::time_point arrival; + }; + + using entry_t = std::pair, context>; + using processed_t = std::tuple, context>; + public: - explicit block_processor (nano::node &, nano::write_database_queue &); + block_processor (nano::node &, nano::write_database_queue &); + void stop (); std::size_t size (); bool full (); bool half_full (); - void add (std::shared_ptr const &); - std::optional add_blocking (std::shared_ptr const & block); + void add (std::shared_ptr const &, block_source = block_source::live); + std::optional add_blocking (std::shared_ptr const & block, block_source); void force (std::shared_ptr const &); bool should_log (); bool have_blocks_ready (); @@ -43,9 +64,7 @@ public: std::atomic flushing{ false }; public: // Events - using processed_t = std::pair>; - nano::observer_set> processed; - + nano::observer_set, context> processed; // The batch observer feeds the processed obsever nano::observer_set const &> batch_processed; @@ -54,23 +73,27 @@ private: private: // Roll back block in the ledger that conflicts with 'block' - void rollback_competitor (store::write_transaction const & transaction, nano::block const & block); - nano::process_return process_one (store::write_transaction const &, std::shared_ptr block, bool const = false); + void rollback_competitor (store::write_transaction const &, nano::block const & block); + nano::process_return process_one (store::write_transaction const &, std::shared_ptr block, bool forced = false); void queue_unchecked (store::write_transaction const &, nano::hash_or_account const &); std::deque process_batch (nano::unique_lock &); - void add_impl (std::shared_ptr block); + std::pair next_block (); /// @returns + void add_impl (std::shared_ptr block, block_source source); + +private: // Dependencies + nano::node & node; + nano::write_database_queue & write_database_queue; + +private: bool stopped{ false }; bool active{ false }; std::chrono::steady_clock::time_point next_log; - std::deque> blocks; - std::deque> forced; + std::deque blocks; + std::deque forced; nano::condition_variable condition; - nano::node & node; - nano::write_database_queue & write_database_queue; nano::mutex mutex{ mutex_identifier (mutexes::block_processor) }; std::thread processing_thread; friend std::unique_ptr collect_container_info (block_processor & block_processor, std::string const & name); }; -std::unique_ptr collect_container_info (block_processor & block_processor, std::string const & name); } diff --git a/nano/node/bootstrap_ascending/service.cpp b/nano/node/bootstrap_ascending/service.cpp index 76e2aac3..23cd4991 100644 --- a/nano/node/bootstrap_ascending/service.cpp +++ b/nano/node/bootstrap_ascending/service.cpp @@ -34,10 +34,9 @@ nano::bootstrap_ascending::service::service (nano::node_config & config_a, nano: nano::lock_guard lock{ mutex }; auto transaction = ledger.store.tx_begin_read (); - for (auto const & [result, block] : batch) + for (auto const & [result, block, context] : batch) { debug_assert (block != nullptr); - inspect (transaction, result, *block); } } diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 64bf414b..e186925b 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -206,7 +206,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons process_live_dispatcher.connect (block_processor); unchecked.satisfied.add ([this] (nano::unchecked_info const & info) { - this->block_processor.add (info.block); + block_processor.add (info.block, nano::block_processor::block_source::unchecked); }); vote_cache.rep_weight_query = [this] (nano::account const & rep) { @@ -577,7 +577,7 @@ std::optional nano::node::process_local (std::shared_ptrhash ()); block_broadcast.set_local (block_a); - return block_processor.add_blocking (block_a); + return block_processor.add_blocking (block_a, nano::block_processor::block_source::local); } void nano::node::process_local_async (std::shared_ptr const & block_a) @@ -585,7 +585,7 @@ void nano::node::process_local_async (std::shared_ptr const & block // Add block hash as recently arrived to trigger automatic rebroadcast and election block_arrival.add (block_a->hash ()); // Set current time to trigger automatic rebroadcast and election - block_processor.add (block_a); + block_processor.add (block_a, nano::block_processor::block_source::local); } void nano::node::start () diff --git a/nano/node/process_live_dispatcher.cpp b/nano/node/process_live_dispatcher.cpp index c7a6d16d..daaad17f 100644 --- a/nano/node/process_live_dispatcher.cpp +++ b/nano/node/process_live_dispatcher.cpp @@ -20,7 +20,7 @@ void nano::process_live_dispatcher::connect (nano::block_processor & block_proce { block_processor.batch_processed.add ([this] (auto const & batch) { auto const transaction = ledger.store.tx_begin_read (); - for (auto const & [result, block] : batch) + for (auto const & [result, block, context] : batch) { debug_assert (block != nullptr); inspect (result, *block, transaction); From fd7445a74a2468f234c4bd544afff661b137449f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sun, 12 Nov 2023 15:42:40 +0100 Subject: [PATCH 02/13] Remove `block_arrival` --- nano/core_test/node.cpp | 47 --------------------------------- nano/lib/locks.cpp | 2 -- nano/lib/locks.hpp | 1 - nano/node/CMakeLists.txt | 2 -- nano/node/block_arrival.cpp | 35 ------------------------- nano/node/block_arrival.hpp | 49 ----------------------------------- nano/node/block_broadcast.cpp | 10 +++---- nano/node/block_broadcast.hpp | 9 +++---- nano/node/blockprocessor.cpp | 10 ++++++- nano/node/blockprocessor.hpp | 4 +-- nano/node/json_handler.cpp | 1 - nano/node/node.cpp | 11 ++------ nano/node/node.hpp | 2 -- nano/rpc_test/rpc.cpp | 1 - 14 files changed, 21 insertions(+), 163 deletions(-) delete mode 100644 nano/node/block_arrival.cpp delete mode 100644 nano/node/block_arrival.hpp diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index fddf1683..0f7410e7 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -2111,53 +2111,6 @@ TEST (node, block_confirm) ASSERT_TIMELY_EQ (10s, node1.active.recently_cemented.list ().size (), 1); } -TEST (node, block_arrival) -{ - nano::test::system system (1); - auto & node (*system.nodes[0]); - ASSERT_EQ (0, node.block_arrival.arrival.size ()); - nano::block_hash hash1 (1); - node.block_arrival.add (hash1); - ASSERT_EQ (1, node.block_arrival.arrival.size ()); - node.block_arrival.add (hash1); - ASSERT_EQ (1, node.block_arrival.arrival.size ()); - nano::block_hash hash2 (2); - node.block_arrival.add (hash2); - ASSERT_EQ (2, node.block_arrival.arrival.size ()); -} - -TEST (node, block_arrival_size) -{ - nano::test::system system (1); - auto & node (*system.nodes[0]); - auto time (std::chrono::steady_clock::now () - nano::block_arrival::arrival_time_min - std::chrono::seconds (5)); - nano::block_hash hash (0); - for (auto i (0); i < nano::block_arrival::arrival_size_min * 2; ++i) - { - node.block_arrival.arrival.push_back (nano::block_arrival_info{ time, hash }); - ++hash.qwords[0]; - } - ASSERT_EQ (nano::block_arrival::arrival_size_min * 2, node.block_arrival.arrival.size ()); - node.block_arrival.recent (0); - ASSERT_EQ (nano::block_arrival::arrival_size_min, node.block_arrival.arrival.size ()); -} - -TEST (node, block_arrival_time) -{ - nano::test::system system (1); - auto & node (*system.nodes[0]); - auto time (std::chrono::steady_clock::now ()); - nano::block_hash hash (0); - for (auto i (0); i < nano::block_arrival::arrival_size_min * 2; ++i) - { - node.block_arrival.arrival.push_back (nano::block_arrival_info{ time, hash }); - ++hash.qwords[0]; - } - ASSERT_EQ (nano::block_arrival::arrival_size_min * 2, node.block_arrival.arrival.size ()); - node.block_arrival.recent (0); - ASSERT_EQ (nano::block_arrival::arrival_size_min * 2, node.block_arrival.arrival.size ()); -} - TEST (node, confirm_quorum) { nano::test::system system (1); diff --git a/nano/lib/locks.cpp b/nano/lib/locks.cpp index 3217e5e5..231ef13c 100644 --- a/nano/lib/locks.cpp +++ b/nano/lib/locks.cpp @@ -251,8 +251,6 @@ char const * nano::mutex_identifier (mutexes mutex) { case mutexes::active: return "active"; - case mutexes::block_arrival: - return "block_arrival"; case mutexes::block_processor: return "block_processor"; case mutexes::block_uniquer: diff --git a/nano/lib/locks.hpp b/nano/lib/locks.hpp index f8447edf..1bbe0a7f 100644 --- a/nano/lib/locks.hpp +++ b/nano/lib/locks.hpp @@ -20,7 +20,6 @@ bool any_filters_registered (); enum class mutexes { active, - block_arrival, block_processor, block_uniquer, blockstore_cache, diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 37278a50..3f7c2081 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -20,8 +20,6 @@ add_library( backlog_population.cpp bandwidth_limiter.hpp bandwidth_limiter.cpp - block_arrival.hpp - block_arrival.cpp block_broadcast.cpp block_broadcast.hpp blocking_observer.cpp diff --git a/nano/node/block_arrival.cpp b/nano/node/block_arrival.cpp deleted file mode 100644 index 914e18dc..00000000 --- a/nano/node/block_arrival.cpp +++ /dev/null @@ -1,35 +0,0 @@ -#include - -bool nano::block_arrival::add (nano::block_hash const & hash_a) -{ - nano::lock_guard lock{ mutex }; - auto now (std::chrono::steady_clock::now ()); - auto inserted (arrival.get ().emplace_back (nano::block_arrival_info{ now, hash_a })); - auto result (!inserted.second); - return result; -} - -bool nano::block_arrival::recent (nano::block_hash const & hash_a) -{ - nano::lock_guard lock{ mutex }; - auto now (std::chrono::steady_clock::now ()); - while (arrival.size () > arrival_size_min && arrival.get ().front ().arrival + arrival_time_min < now) - { - arrival.get ().pop_front (); - } - return arrival.get ().find (hash_a) != arrival.get ().end (); -} - -std::unique_ptr nano::collect_container_info (block_arrival & block_arrival, std::string const & name) -{ - std::size_t count = 0; - { - nano::lock_guard guard{ block_arrival.mutex }; - count = block_arrival.arrival.size (); - } - - auto sizeof_element = sizeof (decltype (block_arrival.arrival)::value_type); - auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "arrival", count, sizeof_element })); - return composite; -} \ No newline at end of file diff --git a/nano/node/block_arrival.hpp b/nano/node/block_arrival.hpp deleted file mode 100644 index 71aa3946..00000000 --- a/nano/node/block_arrival.hpp +++ /dev/null @@ -1,49 +0,0 @@ -#pragma once - -#include - -#include -#include -#include -#include - -#include - -namespace nano -{ -class block_arrival_info final -{ -public: - std::chrono::steady_clock::time_point arrival; - nano::block_hash hash; -}; - -// This class tracks blocks that are probably live because they arrived in a UDP packet -// This gives a fairly reliable way to differentiate between blocks being inserted via bootstrap or new, live blocks. -class block_arrival final -{ -public: - // Return `true' to indicated an error if the block has already been inserted - bool add (nano::block_hash const &); - bool recent (nano::block_hash const &); - - // clang-format off - class tag_sequence {}; - class tag_hash {}; - - boost::multi_index_container>, - boost::multi_index::hashed_unique, - boost::multi_index::member>>> - arrival; - // clang-format on - - nano::mutex mutex{ mutex_identifier (mutexes::block_arrival) }; - - static std::size_t constexpr arrival_size_min = 8 * 1024; - static std::chrono::seconds constexpr arrival_time_min = std::chrono::seconds (300); -}; - -std::unique_ptr collect_container_info (block_arrival & block_arrival, std::string const & name); -} \ No newline at end of file diff --git a/nano/node/block_broadcast.cpp b/nano/node/block_broadcast.cpp index 852f1736..4b7e5e77 100644 --- a/nano/node/block_broadcast.cpp +++ b/nano/node/block_broadcast.cpp @@ -1,11 +1,9 @@ -#include #include #include #include -nano::block_broadcast::block_broadcast (nano::network & network, nano::block_arrival & block_arrival, bool enabled) : +nano::block_broadcast::block_broadcast (nano::network & network, bool enabled) : network{ network }, - block_arrival{ block_arrival }, enabled{ enabled } { } @@ -20,7 +18,7 @@ void nano::block_broadcast::connect (nano::block_processor & block_processor) switch (result.code) { case nano::process_result::progress: - observe (block); + observe (block, context); break; default: break; @@ -29,7 +27,7 @@ void nano::block_broadcast::connect (nano::block_processor & block_processor) }); } -void nano::block_broadcast::observe (std::shared_ptr block) +void nano::block_broadcast::observe (std::shared_ptr block, nano::block_processor::context const & context) { nano::unique_lock lock{ mutex }; auto existing = local.find (block); @@ -43,7 +41,7 @@ void nano::block_broadcast::observe (std::shared_ptr block) } else { - if (block_arrival.recent (block->hash ())) + if (context.source != nano::block_processor::block_source::bootstrap) { // Block arrived from realtime traffic, do normal gossip. network.flood_block (block, nano::transport::buffer_drop_policy::limiter); diff --git a/nano/node/block_broadcast.hpp b/nano/node/block_broadcast.hpp index 081d3f2f..19055d30 100644 --- a/nano/node/block_broadcast.hpp +++ b/nano/node/block_broadcast.hpp @@ -1,20 +1,20 @@ #pragma once #include +#include #include #include namespace nano { -class block_arrival; -class block_processor; class network; + // This class tracks blocks that originated from this node. class block_broadcast { public: - block_broadcast (nano::network & network, nano::block_arrival & block_arrival, bool enabled = false); + block_broadcast (nano::network & network, bool enabled = false); // Add batch_processed observer to block_processor if enabled void connect (nano::block_processor & block_processor); // Mark a block as originating locally @@ -23,10 +23,9 @@ public: private: // Block_processor observer - void observe (std::shared_ptr block); + void observe (std::shared_ptr block, nano::block_processor::context const &); nano::network & network; - nano::block_arrival & block_arrival; std::unordered_set> local; // Blocks originated on this node nano::mutex mutex; bool enabled; diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 28ecd840..1ebad06c 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -6,6 +6,10 @@ #include +/* + * block_processor + */ + nano::block_processor::block_processor (nano::node & node_a, nano::write_database_queue & write_database_queue_a) : node (node_a), write_database_queue (write_database_queue_a), @@ -184,7 +188,7 @@ void nano::block_processor::add_impl (std::shared_ptr block, block_ { { nano::lock_guard guard{ mutex }; - blocks.emplace_back (block, context{ source, std::chrono::steady_clock::now () }); + blocks.emplace_back (block, context{ source }); } condition.notify_all (); } @@ -377,3 +381,7 @@ std::unique_ptr nano::collect_container_info (bl composite->add_component (std::make_unique (container_info{ "forced", forced_count, sizeof (decltype (block_processor.forced)::value_type) })); return composite; } + +/* + * context + */ diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index df681fed..ad266be0 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -39,8 +39,8 @@ public: // Context struct context { - block_source source; - std::chrono::steady_clock::time_point arrival; + block_source const source{}; + std::chrono::steady_clock::time_point const arrival{ std::chrono::steady_clock::now () }; }; using entry_t = std::pair, context>; diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 58af00f1..91034d05 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -1214,7 +1214,6 @@ void nano::json_handler::block_confirm () nano::election_status status{ block_l, 0, 0, std::chrono::duration_cast (std::chrono::system_clock::now ().time_since_epoch ()), std::chrono::duration_values::zero (), 0, 1, 0, nano::election_status_type::active_confirmation_height }; node.active.recently_cemented.put (status); // Trigger callback for confirmed block - node.block_arrival.add (hash); auto account (node.ledger.account (transaction, hash)); bool error_or_pruned (false); auto amount (node.ledger.amount_safe (transaction, hash, error_or_pruned)); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index e186925b..01d98b25 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -197,7 +197,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons epoch_upgrader{ *this, ledger, store, network_params, logger }, startup_time (std::chrono::steady_clock::now ()), node_seq (seq), - block_broadcast{ network, block_arrival, !flags.disable_block_processor_republishing }, + block_broadcast{ network, !flags.disable_block_processor_republishing }, process_live_dispatcher{ ledger, scheduler.priority, vote_cache, websocket } { logger.debug (nano::log::type::node, "Constructing node..."); @@ -241,7 +241,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons { observers.blocks.add ([this] (nano::election_status const & status_a, std::vector const & votes_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a, bool is_state_epoch_a) { auto block_a (status_a.winner); - if ((status_a.type == nano::election_status_type::active_confirmed_quorum || status_a.type == nano::election_status_type::active_confirmation_height) && this->block_arrival.recent (block_a->hash ())) + if ((status_a.type == nano::election_status_type::active_confirmed_quorum || status_a.type == nano::election_status_type::active_confirmation_height)) { auto node_l (shared_from_this ()); background ([node_l, block_a, account_a, amount_a, is_state_send_a, is_state_epoch_a] () { @@ -538,7 +538,6 @@ std::unique_ptr nano::collect_container_info (no composite->add_component (collect_container_info (node.vote_processor, "vote_processor")); composite->add_component (collect_container_info (node.rep_crawler, "rep_crawler")); composite->add_component (collect_container_info (node.block_processor, "block_processor")); - composite->add_component (collect_container_info (node.block_arrival, "block_arrival")); composite->add_component (collect_container_info (node.online_reps, "online_reps")); composite->add_component (collect_container_info (node.history, "history")); composite->add_component (node.block_uniquer.collect_container_info ("block_uniquer")); @@ -557,7 +556,6 @@ std::unique_ptr nano::collect_container_info (no void nano::node::process_active (std::shared_ptr const & incoming) { - block_arrival.add (incoming->hash ()); block_processor.add (incoming); } @@ -574,17 +572,12 @@ nano::process_return nano::node::process (nano::block & block) std::optional nano::node::process_local (std::shared_ptr const & block_a) { - // Add block hash as recently arrived to trigger automatic rebroadcast and election - block_arrival.add (block_a->hash ()); block_broadcast.set_local (block_a); return block_processor.add_blocking (block_a, nano::block_processor::block_source::local); } void nano::node::process_local_async (std::shared_ptr const & block_a) { - // Add block hash as recently arrived to trigger automatic rebroadcast and election - block_arrival.add (block_a->hash ()); - // Set current time to trigger automatic rebroadcast and election block_processor.add (block_a, nano::block_processor::block_source::local); } diff --git a/nano/node/node.hpp b/nano/node/node.hpp index d5eae059..19e5a7a2 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -8,7 +8,6 @@ #include #include #include -#include #include #include #include @@ -168,7 +167,6 @@ public: nano::vote_processor vote_processor; unsigned warmed_up; nano::block_processor block_processor; - nano::block_arrival block_arrival; nano::local_vote_history history; nano::block_uniquer block_uniquer; nano::vote_uniquer vote_uniquer; diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 291e0036..3c4d2337 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -4431,7 +4431,6 @@ TEST (rpc, populate_backlog) .work (*node->work_generate_blocking (latest)) .build (); ASSERT_EQ (nano::process_result::progress, node->process (*send).code); - ASSERT_FALSE (node->block_arrival.recent (send->hash ())); auto const rpc_ctx = add_rpc (system, node); boost::property_tree::ptree request; From f1cf12dbd6de124dac8c7eee5361debe3d50ff4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 8 Feb 2024 17:45:22 +0100 Subject: [PATCH 03/13] Simplify `block_broadcast` --- nano/node/block_broadcast.cpp | 29 ++--------------------------- nano/node/block_broadcast.hpp | 7 +------ nano/node/node.cpp | 1 - 3 files changed, 3 insertions(+), 34 deletions(-) diff --git a/nano/node/block_broadcast.cpp b/nano/node/block_broadcast.cpp index 4b7e5e77..ec2162d1 100644 --- a/nano/node/block_broadcast.cpp +++ b/nano/node/block_broadcast.cpp @@ -23,17 +23,12 @@ void nano::block_broadcast::connect (nano::block_processor & block_processor) default: break; } - erase (block); }); } -void nano::block_broadcast::observe (std::shared_ptr block, nano::block_processor::context const & context) +void nano::block_broadcast::observe (std::shared_ptr const & block, nano::block_processor::context const & context) { - nano::unique_lock lock{ mutex }; - auto existing = local.find (block); - auto local_l = existing != local.end (); - lock.unlock (); - if (local_l) + if (context.source == nano::block_processor::block_source::local) { // Block created on this node // Perform more agressive initial flooding @@ -53,23 +48,3 @@ void nano::block_broadcast::observe (std::shared_ptr block, nano::b } } } - -void nano::block_broadcast::set_local (std::shared_ptr block) -{ - if (!enabled) - { - return; - } - nano::lock_guard lock{ mutex }; - local.insert (block); -} - -void nano::block_broadcast::erase (std::shared_ptr block) -{ - if (!enabled) - { - return; - } - nano::lock_guard lock{ mutex }; - local.erase (block); -} diff --git a/nano/node/block_broadcast.hpp b/nano/node/block_broadcast.hpp index 19055d30..4129ddc4 100644 --- a/nano/node/block_broadcast.hpp +++ b/nano/node/block_broadcast.hpp @@ -17,17 +17,12 @@ public: block_broadcast (nano::network & network, bool enabled = false); // Add batch_processed observer to block_processor if enabled void connect (nano::block_processor & block_processor); - // Mark a block as originating locally - void set_local (std::shared_ptr block); - void erase (std::shared_ptr block); private: // Block_processor observer - void observe (std::shared_ptr block, nano::block_processor::context const &); + void observe (std::shared_ptr const & block, nano::block_processor::context const &); nano::network & network; - std::unordered_set> local; // Blocks originated on this node - nano::mutex mutex; bool enabled; }; } diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 01d98b25..ccdeaaf0 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -572,7 +572,6 @@ nano::process_return nano::node::process (nano::block & block) std::optional nano::node::process_local (std::shared_ptr const & block_a) { - block_broadcast.set_local (block_a); return block_processor.add_blocking (block_a, nano::block_processor::block_source::local); } From ba7120ca53b68fa1fea030afe1446e0ac9ae7d83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sun, 12 Nov 2023 16:17:36 +0100 Subject: [PATCH 04/13] Remove `blocking_observer` --- nano/core_test/active_transactions.cpp | 6 +++ nano/node/CMakeLists.txt | 2 - nano/node/blocking_observer.cpp | 62 ---------------------- nano/node/blocking_observer.hpp | 32 ----------- nano/node/blockprocessor.cpp | 73 ++++++++++++++++---------- nano/node/blockprocessor.hpp | 34 ++++++++---- nano/secure/common.cpp | 6 +++ nano/secure/common.hpp | 6 ++- 8 files changed, 84 insertions(+), 137 deletions(-) delete mode 100644 nano/node/blocking_observer.cpp delete mode 100644 nano/node/blocking_observer.hpp diff --git a/nano/core_test/active_transactions.cpp b/nano/core_test/active_transactions.cpp index 910a518c..cf867687 100644 --- a/nano/core_test/active_transactions.cpp +++ b/nano/core_test/active_transactions.cpp @@ -190,6 +190,12 @@ TEST (active_transactions, keep_local) auto const send4 = wallet.send_action (nano::dev::genesis_key.pub, key4.pub, node.config.receive_minimum.number ()); auto const send5 = wallet.send_action (nano::dev::genesis_key.pub, key5.pub, node.config.receive_minimum.number ()); auto const send6 = wallet.send_action (nano::dev::genesis_key.pub, key6.pub, node.config.receive_minimum.number ()); + ASSERT_NE (nullptr, send1); + ASSERT_NE (nullptr, send2); + ASSERT_NE (nullptr, send3); + ASSERT_NE (nullptr, send4); + ASSERT_NE (nullptr, send5); + ASSERT_NE (nullptr, send6); // force-confirm blocks for (auto const & block : { send1, send2, send3, send4, send5, send6 }) diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 3f7c2081..4452df57 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -22,8 +22,6 @@ add_library( bandwidth_limiter.cpp block_broadcast.cpp block_broadcast.hpp - blocking_observer.cpp - blocking_observer.hpp blockprocessor.hpp blockprocessor.cpp bootstrap/block_deserializer.hpp diff --git a/nano/node/blocking_observer.cpp b/nano/node/blocking_observer.cpp deleted file mode 100644 index 6ef14f69..00000000 --- a/nano/node/blocking_observer.cpp +++ /dev/null @@ -1,62 +0,0 @@ -#include -#include - -void nano::blocking_observer::connect (nano::block_processor & block_processor) -{ - block_processor.processed.add ([this] (auto const & result, auto const & block, auto const & context) { - observe (result, block); - }); -} - -void nano::blocking_observer::stop () -{ - nano::unique_lock lock{ mutex }; - stopped = true; - auto discard = std::move (blocking); - // Signal broken promises outside lock - lock.unlock (); - discard.clear (); // ~promise future_error -} - -void nano::blocking_observer::observe (nano::process_return const & result, std::shared_ptr block) -{ - nano::unique_lock lock{ mutex }; - auto existing = blocking.find (block); - if (existing != blocking.end ()) - { - auto promise = std::move (existing->second); - blocking.erase (existing); - // Signal promise outside of lock - lock.unlock (); - promise.set_value (result); - } -} - -std::future nano::blocking_observer::insert (std::shared_ptr block) -{ - nano::lock_guard lock{ mutex }; - if (stopped) - { - std::promise promise; - return promise.get_future (); // ~promise future_error - } - auto iterator = blocking.emplace (block, std::promise{}); - return iterator->second.get_future (); -} - -bool nano::blocking_observer::exists (std::shared_ptr block) -{ - nano::lock_guard lock{ mutex }; - auto existing = blocking.find (block); - return existing != blocking.end (); -} - -void nano::blocking_observer::erase (std::shared_ptr block) -{ - nano::lock_guard lock{ mutex }; - auto existing = blocking.find (block); - if (existing != blocking.end ()) - { - blocking.erase (existing); - } -} \ No newline at end of file diff --git a/nano/node/blocking_observer.hpp b/nano/node/blocking_observer.hpp deleted file mode 100644 index bd971412..00000000 --- a/nano/node/blocking_observer.hpp +++ /dev/null @@ -1,32 +0,0 @@ -#pragma once - -#include -#include - -#include -#include -#include - -namespace nano -{ -class block; -class block_processor; -// Observer that facilitates a blocking call to block processing which is done asynchronosly by the block_processor -class blocking_observer -{ -public: - void connect (nano::block_processor & block_processor); - // Stop the observer and trigger broken promise exceptions - void stop (); - // Block processor observer - void observe (nano::process_return const & result, std::shared_ptr block); - [[nodiscard]] std::future insert (std::shared_ptr block); - bool exists (std::shared_ptr block); - void erase (std::shared_ptr block); - -private: - std::unordered_multimap, std::promise> blocking; - bool stopped{ false }; - nano::mutex mutex; -}; -} diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 1ebad06c..1b272718 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -6,6 +6,26 @@ #include +/* + * block_processor::context + */ + +nano::block_processor::context::context (nano::block_processor::block_source source_a) : + source{ source_a } +{ + debug_assert (source != nano::block_processor::block_source::unknown); +} + +auto nano::block_processor::context::get_future () -> std::future +{ + return promise.get_future (); +} + +void nano::block_processor::context::set_result (result_t const & result) +{ + promise.set_value (result); +} + /* * block_processor */ @@ -22,7 +42,6 @@ nano::block_processor::block_processor (nano::node & node_a, nano::write_databas processed.notify (result, block, context); } }); - blocking.connect (*this); processing_thread = std::thread ([this] () { nano::thread_role::set (nano::thread_role::name::block_processing); this->process_blocks (); @@ -36,7 +55,6 @@ void nano::block_processor::stop () stopped = true; } condition.notify_all (); - blocking.stop (); nano::join_or_pass (processing_thread); } @@ -68,33 +86,28 @@ void nano::block_processor::add (std::shared_ptr const & block, blo node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::insufficient_work); return; } - add_impl (block, source); + add_impl (block, context{ source }); return; } std::optional nano::block_processor::add_blocking (std::shared_ptr const & block, block_source const source) { - auto future = blocking.insert (block); - add_impl (block, source); - condition.notify_all (); - std::optional result; + context ctx{ source }; + auto future = ctx.get_future (); + add_impl (block, std::move (ctx)); try { auto status = future.wait_for (node.config.block_process_timeout); debug_assert (status != std::future_status::deferred); if (status == std::future_status::ready) { - result = future.get (); - } - else - { - blocking.erase (block); + return future.get (); } } catch (std::future_error const &) { } - return result; + return std::nullopt; } void nano::block_processor::rollback_competitor (store::write_transaction const & transaction, nano::block const & block) @@ -133,7 +146,7 @@ void nano::block_processor::force (std::shared_ptr const & block_a) { { nano::lock_guard lock{ mutex }; - forced.emplace_back (block_a, context{ block_source::forced, std::chrono::steady_clock::now () }); + forced.emplace_back (block_a, context{ block_source::forced }); } condition.notify_all (); } @@ -147,8 +160,16 @@ void nano::block_processor::process_blocks () { active = true; lock.unlock (); + auto processed = process_batch (lock); + + for (auto & [result, block, context] : processed) + { + context.set_result (result); + } + batch_processed.notify (processed); + lock.lock (); active = false; } @@ -184,11 +205,11 @@ bool nano::block_processor::have_blocks () return have_blocks_ready (); } -void nano::block_processor::add_impl (std::shared_ptr block, block_source const source) +void nano::block_processor::add_impl (std::shared_ptr block, context ctx) { { nano::lock_guard guard{ mutex }; - blocks.emplace_back (block, context{ source }); + blocks.emplace_back (block, std::move (ctx)); } condition.notify_all (); } @@ -201,21 +222,21 @@ auto nano::block_processor::next_block () -> std::pair { release_assert (!blocks.empty ()); // Checked before calling this function - auto entry = blocks.front (); + auto entry = std::move (blocks.front ()); blocks.pop_front (); - return { entry, false }; // Not forced + return { std::move (entry), false }; // Not forced } else { - auto entry = forced.front (); + auto entry = std::move (forced.front ()); forced.pop_front (); - return { entry, true }; // Forced + return { std::move (entry), true }; // Forced } } -auto nano::block_processor::process_batch (nano::unique_lock & lock_a) -> std::deque +auto nano::block_processor::process_batch (nano::unique_lock & lock_a) -> processed_batch_t { - std::deque processed; + processed_batch_t processed; auto scoped_write_guard = write_database_queue.wait (nano::writer::process_batch); auto transaction (node.store.tx_begin_write ({ tables::accounts, tables::blocks, tables::frontiers, tables::pending })); @@ -239,7 +260,7 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock } auto [entry, force] = next_block (); - auto const & [block, context] = entry; + auto & [block, context] = entry; auto const hash = block->hash (); lock_a.unlock (); @@ -253,7 +274,7 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock number_of_blocks_processed++; auto result = process_one (transaction, block, force); - processed.emplace_back (result, block, context); + processed.emplace_back (result, block, std::move (context)); lock_a.lock (); } @@ -381,7 +402,3 @@ std::unique_ptr nano::collect_container_info (bl composite->add_component (std::make_unique (container_info{ "forced", forced_count, sizeof (decltype (block_processor.forced)::value_type) })); return composite; } - -/* - * context - */ diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index ad266be0..672c8df0 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -2,12 +2,12 @@ #include #include -#include #include #include #include #include +#include #include namespace nano::store @@ -37,14 +37,26 @@ public: // Context forced, }; - struct context + class context { - block_source const source{}; + public: + explicit context (block_source); + + block_source const source; std::chrono::steady_clock::time_point const arrival{ std::chrono::steady_clock::now () }; + + public: + using result_t = nano::process_return; + std::future get_future (); + + private: + void set_result (result_t const &); + std::promise promise; + + friend class block_processor; }; using entry_t = std::pair, context>; - using processed_t = std::tuple, context>; public: block_processor (nano::node &, nano::write_database_queue &); @@ -64,21 +76,21 @@ public: std::atomic flushing{ false }; public: // Events - nano::observer_set, context> processed; - // The batch observer feeds the processed obsever - nano::observer_set const &> batch_processed; + using processed_t = std::tuple, context>; + using processed_batch_t = std::deque; -private: - blocking_observer blocking; + // The batch observer feeds the processed obsever + nano::observer_set const &, context const &> processed; + nano::observer_set batch_processed; private: // Roll back block in the ledger that conflicts with 'block' void rollback_competitor (store::write_transaction const &, nano::block const & block); nano::process_return process_one (store::write_transaction const &, std::shared_ptr block, bool forced = false); void queue_unchecked (store::write_transaction const &, nano::hash_or_account const &); - std::deque process_batch (nano::unique_lock &); + processed_batch_t process_batch (nano::unique_lock &); std::pair next_block (); /// @returns - void add_impl (std::shared_ptr block, block_source source); + void add_impl (std::shared_ptr block, context); private: // Dependencies nano::node & node; diff --git a/nano/secure/common.cpp b/nano/secure/common.cpp index 2fe551a7..fca9c573 100644 --- a/nano/secure/common.cpp +++ b/nano/secure/common.cpp @@ -15,6 +15,7 @@ #include #include +#include size_t constexpr nano::send_block::size; size_t constexpr nano::receive_block::size; @@ -496,6 +497,11 @@ void nano::generate_cache::enable_all () account_count = true; } +std::string_view nano::to_string (nano::process_result process_result) +{ + return magic_enum::enum_name (process_result); +} + nano::stat::detail nano::to_stat_detail (nano::process_result process_result) { auto value = magic_enum::enum_cast (magic_enum::enum_name (process_result)); diff --git a/nano/secure/common.hpp b/nano/secure/common.hpp index 503d4d12..190b8561 100644 --- a/nano/secure/common.hpp +++ b/nano/secure/common.hpp @@ -270,6 +270,10 @@ enum class process_result block_position, // This block cannot follow the previous block insufficient_work // Insufficient work for this block, even though it passed the minimal validation }; + +std::string_view to_string (process_result); +nano::stat::detail to_stat_detail (process_result); + class process_return final { public: @@ -282,8 +286,6 @@ enum class tally_result confirm }; -nano::stat::detail to_stat_detail (process_result); - class network_params; /** Genesis keys and ledger constants for network variants */ From 40ebd8c3741b0fb42a125d9d36865ddc3fd25f13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 5 Feb 2024 20:35:26 +0100 Subject: [PATCH 05/13] Blockprocessor entry struct --- nano/node/blockprocessor.cpp | 42 +++++++++++++++++++++--------------- nano/node/blockprocessor.hpp | 17 ++++++++++----- 2 files changed, 37 insertions(+), 22 deletions(-) diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 1b272718..c73f421f 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -87,7 +87,6 @@ void nano::block_processor::add (std::shared_ptr const & block, blo return; } add_impl (block, context{ source }); - return; } std::optional nano::block_processor::add_blocking (std::shared_ptr const & block, block_source const source) @@ -146,7 +145,7 @@ void nano::block_processor::force (std::shared_ptr const & block_a) { { nano::lock_guard lock{ mutex }; - forced.emplace_back (block_a, context{ block_source::forced }); + forced.emplace_back (entry{ block_a, context{ block_source::forced } }); } condition.notify_all (); } @@ -162,7 +161,9 @@ void nano::block_processor::process_blocks () lock.unlock (); auto processed = process_batch (lock); + debug_assert (!lock.owns_lock ()); + // Set results for futures when not holding the lock for (auto & [result, block, context] : processed) { context.set_result (result); @@ -207,31 +208,36 @@ bool nano::block_processor::have_blocks () void nano::block_processor::add_impl (std::shared_ptr block, context ctx) { + release_assert (ctx.source != nano::block_processor::block_source::forced); { nano::lock_guard guard{ mutex }; - blocks.emplace_back (block, std::move (ctx)); + blocks.emplace_back (entry{ block, std::move (ctx) }); } condition.notify_all (); } -auto nano::block_processor::next_block () -> std::pair +auto nano::block_processor::next () -> entry { debug_assert (!mutex.try_lock ()); + debug_assert (!blocks.empty () || !forced.empty ()); // This should be checked before calling next - if (forced.empty ()) + if (!blocks.empty ()) { - release_assert (!blocks.empty ()); // Checked before calling this function - - auto entry = std::move (blocks.front ()); + entry entry = std::move (blocks.front ()); + release_assert (entry.ctx.source != nano::block_processor::block_source::forced); blocks.pop_front (); - return { std::move (entry), false }; // Not forced + return entry; } - else + + if (!forced.empty ()) { - auto entry = std::move (forced.front ()); + entry entry = std::move (forced.front ()); + release_assert (entry.ctx.source == nano::block_processor::block_source::forced); forced.pop_front (); - return { std::move (entry), true }; // Forced + return entry; } + + release_assert (false, "next() called when no blocks are ready"); } auto nano::block_processor::process_batch (nano::unique_lock & lock_a) -> processed_batch_t @@ -259,9 +265,11 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock node.logger.debug (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", blocks.size (), forced.size ()); } - auto [entry, force] = next_block (); - auto & [block, context] = entry; + entry entry = next (); + context ctx = std::move (entry.ctx); + auto const block = entry.block; auto const hash = block->hash (); + bool const force = ctx.source == nano::block_processor::block_source::forced; lock_a.unlock (); @@ -273,8 +281,8 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock number_of_blocks_processed++; - auto result = process_one (transaction, block, force); - processed.emplace_back (result, block, std::move (context)); + auto result = process_one (transaction, block, ctx, force); + processed.emplace_back (result, block, std::move (ctx)); lock_a.lock (); } @@ -289,7 +297,7 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock return processed; } -nano::process_return nano::block_processor::process_one (store::write_transaction const & transaction_a, std::shared_ptr block, bool const forced_a) +nano::process_return nano::block_processor::process_one (store::write_transaction const & transaction_a, std::shared_ptr block, context const & context, bool const forced_a) { nano::process_return result; auto hash (block->hash ()); diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 672c8df0..a28c5490 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -56,7 +56,12 @@ public: // Context friend class block_processor; }; - using entry_t = std::pair, context>; +private: + struct entry + { + std::shared_ptr block; + block_processor::context ctx; + }; public: block_processor (nano::node &, nano::write_database_queue &); @@ -86,10 +91,10 @@ public: // Events private: // Roll back block in the ledger that conflicts with 'block' void rollback_competitor (store::write_transaction const &, nano::block const & block); - nano::process_return process_one (store::write_transaction const &, std::shared_ptr block, bool forced = false); + nano::process_return process_one (store::write_transaction const &, std::shared_ptr block, context const &, bool forced = false); void queue_unchecked (store::write_transaction const &, nano::hash_or_account const &); processed_batch_t process_batch (nano::unique_lock &); - std::pair next_block (); /// @returns + entry next (); void add_impl (std::shared_ptr block, context); private: // Dependencies @@ -99,9 +104,11 @@ private: // Dependencies private: bool stopped{ false }; bool active{ false }; + + std::deque blocks; + std::deque forced; + std::chrono::steady_clock::time_point next_log; - std::deque blocks; - std::deque forced; nano::condition_variable condition; nano::mutex mutex{ mutex_identifier (mutexes::block_processor) }; std::thread processing_thread; From 1cdea1d0f72902cb59620045982168b2afebca63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sun, 12 Nov 2023 23:11:52 +0100 Subject: [PATCH 06/13] Track processed block source stats --- nano/lib/stats_enums.hpp | 9 +++++++++ nano/node/blockprocessor.cpp | 17 ++++++++++++++--- nano/node/blockprocessor.hpp | 2 ++ 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 8fc31557..e72e3607 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -36,6 +36,7 @@ enum class type : uint8_t vote_cache, hinting, blockprocessor, + blockprocessor_sources, bootstrap_server, active, active_started, @@ -71,6 +72,7 @@ enum class detail : uint8_t top, none, success, + unknown, // processing queue queue, @@ -110,6 +112,13 @@ enum class detail : uint8_t representative_mismatch, block_position, + // blockprocessor result + live, + bootstrap, + unchecked, + local, + forced, + // message specific not_a_type, invalid, diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index c73f421f..1eac1aff 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -6,6 +6,8 @@ #include +#include + /* * block_processor::context */ @@ -299,13 +301,15 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock nano::process_return nano::block_processor::process_one (store::write_transaction const & transaction_a, std::shared_ptr block, context const & context, bool const forced_a) { - nano::process_return result; - auto hash (block->hash ()); - result = node.ledger.process (transaction_a, *block); + auto const hash = block->hash (); + nano::process_return result = node.ledger.process (transaction_a, *block); node.stats.inc (nano::stat::type::blockprocessor, to_stat_detail (result.code)); + node.stats.inc (nano::stat::type::blockprocessor_sources, to_stat_detail (context.source)); node.logger.trace (nano::log::type::blockprocessor, nano::log::detail::block_processed, nano::log::arg{ "result", result.code }, + nano::log::arg{ "source", context.source }, + nano::log::arg{ "arrival", nano::log::microseconds (context.arrival) }, nano::log::arg{ "forced", forced_a }, nano::log::arg{ "block", block }); @@ -410,3 +414,10 @@ std::unique_ptr nano::collect_container_info (bl composite->add_component (std::make_unique (container_info{ "forced", forced_count, sizeof (decltype (block_processor.forced)::value_type) })); return composite; } + +nano::stat::detail nano::to_stat_detail (block_processor::block_source type) +{ + auto value = magic_enum::enum_cast (magic_enum::enum_name (type)); + debug_assert (value); + return value.value_or (nano::stat::detail{}); +} diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index a28c5490..8b406130 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -115,4 +115,6 @@ private: friend std::unique_ptr collect_container_info (block_processor & block_processor, std::string const & name); }; + +nano::stat::detail to_stat_detail (block_processor::block_source); } From 6fc74c8ccbb7edeb3ced8e9f55eaeb0b744fb579 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 8 Feb 2024 18:41:01 +0100 Subject: [PATCH 07/13] Move `block_source` enum --- nano/node/block_broadcast.cpp | 4 ++-- nano/node/blockprocessor.cpp | 14 +++++++------- nano/node/blockprocessor.hpp | 24 ++++++++++++------------ nano/node/node.cpp | 6 +++--- 4 files changed, 24 insertions(+), 24 deletions(-) diff --git a/nano/node/block_broadcast.cpp b/nano/node/block_broadcast.cpp index ec2162d1..e3f2fc56 100644 --- a/nano/node/block_broadcast.cpp +++ b/nano/node/block_broadcast.cpp @@ -28,7 +28,7 @@ void nano::block_broadcast::connect (nano::block_processor & block_processor) void nano::block_broadcast::observe (std::shared_ptr const & block, nano::block_processor::context const & context) { - if (context.source == nano::block_processor::block_source::local) + if (context.source == nano::block_source::local) { // Block created on this node // Perform more agressive initial flooding @@ -36,7 +36,7 @@ void nano::block_broadcast::observe (std::shared_ptr const & block, } else { - if (context.source != nano::block_processor::block_source::bootstrap) + if (context.source != nano::block_source::bootstrap) { // Block arrived from realtime traffic, do normal gossip. network.flood_block (block, nano::transport::buffer_drop_policy::limiter); diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 1eac1aff..0e33234f 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -12,10 +12,10 @@ * block_processor::context */ -nano::block_processor::context::context (nano::block_processor::block_source source_a) : +nano::block_processor::context::context (nano::block_source source_a) : source{ source_a } { - debug_assert (source != nano::block_processor::block_source::unknown); + debug_assert (source != nano::block_source::unknown); } auto nano::block_processor::context::get_future () -> std::future @@ -210,7 +210,7 @@ bool nano::block_processor::have_blocks () void nano::block_processor::add_impl (std::shared_ptr block, context ctx) { - release_assert (ctx.source != nano::block_processor::block_source::forced); + release_assert (ctx.source != nano::block_source::forced); { nano::lock_guard guard{ mutex }; blocks.emplace_back (entry{ block, std::move (ctx) }); @@ -226,7 +226,7 @@ auto nano::block_processor::next () -> entry if (!blocks.empty ()) { entry entry = std::move (blocks.front ()); - release_assert (entry.ctx.source != nano::block_processor::block_source::forced); + release_assert (entry.ctx.source != nano::block_source::forced); blocks.pop_front (); return entry; } @@ -234,7 +234,7 @@ auto nano::block_processor::next () -> entry if (!forced.empty ()) { entry entry = std::move (forced.front ()); - release_assert (entry.ctx.source == nano::block_processor::block_source::forced); + release_assert (entry.ctx.source == nano::block_source::forced); forced.pop_front (); return entry; } @@ -271,7 +271,7 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock context ctx = std::move (entry.ctx); auto const block = entry.block; auto const hash = block->hash (); - bool const force = ctx.source == nano::block_processor::block_source::forced; + bool const force = ctx.source == nano::block_source::forced; lock_a.unlock (); @@ -415,7 +415,7 @@ std::unique_ptr nano::collect_container_info (bl return composite; } -nano::stat::detail nano::to_stat_detail (block_processor::block_source type) +nano::stat::detail nano::to_stat_detail (nano::block_source type) { auto value = magic_enum::enum_cast (magic_enum::enum_name (type)); debug_assert (value); diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 8b406130..31542b44 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -20,6 +20,18 @@ namespace nano class node; class write_database_queue; +enum class block_source +{ + unknown = 0, + live, + bootstrap, + unchecked, + local, + forced, +}; + +nano::stat::detail to_stat_detail (block_source); + /** * Processing blocks is a potentially long IO operation. * This class isolates block insertion from other operations like servicing network operations @@ -27,16 +39,6 @@ class write_database_queue; class block_processor final { public: // Context - enum class block_source - { - unknown = 0, - live, - bootstrap, - unchecked, - local, - forced, - }; - class context { public: @@ -115,6 +117,4 @@ private: friend std::unique_ptr collect_container_info (block_processor & block_processor, std::string const & name); }; - -nano::stat::detail to_stat_detail (block_processor::block_source); } diff --git a/nano/node/node.cpp b/nano/node/node.cpp index ccdeaaf0..884a0ae8 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -206,7 +206,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons process_live_dispatcher.connect (block_processor); unchecked.satisfied.add ([this] (nano::unchecked_info const & info) { - block_processor.add (info.block, nano::block_processor::block_source::unchecked); + block_processor.add (info.block, nano::block_source::unchecked); }); vote_cache.rep_weight_query = [this] (nano::account const & rep) { @@ -572,12 +572,12 @@ nano::process_return nano::node::process (nano::block & block) std::optional nano::node::process_local (std::shared_ptr const & block_a) { - return block_processor.add_blocking (block_a, nano::block_processor::block_source::local); + return block_processor.add_blocking (block_a, nano::block_source::local); } void nano::node::process_local_async (std::shared_ptr const & block_a) { - block_processor.add (block_a, nano::block_processor::block_source::local); + block_processor.add (block_a, nano::block_source::local); } void nano::node::start () From aa1ef3144b1b4ef1bd421da4a992e58cbf87a0a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 8 Feb 2024 18:51:55 +0100 Subject: [PATCH 08/13] Add `bootstrap_legacy` source --- nano/lib/stats_enums.hpp | 1 + nano/node/blockprocessor.hpp | 1 + nano/node/bootstrap/bootstrap_attempt.cpp | 2 +- nano/node/bootstrap/bootstrap_lazy.cpp | 2 +- nano/node/bootstrap_ascending/service.cpp | 2 +- 5 files changed, 5 insertions(+), 3 deletions(-) diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index e72e3607..9a726472 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -115,6 +115,7 @@ enum class detail : uint8_t // blockprocessor result live, bootstrap, + bootstrap_legacy, unchecked, local, forced, diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 31542b44..bf3c59ef 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -25,6 +25,7 @@ enum class block_source unknown = 0, live, bootstrap, + bootstrap_legacy, unchecked, local, forced, diff --git a/nano/node/bootstrap/bootstrap_attempt.cpp b/nano/node/bootstrap/bootstrap_attempt.cpp index 8d892db2..3f78beff 100644 --- a/nano/node/bootstrap/bootstrap_attempt.cpp +++ b/nano/node/bootstrap/bootstrap_attempt.cpp @@ -133,7 +133,7 @@ bool nano::bootstrap_attempt::process_block (std::shared_ptr const } else { - node_l->block_processor.add (block_a); + node_l->block_processor.add (block_a, nano::block_source::bootstrap_legacy); } return stop_pull; } diff --git a/nano/node/bootstrap/bootstrap_lazy.cpp b/nano/node/bootstrap/bootstrap_lazy.cpp index 0d3dbc94..d69916a9 100644 --- a/nano/node/bootstrap/bootstrap_lazy.cpp +++ b/nano/node/bootstrap/bootstrap_lazy.cpp @@ -311,7 +311,7 @@ bool nano::bootstrap_attempt_lazy::process_block_lazy (std::shared_ptrblock_processor.add (block_a); + node->block_processor.add (block_a, nano::block_source::bootstrap_legacy); } // Force drop lazy bootstrap connection for long bulk_pull if (pull_blocks_processed > max_blocks) diff --git a/nano/node/bootstrap_ascending/service.cpp b/nano/node/bootstrap_ascending/service.cpp index 23cd4991..d091b644 100644 --- a/nano/node/bootstrap_ascending/service.cpp +++ b/nano/node/bootstrap_ascending/service.cpp @@ -387,7 +387,7 @@ void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::bloc for (auto & block : response.blocks) { - block_processor.add (block); + block_processor.add (block, nano::block_source::bootstrap); } nano::lock_guard lock{ mutex }; throttle.add (true); From 124421bfefe8add44058caf628b8046a699b9dfe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 8 Feb 2024 19:12:15 +0100 Subject: [PATCH 09/13] Name conflict --- nano/node/active_transactions.cpp | 2 +- nano/node/block_broadcast.cpp | 2 +- nano/node/blockprocessor.cpp | 2 +- nano/node/blockprocessor.hpp | 4 ++-- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/nano/node/active_transactions.cpp b/nano/node/active_transactions.cpp index b32a3ace..74f1ba4e 100644 --- a/nano/node/active_transactions.cpp +++ b/nano/node/active_transactions.cpp @@ -34,7 +34,7 @@ nano::active_transactions::active_transactions (nano::node & node_a, nano::confi }); // Notify elections about alternative (forked) blocks - block_processor.processed.add ([this] (auto const & result, auto const & block, auto const & context) { + block_processor.block_processed.add ([this] (auto const & result, auto const & block, auto const & context) { switch (result.code) { case nano::process_result::fork: diff --git a/nano/node/block_broadcast.cpp b/nano/node/block_broadcast.cpp index e3f2fc56..2fb61e6a 100644 --- a/nano/node/block_broadcast.cpp +++ b/nano/node/block_broadcast.cpp @@ -14,7 +14,7 @@ void nano::block_broadcast::connect (nano::block_processor & block_processor) { return; } - block_processor.processed.add ([this] (auto const & result, auto const & block, auto const & context) { + block_processor.block_processed.add ([this] (auto const & result, auto const & block, auto const & context) { switch (result.code) { case nano::process_result::progress: diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 0e33234f..3130da49 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -41,7 +41,7 @@ nano::block_processor::block_processor (nano::node & node_a, nano::write_databas // For every batch item: notify the 'processed' observer. for (auto const & [result, block, context] : items) { - processed.notify (result, block, context); + block_processed.notify (result, block, context); } }); processing_thread = std::thread ([this] () { diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index bf3c59ef..97dbb71e 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -87,8 +87,8 @@ public: // Events using processed_t = std::tuple, context>; using processed_batch_t = std::deque; - // The batch observer feeds the processed obsever - nano::observer_set const &, context const &> processed; + // The batch observer feeds the processed observer + nano::observer_set const &, context const &> block_processed; nano::observer_set batch_processed; private: From 9a80f294dc7761a1c547d38e09e4a5348f70b363 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Fri, 9 Feb 2024 15:19:57 +0100 Subject: [PATCH 10/13] Stats & logs --- nano/core_test/node.cpp | 2 +- nano/lib/stats_enums.hpp | 10 +++++++-- nano/node/blockprocessor.cpp | 41 ++++++++++++++++++++++++++---------- nano/node/blockprocessor.hpp | 1 + 4 files changed, 40 insertions(+), 14 deletions(-) diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 0f7410e7..34f1de2b 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -2911,7 +2911,7 @@ TEST (node, block_processor_reject_state) send1->signature.bytes[0] ^= 1; ASSERT_FALSE (node.ledger.block_or_pruned_exists (send1->hash ())); node.process_active (send1); - ASSERT_TIMELY_EQ (5s, 1, node.stats.count (nano::stat::type::blockprocessor, nano::stat::detail::bad_signature)); + ASSERT_TIMELY_EQ (5s, 1, node.stats.count (nano::stat::type::blockprocessor_result, nano::stat::detail::bad_signature)); ASSERT_FALSE (node.ledger.block_or_pruned_exists (send1->hash ())); auto send2 = builder.make_block () .account (nano::dev::genesis_key.pub) diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 9a726472..63c90bdd 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -36,7 +36,8 @@ enum class type : uint8_t vote_cache, hinting, blockprocessor, - blockprocessor_sources, + blockprocessor_source, + blockprocessor_result, bootstrap_server, active, active_started, @@ -112,7 +113,12 @@ enum class detail : uint8_t representative_mismatch, block_position, - // blockprocessor result + // blockprocessor + process_blocking, + process_blocking_timeout, + force, + + // block source live, bootstrap, bootstrap_legacy, diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 3130da49..8d762378 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -88,14 +88,22 @@ void nano::block_processor::add (std::shared_ptr const & block, blo node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::insufficient_work); return; } + + node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process); + node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {})", block->hash ().to_string (), to_string (source)); + add_impl (block, context{ source }); } std::optional nano::block_processor::add_blocking (std::shared_ptr const & block, block_source const source) { + node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking); + node.logger.debug (nano::log::type::blockprocessor, "Processing block (blocking): {} (source: {})", block->hash ().to_string (), to_string (source)); + context ctx{ source }; auto future = ctx.get_future (); add_impl (block, std::move (ctx)); + try { auto status = future.wait_for (node.config.block_process_timeout); @@ -107,10 +115,25 @@ std::optional nano::block_processor::add_blocking (std::sh } catch (std::future_error const &) { + node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking_timeout); + node.logger.error (nano::log::type::blockprocessor, "Timeout processing block: {}", block->hash ().to_string ()); } + return std::nullopt; } +void nano::block_processor::force (std::shared_ptr const & block_a) +{ + node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::force); + node.logger.debug (nano::log::type::blockprocessor, "Forcing block: {}", block_a->hash ().to_string ()); + + { + nano::lock_guard lock{ mutex }; + forced.emplace_back (entry{ block_a, context{ block_source::forced } }); + } + condition.notify_all (); +} + void nano::block_processor::rollback_competitor (store::write_transaction const & transaction, nano::block const & block) { auto hash = block.hash (); @@ -143,15 +166,6 @@ void nano::block_processor::rollback_competitor (store::write_transaction const } } -void nano::block_processor::force (std::shared_ptr const & block_a) -{ - { - nano::lock_guard lock{ mutex }; - forced.emplace_back (entry{ block_a, context{ block_source::forced } }); - } - condition.notify_all (); -} - void nano::block_processor::process_blocks () { nano::unique_lock lock{ mutex }; @@ -304,8 +318,8 @@ nano::process_return nano::block_processor::process_one (store::write_transactio auto const hash = block->hash (); nano::process_return result = node.ledger.process (transaction_a, *block); - node.stats.inc (nano::stat::type::blockprocessor, to_stat_detail (result.code)); - node.stats.inc (nano::stat::type::blockprocessor_sources, to_stat_detail (context.source)); + node.stats.inc (nano::stat::type::blockprocessor_result, to_stat_detail (result.code)); + node.stats.inc (nano::stat::type::blockprocessor_source, to_stat_detail (context.source)); node.logger.trace (nano::log::type::blockprocessor, nano::log::detail::block_processed, nano::log::arg{ "result", result.code }, nano::log::arg{ "source", context.source }, @@ -415,6 +429,11 @@ std::unique_ptr nano::collect_container_info (bl return composite; } +std::string_view nano::to_string (nano::block_source source) +{ + return magic_enum::enum_name (source); +} + nano::stat::detail nano::to_stat_detail (nano::block_source type) { auto value = magic_enum::enum_cast (magic_enum::enum_name (type)); diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 97dbb71e..65bb62e7 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -31,6 +31,7 @@ enum class block_source forced, }; +std::string_view to_string (nano::block_source); nano::stat::detail to_stat_detail (block_source); /** From f6a32c98d619dcdc571c2b804c9142b4bb69efe2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Fri, 9 Feb 2024 18:52:16 +0100 Subject: [PATCH 11/13] Fixes --- nano/node/block_broadcast.cpp | 2 +- nano/node/blockprocessor.cpp | 18 +++++++++--------- nano/node/blockprocessor.hpp | 4 ++-- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/nano/node/block_broadcast.cpp b/nano/node/block_broadcast.cpp index 2fb61e6a..6cdd673c 100644 --- a/nano/node/block_broadcast.cpp +++ b/nano/node/block_broadcast.cpp @@ -36,7 +36,7 @@ void nano::block_broadcast::observe (std::shared_ptr const & block, } else { - if (context.source != nano::block_source::bootstrap) + if (context.source != nano::block_source::bootstrap && context.source != nano::block_source::bootstrap_legacy) { // Block arrived from realtime traffic, do normal gossip. network.flood_block (block, nano::transport::buffer_drop_policy::limiter); diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 8d762378..ddca2e8d 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -222,7 +222,7 @@ bool nano::block_processor::have_blocks () return have_blocks_ready (); } -void nano::block_processor::add_impl (std::shared_ptr block, context ctx) +void nano::block_processor::add_impl (std::shared_ptr const & block, context ctx) { release_assert (ctx.source != nano::block_source::forced); { @@ -237,14 +237,6 @@ auto nano::block_processor::next () -> entry debug_assert (!mutex.try_lock ()); debug_assert (!blocks.empty () || !forced.empty ()); // This should be checked before calling next - if (!blocks.empty ()) - { - entry entry = std::move (blocks.front ()); - release_assert (entry.ctx.source != nano::block_source::forced); - blocks.pop_front (); - return entry; - } - if (!forced.empty ()) { entry entry = std::move (forced.front ()); @@ -253,6 +245,14 @@ auto nano::block_processor::next () -> entry return entry; } + if (!blocks.empty ()) + { + entry entry = std::move (blocks.front ()); + release_assert (entry.ctx.source != nano::block_source::forced); + blocks.pop_front (); + return entry; + } + release_assert (false, "next() called when no blocks are ready"); } diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 65bb62e7..4d9c2227 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -31,7 +31,7 @@ enum class block_source forced, }; -std::string_view to_string (nano::block_source); +std::string_view to_string (block_source); nano::stat::detail to_stat_detail (block_source); /** @@ -99,7 +99,7 @@ private: void queue_unchecked (store::write_transaction const &, nano::hash_or_account const &); processed_batch_t process_batch (nano::unique_lock &); entry next (); - void add_impl (std::shared_ptr block, context); + void add_impl (std::shared_ptr const & block, context); private: // Dependencies nano::node & node; From 11552cb16f00be09b16fcaa3f9be3a574e817517 Mon Sep 17 00:00:00 2001 From: Colin LeMahieu Date: Tue, 13 Feb 2024 17:23:24 +0000 Subject: [PATCH 12/13] Move block in to the block_processor::context object. --- nano/node/active_transactions.cpp | 4 +- nano/node/block_broadcast.cpp | 7 ++-- nano/node/block_broadcast.hpp | 2 +- nano/node/blockprocessor.cpp | 46 +++++++++++------------ nano/node/blockprocessor.hpp | 24 +++++------- nano/node/bootstrap_ascending/service.cpp | 6 +-- nano/node/process_live_dispatcher.cpp | 6 +-- 7 files changed, 45 insertions(+), 50 deletions(-) diff --git a/nano/node/active_transactions.cpp b/nano/node/active_transactions.cpp index 74f1ba4e..ed4fb36f 100644 --- a/nano/node/active_transactions.cpp +++ b/nano/node/active_transactions.cpp @@ -34,11 +34,11 @@ nano::active_transactions::active_transactions (nano::node & node_a, nano::confi }); // Notify elections about alternative (forked) blocks - block_processor.block_processed.add ([this] (auto const & result, auto const & block, auto const & context) { + block_processor.block_processed.add ([this] (auto const & result, auto const & context) { switch (result.code) { case nano::process_result::fork: - publish (block); + publish (context.block); break; default: break; diff --git a/nano/node/block_broadcast.cpp b/nano/node/block_broadcast.cpp index 6cdd673c..31cde295 100644 --- a/nano/node/block_broadcast.cpp +++ b/nano/node/block_broadcast.cpp @@ -14,11 +14,11 @@ void nano::block_broadcast::connect (nano::block_processor & block_processor) { return; } - block_processor.block_processed.add ([this] (auto const & result, auto const & block, auto const & context) { + block_processor.block_processed.add ([this] (auto const & result, auto const & context) { switch (result.code) { case nano::process_result::progress: - observe (block, context); + observe (context); break; default: break; @@ -26,8 +26,9 @@ void nano::block_broadcast::connect (nano::block_processor & block_processor) }); } -void nano::block_broadcast::observe (std::shared_ptr const & block, nano::block_processor::context const & context) +void nano::block_broadcast::observe (nano::block_processor::context const & context) { + auto const & block = context.block; if (context.source == nano::block_source::local) { // Block created on this node diff --git a/nano/node/block_broadcast.hpp b/nano/node/block_broadcast.hpp index 4129ddc4..0f0ee62f 100644 --- a/nano/node/block_broadcast.hpp +++ b/nano/node/block_broadcast.hpp @@ -20,7 +20,7 @@ public: private: // Block_processor observer - void observe (std::shared_ptr const & block, nano::block_processor::context const &); + void observe (nano::block_processor::context const &); nano::network & network; bool enabled; diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index ddca2e8d..dc8c68eb 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -12,7 +12,8 @@ * block_processor::context */ -nano::block_processor::context::context (nano::block_source source_a) : +nano::block_processor::context::context (std::shared_ptr block, nano::block_source source_a) : + block{ block }, source{ source_a } { debug_assert (source != nano::block_source::unknown); @@ -39,9 +40,9 @@ nano::block_processor::block_processor (nano::node & node_a, nano::write_databas { batch_processed.add ([this] (auto const & items) { // For every batch item: notify the 'processed' observer. - for (auto const & [result, block, context] : items) + for (auto const & [result, context] : items) { - block_processed.notify (result, block, context); + block_processed.notify (result, context); } }); processing_thread = std::thread ([this] () { @@ -92,7 +93,7 @@ void nano::block_processor::add (std::shared_ptr const & block, blo node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process); node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {})", block->hash ().to_string (), to_string (source)); - add_impl (block, context{ source }); + add_impl (context{ block, source }); } std::optional nano::block_processor::add_blocking (std::shared_ptr const & block, block_source const source) @@ -100,9 +101,9 @@ std::optional nano::block_processor::add_blocking (std::sh node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking); node.logger.debug (nano::log::type::blockprocessor, "Processing block (blocking): {} (source: {})", block->hash ().to_string (), to_string (source)); - context ctx{ source }; + context ctx{ block, source }; auto future = ctx.get_future (); - add_impl (block, std::move (ctx)); + add_impl (std::move (ctx)); try { @@ -129,7 +130,7 @@ void nano::block_processor::force (std::shared_ptr const & block_a) { nano::lock_guard lock{ mutex }; - forced.emplace_back (entry{ block_a, context{ block_source::forced } }); + forced.emplace_back (context{ block_a, block_source::forced }); } condition.notify_all (); } @@ -180,7 +181,7 @@ void nano::block_processor::process_blocks () debug_assert (!lock.owns_lock ()); // Set results for futures when not holding the lock - for (auto & [result, block, context] : processed) + for (auto & [result, context] : processed) { context.set_result (result); } @@ -222,33 +223,33 @@ bool nano::block_processor::have_blocks () return have_blocks_ready (); } -void nano::block_processor::add_impl (std::shared_ptr const & block, context ctx) +void nano::block_processor::add_impl (context ctx) { release_assert (ctx.source != nano::block_source::forced); { nano::lock_guard guard{ mutex }; - blocks.emplace_back (entry{ block, std::move (ctx) }); + blocks.emplace_back (std::move (ctx)); } condition.notify_all (); } -auto nano::block_processor::next () -> entry +auto nano::block_processor::next () -> context { debug_assert (!mutex.try_lock ()); debug_assert (!blocks.empty () || !forced.empty ()); // This should be checked before calling next if (!forced.empty ()) { - entry entry = std::move (forced.front ()); - release_assert (entry.ctx.source == nano::block_source::forced); + auto entry = std::move (forced.front ()); + release_assert (entry.source == nano::block_source::forced); forced.pop_front (); return entry; } if (!blocks.empty ()) { - entry entry = std::move (blocks.front ()); - release_assert (entry.ctx.source != nano::block_source::forced); + auto entry = std::move (blocks.front ()); + release_assert (entry.source != nano::block_source::forced); blocks.pop_front (); return entry; } @@ -281,10 +282,8 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock node.logger.debug (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", blocks.size (), forced.size ()); } - entry entry = next (); - context ctx = std::move (entry.ctx); - auto const block = entry.block; - auto const hash = block->hash (); + auto ctx = next (); + auto const hash = ctx.block->hash (); bool const force = ctx.source == nano::block_source::forced; lock_a.unlock (); @@ -292,13 +291,13 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock if (force) { number_of_forced_processed++; - rollback_competitor (transaction, *block); + rollback_competitor (transaction, *ctx.block); } number_of_blocks_processed++; - auto result = process_one (transaction, block, ctx, force); - processed.emplace_back (result, block, std::move (ctx)); + auto result = process_one (transaction, ctx, force); + processed.emplace_back (result, std::move (ctx)); lock_a.lock (); } @@ -313,8 +312,9 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock return processed; } -nano::process_return nano::block_processor::process_one (store::write_transaction const & transaction_a, std::shared_ptr block, context const & context, bool const forced_a) +nano::process_return nano::block_processor::process_one (store::write_transaction const & transaction_a, context const & context, bool const forced_a) { + auto const & block = context.block; auto const hash = block->hash (); nano::process_return result = node.ledger.process (transaction_a, *block); diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 4d9c2227..bae8fbcb 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -44,8 +44,9 @@ public: // Context class context { public: - explicit context (block_source); + context (std::shared_ptr block, block_source source); + std::shared_ptr block; block_source const source; std::chrono::steady_clock::time_point const arrival{ std::chrono::steady_clock::now () }; @@ -60,13 +61,6 @@ public: // Context friend class block_processor; }; -private: - struct entry - { - std::shared_ptr block; - block_processor::context ctx; - }; - public: block_processor (nano::node &, nano::write_database_queue &); @@ -85,21 +79,21 @@ public: std::atomic flushing{ false }; public: // Events - using processed_t = std::tuple, context>; + using processed_t = std::tuple; using processed_batch_t = std::deque; // The batch observer feeds the processed observer - nano::observer_set const &, context const &> block_processed; + nano::observer_set block_processed; nano::observer_set batch_processed; private: // Roll back block in the ledger that conflicts with 'block' void rollback_competitor (store::write_transaction const &, nano::block const & block); - nano::process_return process_one (store::write_transaction const &, std::shared_ptr block, context const &, bool forced = false); + nano::process_return process_one (store::write_transaction const &, context const &, bool forced = false); void queue_unchecked (store::write_transaction const &, nano::hash_or_account const &); processed_batch_t process_batch (nano::unique_lock &); - entry next (); - void add_impl (std::shared_ptr const & block, context); + context next (); + void add_impl (context); private: // Dependencies nano::node & node; @@ -109,8 +103,8 @@ private: bool stopped{ false }; bool active{ false }; - std::deque blocks; - std::deque forced; + std::deque blocks; + std::deque forced; std::chrono::steady_clock::time_point next_log; nano::condition_variable condition; diff --git a/nano/node/bootstrap_ascending/service.cpp b/nano/node/bootstrap_ascending/service.cpp index d091b644..14430c09 100644 --- a/nano/node/bootstrap_ascending/service.cpp +++ b/nano/node/bootstrap_ascending/service.cpp @@ -34,10 +34,10 @@ nano::bootstrap_ascending::service::service (nano::node_config & config_a, nano: nano::lock_guard lock{ mutex }; auto transaction = ledger.store.tx_begin_read (); - for (auto const & [result, block, context] : batch) + for (auto const & [result, context] : batch) { - debug_assert (block != nullptr); - inspect (transaction, result, *block); + debug_assert (context.block != nullptr); + inspect (transaction, result, *context.block); } } diff --git a/nano/node/process_live_dispatcher.cpp b/nano/node/process_live_dispatcher.cpp index daaad17f..9dfe769c 100644 --- a/nano/node/process_live_dispatcher.cpp +++ b/nano/node/process_live_dispatcher.cpp @@ -20,10 +20,10 @@ void nano::process_live_dispatcher::connect (nano::block_processor & block_proce { block_processor.batch_processed.add ([this] (auto const & batch) { auto const transaction = ledger.store.tx_begin_read (); - for (auto const & [result, block, context] : batch) + for (auto const & [result, context] : batch) { - debug_assert (block != nullptr); - inspect (result, *block, transaction); + debug_assert (context.block != nullptr); + inspect (result, *context.block, transaction); } }); } From 5927a951dd5db02110ab6836f8179d326c0e9ecf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Tue, 13 Feb 2024 19:56:46 +0100 Subject: [PATCH 13/13] Constants --- nano/node/blockprocessor.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index bae8fbcb..5130942c 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -44,9 +44,9 @@ public: // Context class context { public: - context (std::shared_ptr block, block_source source); + context (std::shared_ptr block, block_source source); - std::shared_ptr block; + std::shared_ptr const block; block_source const source; std::chrono::steady_clock::time_point const arrival{ std::chrono::steady_clock::now () };