From 11552cb16f00be09b16fcaa3f9be3a574e817517 Mon Sep 17 00:00:00 2001 From: Colin LeMahieu Date: Tue, 13 Feb 2024 17:23:24 +0000 Subject: [PATCH] Move block in to the block_processor::context object. --- nano/node/active_transactions.cpp | 4 +- nano/node/block_broadcast.cpp | 7 ++-- nano/node/block_broadcast.hpp | 2 +- nano/node/blockprocessor.cpp | 46 +++++++++++------------ nano/node/blockprocessor.hpp | 24 +++++------- nano/node/bootstrap_ascending/service.cpp | 6 +-- nano/node/process_live_dispatcher.cpp | 6 +-- 7 files changed, 45 insertions(+), 50 deletions(-) diff --git a/nano/node/active_transactions.cpp b/nano/node/active_transactions.cpp index 74f1ba4e..ed4fb36f 100644 --- a/nano/node/active_transactions.cpp +++ b/nano/node/active_transactions.cpp @@ -34,11 +34,11 @@ nano::active_transactions::active_transactions (nano::node & node_a, nano::confi }); // Notify elections about alternative (forked) blocks - block_processor.block_processed.add ([this] (auto const & result, auto const & block, auto const & context) { + block_processor.block_processed.add ([this] (auto const & result, auto const & context) { switch (result.code) { case nano::process_result::fork: - publish (block); + publish (context.block); break; default: break; diff --git a/nano/node/block_broadcast.cpp b/nano/node/block_broadcast.cpp index 6cdd673c..31cde295 100644 --- a/nano/node/block_broadcast.cpp +++ b/nano/node/block_broadcast.cpp @@ -14,11 +14,11 @@ void nano::block_broadcast::connect (nano::block_processor & block_processor) { return; } - block_processor.block_processed.add ([this] (auto const & result, auto const & block, auto const & context) { + block_processor.block_processed.add ([this] (auto const & result, auto const & context) { switch (result.code) { case nano::process_result::progress: - observe (block, context); + observe (context); break; default: break; @@ -26,8 +26,9 @@ void nano::block_broadcast::connect (nano::block_processor & block_processor) }); } -void nano::block_broadcast::observe (std::shared_ptr const & block, nano::block_processor::context const & context) +void nano::block_broadcast::observe (nano::block_processor::context const & context) { + auto const & block = context.block; if (context.source == nano::block_source::local) { // Block created on this node diff --git a/nano/node/block_broadcast.hpp b/nano/node/block_broadcast.hpp index 4129ddc4..0f0ee62f 100644 --- a/nano/node/block_broadcast.hpp +++ b/nano/node/block_broadcast.hpp @@ -20,7 +20,7 @@ public: private: // Block_processor observer - void observe (std::shared_ptr const & block, nano::block_processor::context const &); + void observe (nano::block_processor::context const &); nano::network & network; bool enabled; diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index ddca2e8d..dc8c68eb 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -12,7 +12,8 @@ * block_processor::context */ -nano::block_processor::context::context (nano::block_source source_a) : +nano::block_processor::context::context (std::shared_ptr block, nano::block_source source_a) : + block{ block }, source{ source_a } { debug_assert (source != nano::block_source::unknown); @@ -39,9 +40,9 @@ nano::block_processor::block_processor (nano::node & node_a, nano::write_databas { batch_processed.add ([this] (auto const & items) { // For every batch item: notify the 'processed' observer. - for (auto const & [result, block, context] : items) + for (auto const & [result, context] : items) { - block_processed.notify (result, block, context); + block_processed.notify (result, context); } }); processing_thread = std::thread ([this] () { @@ -92,7 +93,7 @@ void nano::block_processor::add (std::shared_ptr const & block, blo node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process); node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {})", block->hash ().to_string (), to_string (source)); - add_impl (block, context{ source }); + add_impl (context{ block, source }); } std::optional nano::block_processor::add_blocking (std::shared_ptr const & block, block_source const source) @@ -100,9 +101,9 @@ std::optional nano::block_processor::add_blocking (std::sh node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking); node.logger.debug (nano::log::type::blockprocessor, "Processing block (blocking): {} (source: {})", block->hash ().to_string (), to_string (source)); - context ctx{ source }; + context ctx{ block, source }; auto future = ctx.get_future (); - add_impl (block, std::move (ctx)); + add_impl (std::move (ctx)); try { @@ -129,7 +130,7 @@ void nano::block_processor::force (std::shared_ptr const & block_a) { nano::lock_guard lock{ mutex }; - forced.emplace_back (entry{ block_a, context{ block_source::forced } }); + forced.emplace_back (context{ block_a, block_source::forced }); } condition.notify_all (); } @@ -180,7 +181,7 @@ void nano::block_processor::process_blocks () debug_assert (!lock.owns_lock ()); // Set results for futures when not holding the lock - for (auto & [result, block, context] : processed) + for (auto & [result, context] : processed) { context.set_result (result); } @@ -222,33 +223,33 @@ bool nano::block_processor::have_blocks () return have_blocks_ready (); } -void nano::block_processor::add_impl (std::shared_ptr const & block, context ctx) +void nano::block_processor::add_impl (context ctx) { release_assert (ctx.source != nano::block_source::forced); { nano::lock_guard guard{ mutex }; - blocks.emplace_back (entry{ block, std::move (ctx) }); + blocks.emplace_back (std::move (ctx)); } condition.notify_all (); } -auto nano::block_processor::next () -> entry +auto nano::block_processor::next () -> context { debug_assert (!mutex.try_lock ()); debug_assert (!blocks.empty () || !forced.empty ()); // This should be checked before calling next if (!forced.empty ()) { - entry entry = std::move (forced.front ()); - release_assert (entry.ctx.source == nano::block_source::forced); + auto entry = std::move (forced.front ()); + release_assert (entry.source == nano::block_source::forced); forced.pop_front (); return entry; } if (!blocks.empty ()) { - entry entry = std::move (blocks.front ()); - release_assert (entry.ctx.source != nano::block_source::forced); + auto entry = std::move (blocks.front ()); + release_assert (entry.source != nano::block_source::forced); blocks.pop_front (); return entry; } @@ -281,10 +282,8 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock node.logger.debug (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", blocks.size (), forced.size ()); } - entry entry = next (); - context ctx = std::move (entry.ctx); - auto const block = entry.block; - auto const hash = block->hash (); + auto ctx = next (); + auto const hash = ctx.block->hash (); bool const force = ctx.source == nano::block_source::forced; lock_a.unlock (); @@ -292,13 +291,13 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock if (force) { number_of_forced_processed++; - rollback_competitor (transaction, *block); + rollback_competitor (transaction, *ctx.block); } number_of_blocks_processed++; - auto result = process_one (transaction, block, ctx, force); - processed.emplace_back (result, block, std::move (ctx)); + auto result = process_one (transaction, ctx, force); + processed.emplace_back (result, std::move (ctx)); lock_a.lock (); } @@ -313,8 +312,9 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock return processed; } -nano::process_return nano::block_processor::process_one (store::write_transaction const & transaction_a, std::shared_ptr block, context const & context, bool const forced_a) +nano::process_return nano::block_processor::process_one (store::write_transaction const & transaction_a, context const & context, bool const forced_a) { + auto const & block = context.block; auto const hash = block->hash (); nano::process_return result = node.ledger.process (transaction_a, *block); diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 4d9c2227..bae8fbcb 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -44,8 +44,9 @@ public: // Context class context { public: - explicit context (block_source); + context (std::shared_ptr block, block_source source); + std::shared_ptr block; block_source const source; std::chrono::steady_clock::time_point const arrival{ std::chrono::steady_clock::now () }; @@ -60,13 +61,6 @@ public: // Context friend class block_processor; }; -private: - struct entry - { - std::shared_ptr block; - block_processor::context ctx; - }; - public: block_processor (nano::node &, nano::write_database_queue &); @@ -85,21 +79,21 @@ public: std::atomic flushing{ false }; public: // Events - using processed_t = std::tuple, context>; + using processed_t = std::tuple; using processed_batch_t = std::deque; // The batch observer feeds the processed observer - nano::observer_set const &, context const &> block_processed; + nano::observer_set block_processed; nano::observer_set batch_processed; private: // Roll back block in the ledger that conflicts with 'block' void rollback_competitor (store::write_transaction const &, nano::block const & block); - nano::process_return process_one (store::write_transaction const &, std::shared_ptr block, context const &, bool forced = false); + nano::process_return process_one (store::write_transaction const &, context const &, bool forced = false); void queue_unchecked (store::write_transaction const &, nano::hash_or_account const &); processed_batch_t process_batch (nano::unique_lock &); - entry next (); - void add_impl (std::shared_ptr const & block, context); + context next (); + void add_impl (context); private: // Dependencies nano::node & node; @@ -109,8 +103,8 @@ private: bool stopped{ false }; bool active{ false }; - std::deque blocks; - std::deque forced; + std::deque blocks; + std::deque forced; std::chrono::steady_clock::time_point next_log; nano::condition_variable condition; diff --git a/nano/node/bootstrap_ascending/service.cpp b/nano/node/bootstrap_ascending/service.cpp index d091b644..14430c09 100644 --- a/nano/node/bootstrap_ascending/service.cpp +++ b/nano/node/bootstrap_ascending/service.cpp @@ -34,10 +34,10 @@ nano::bootstrap_ascending::service::service (nano::node_config & config_a, nano: nano::lock_guard lock{ mutex }; auto transaction = ledger.store.tx_begin_read (); - for (auto const & [result, block, context] : batch) + for (auto const & [result, context] : batch) { - debug_assert (block != nullptr); - inspect (transaction, result, *block); + debug_assert (context.block != nullptr); + inspect (transaction, result, *context.block); } } diff --git a/nano/node/process_live_dispatcher.cpp b/nano/node/process_live_dispatcher.cpp index daaad17f..9dfe769c 100644 --- a/nano/node/process_live_dispatcher.cpp +++ b/nano/node/process_live_dispatcher.cpp @@ -20,10 +20,10 @@ void nano::process_live_dispatcher::connect (nano::block_processor & block_proce { block_processor.batch_processed.add ([this] (auto const & batch) { auto const transaction = ledger.store.tx_begin_read (); - for (auto const & [result, block, context] : batch) + for (auto const & [result, context] : batch) { - debug_assert (block != nullptr); - inspect (result, *block, transaction); + debug_assert (context.block != nullptr); + inspect (result, *context.block, transaction); } }); }