diff --git a/nano/core_test/active_transactions.cpp b/nano/core_test/active_transactions.cpp index 910a518c..cf867687 100644 --- a/nano/core_test/active_transactions.cpp +++ b/nano/core_test/active_transactions.cpp @@ -190,6 +190,12 @@ TEST (active_transactions, keep_local) auto const send4 = wallet.send_action (nano::dev::genesis_key.pub, key4.pub, node.config.receive_minimum.number ()); auto const send5 = wallet.send_action (nano::dev::genesis_key.pub, key5.pub, node.config.receive_minimum.number ()); auto const send6 = wallet.send_action (nano::dev::genesis_key.pub, key6.pub, node.config.receive_minimum.number ()); + ASSERT_NE (nullptr, send1); + ASSERT_NE (nullptr, send2); + ASSERT_NE (nullptr, send3); + ASSERT_NE (nullptr, send4); + ASSERT_NE (nullptr, send5); + ASSERT_NE (nullptr, send6); // force-confirm blocks for (auto const & block : { send1, send2, send3, send4, send5, send6 }) diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index fddf1683..34f1de2b 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -2111,53 +2111,6 @@ TEST (node, block_confirm) ASSERT_TIMELY_EQ (10s, node1.active.recently_cemented.list ().size (), 1); } -TEST (node, block_arrival) -{ - nano::test::system system (1); - auto & node (*system.nodes[0]); - ASSERT_EQ (0, node.block_arrival.arrival.size ()); - nano::block_hash hash1 (1); - node.block_arrival.add (hash1); - ASSERT_EQ (1, node.block_arrival.arrival.size ()); - node.block_arrival.add (hash1); - ASSERT_EQ (1, node.block_arrival.arrival.size ()); - nano::block_hash hash2 (2); - node.block_arrival.add (hash2); - ASSERT_EQ (2, node.block_arrival.arrival.size ()); -} - -TEST (node, block_arrival_size) -{ - nano::test::system system (1); - auto & node (*system.nodes[0]); - auto time (std::chrono::steady_clock::now () - nano::block_arrival::arrival_time_min - std::chrono::seconds (5)); - nano::block_hash hash (0); - for (auto i (0); i < nano::block_arrival::arrival_size_min * 2; ++i) - { - node.block_arrival.arrival.push_back (nano::block_arrival_info{ time, hash }); - ++hash.qwords[0]; - } - ASSERT_EQ (nano::block_arrival::arrival_size_min * 2, node.block_arrival.arrival.size ()); - node.block_arrival.recent (0); - ASSERT_EQ (nano::block_arrival::arrival_size_min, node.block_arrival.arrival.size ()); -} - -TEST (node, block_arrival_time) -{ - nano::test::system system (1); - auto & node (*system.nodes[0]); - auto time (std::chrono::steady_clock::now ()); - nano::block_hash hash (0); - for (auto i (0); i < nano::block_arrival::arrival_size_min * 2; ++i) - { - node.block_arrival.arrival.push_back (nano::block_arrival_info{ time, hash }); - ++hash.qwords[0]; - } - ASSERT_EQ (nano::block_arrival::arrival_size_min * 2, node.block_arrival.arrival.size ()); - node.block_arrival.recent (0); - ASSERT_EQ (nano::block_arrival::arrival_size_min * 2, node.block_arrival.arrival.size ()); -} - TEST (node, confirm_quorum) { nano::test::system system (1); @@ -2958,7 +2911,7 @@ TEST (node, block_processor_reject_state) send1->signature.bytes[0] ^= 1; ASSERT_FALSE (node.ledger.block_or_pruned_exists (send1->hash ())); node.process_active (send1); - ASSERT_TIMELY_EQ (5s, 1, node.stats.count (nano::stat::type::blockprocessor, nano::stat::detail::bad_signature)); + ASSERT_TIMELY_EQ (5s, 1, node.stats.count (nano::stat::type::blockprocessor_result, nano::stat::detail::bad_signature)); ASSERT_FALSE (node.ledger.block_or_pruned_exists (send1->hash ())); auto send2 = builder.make_block () .account (nano::dev::genesis_key.pub) diff --git a/nano/lib/locks.cpp b/nano/lib/locks.cpp index 3217e5e5..231ef13c 100644 --- a/nano/lib/locks.cpp +++ b/nano/lib/locks.cpp @@ -251,8 +251,6 @@ char const * nano::mutex_identifier (mutexes mutex) { case mutexes::active: return "active"; - case mutexes::block_arrival: - return "block_arrival"; case mutexes::block_processor: return "block_processor"; case mutexes::block_uniquer: diff --git a/nano/lib/locks.hpp b/nano/lib/locks.hpp index f8447edf..1bbe0a7f 100644 --- a/nano/lib/locks.hpp +++ b/nano/lib/locks.hpp @@ -20,7 +20,6 @@ bool any_filters_registered (); enum class mutexes { active, - block_arrival, block_processor, block_uniquer, blockstore_cache, diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 8fc31557..63c90bdd 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -36,6 +36,8 @@ enum class type : uint8_t vote_cache, hinting, blockprocessor, + blockprocessor_source, + blockprocessor_result, bootstrap_server, active, active_started, @@ -71,6 +73,7 @@ enum class detail : uint8_t top, none, success, + unknown, // processing queue queue, @@ -110,6 +113,19 @@ enum class detail : uint8_t representative_mismatch, block_position, + // blockprocessor + process_blocking, + process_blocking_timeout, + force, + + // block source + live, + bootstrap, + bootstrap_legacy, + unchecked, + local, + forced, + // message specific not_a_type, invalid, diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 37278a50..4452df57 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -20,12 +20,8 @@ add_library( backlog_population.cpp bandwidth_limiter.hpp bandwidth_limiter.cpp - block_arrival.hpp - block_arrival.cpp block_broadcast.cpp block_broadcast.hpp - blocking_observer.cpp - blocking_observer.hpp blockprocessor.hpp blockprocessor.cpp bootstrap/block_deserializer.hpp diff --git a/nano/node/active_transactions.cpp b/nano/node/active_transactions.cpp index 01c75352..ed4fb36f 100644 --- a/nano/node/active_transactions.cpp +++ b/nano/node/active_transactions.cpp @@ -34,11 +34,11 @@ nano::active_transactions::active_transactions (nano::node & node_a, nano::confi }); // Notify elections about alternative (forked) blocks - block_processor.processed.add ([this] (auto const & result, auto const & block) { + block_processor.block_processed.add ([this] (auto const & result, auto const & context) { switch (result.code) { case nano::process_result::fork: - publish (block); + publish (context.block); break; default: break; diff --git a/nano/node/block_arrival.cpp b/nano/node/block_arrival.cpp deleted file mode 100644 index 914e18dc..00000000 --- a/nano/node/block_arrival.cpp +++ /dev/null @@ -1,35 +0,0 @@ -#include - -bool nano::block_arrival::add (nano::block_hash const & hash_a) -{ - nano::lock_guard lock{ mutex }; - auto now (std::chrono::steady_clock::now ()); - auto inserted (arrival.get ().emplace_back (nano::block_arrival_info{ now, hash_a })); - auto result (!inserted.second); - return result; -} - -bool nano::block_arrival::recent (nano::block_hash const & hash_a) -{ - nano::lock_guard lock{ mutex }; - auto now (std::chrono::steady_clock::now ()); - while (arrival.size () > arrival_size_min && arrival.get ().front ().arrival + arrival_time_min < now) - { - arrival.get ().pop_front (); - } - return arrival.get ().find (hash_a) != arrival.get ().end (); -} - -std::unique_ptr nano::collect_container_info (block_arrival & block_arrival, std::string const & name) -{ - std::size_t count = 0; - { - nano::lock_guard guard{ block_arrival.mutex }; - count = block_arrival.arrival.size (); - } - - auto sizeof_element = sizeof (decltype (block_arrival.arrival)::value_type); - auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "arrival", count, sizeof_element })); - return composite; -} \ No newline at end of file diff --git a/nano/node/block_arrival.hpp b/nano/node/block_arrival.hpp deleted file mode 100644 index 71aa3946..00000000 --- a/nano/node/block_arrival.hpp +++ /dev/null @@ -1,49 +0,0 @@ -#pragma once - -#include - -#include -#include -#include -#include - -#include - -namespace nano -{ -class block_arrival_info final -{ -public: - std::chrono::steady_clock::time_point arrival; - nano::block_hash hash; -}; - -// This class tracks blocks that are probably live because they arrived in a UDP packet -// This gives a fairly reliable way to differentiate between blocks being inserted via bootstrap or new, live blocks. -class block_arrival final -{ -public: - // Return `true' to indicated an error if the block has already been inserted - bool add (nano::block_hash const &); - bool recent (nano::block_hash const &); - - // clang-format off - class tag_sequence {}; - class tag_hash {}; - - boost::multi_index_container>, - boost::multi_index::hashed_unique, - boost::multi_index::member>>> - arrival; - // clang-format on - - nano::mutex mutex{ mutex_identifier (mutexes::block_arrival) }; - - static std::size_t constexpr arrival_size_min = 8 * 1024; - static std::chrono::seconds constexpr arrival_time_min = std::chrono::seconds (300); -}; - -std::unique_ptr collect_container_info (block_arrival & block_arrival, std::string const & name); -} \ No newline at end of file diff --git a/nano/node/block_broadcast.cpp b/nano/node/block_broadcast.cpp index fc996fc0..31cde295 100644 --- a/nano/node/block_broadcast.cpp +++ b/nano/node/block_broadcast.cpp @@ -1,11 +1,9 @@ -#include #include #include #include -nano::block_broadcast::block_broadcast (nano::network & network, nano::block_arrival & block_arrival, bool enabled) : +nano::block_broadcast::block_broadcast (nano::network & network, bool enabled) : network{ network }, - block_arrival{ block_arrival }, enabled{ enabled } { } @@ -16,26 +14,22 @@ void nano::block_broadcast::connect (nano::block_processor & block_processor) { return; } - block_processor.processed.add ([this] (auto const & result, auto const & block) { + block_processor.block_processed.add ([this] (auto const & result, auto const & context) { switch (result.code) { case nano::process_result::progress: - observe (block); + observe (context); break; default: break; } - erase (block); }); } -void nano::block_broadcast::observe (std::shared_ptr block) +void nano::block_broadcast::observe (nano::block_processor::context const & context) { - nano::unique_lock lock{ mutex }; - auto existing = local.find (block); - auto local_l = existing != local.end (); - lock.unlock (); - if (local_l) + auto const & block = context.block; + if (context.source == nano::block_source::local) { // Block created on this node // Perform more agressive initial flooding @@ -43,7 +37,7 @@ void nano::block_broadcast::observe (std::shared_ptr block) } else { - if (block_arrival.recent (block->hash ())) + if (context.source != nano::block_source::bootstrap && context.source != nano::block_source::bootstrap_legacy) { // Block arrived from realtime traffic, do normal gossip. network.flood_block (block, nano::transport::buffer_drop_policy::limiter); @@ -55,23 +49,3 @@ void nano::block_broadcast::observe (std::shared_ptr block) } } } - -void nano::block_broadcast::set_local (std::shared_ptr block) -{ - if (!enabled) - { - return; - } - nano::lock_guard lock{ mutex }; - local.insert (block); -} - -void nano::block_broadcast::erase (std::shared_ptr block) -{ - if (!enabled) - { - return; - } - nano::lock_guard lock{ mutex }; - local.erase (block); -} diff --git a/nano/node/block_broadcast.hpp b/nano/node/block_broadcast.hpp index 081d3f2f..0f0ee62f 100644 --- a/nano/node/block_broadcast.hpp +++ b/nano/node/block_broadcast.hpp @@ -1,34 +1,28 @@ #pragma once #include +#include #include #include namespace nano { -class block_arrival; -class block_processor; class network; + // This class tracks blocks that originated from this node. class block_broadcast { public: - block_broadcast (nano::network & network, nano::block_arrival & block_arrival, bool enabled = false); + block_broadcast (nano::network & network, bool enabled = false); // Add batch_processed observer to block_processor if enabled void connect (nano::block_processor & block_processor); - // Mark a block as originating locally - void set_local (std::shared_ptr block); - void erase (std::shared_ptr block); private: // Block_processor observer - void observe (std::shared_ptr block); + void observe (nano::block_processor::context const &); nano::network & network; - nano::block_arrival & block_arrival; - std::unordered_set> local; // Blocks originated on this node - nano::mutex mutex; bool enabled; }; } diff --git a/nano/node/blocking_observer.cpp b/nano/node/blocking_observer.cpp deleted file mode 100644 index 339b09b8..00000000 --- a/nano/node/blocking_observer.cpp +++ /dev/null @@ -1,62 +0,0 @@ -#include -#include - -void nano::blocking_observer::connect (nano::block_processor & block_processor) -{ - block_processor.processed.add ([this] (auto const & result, auto const & block) { - observe (result, block); - }); -} - -void nano::blocking_observer::stop () -{ - nano::unique_lock lock{ mutex }; - stopped = true; - auto discard = std::move (blocking); - // Signal broken promises outside lock - lock.unlock (); - discard.clear (); // ~promise future_error -} - -void nano::blocking_observer::observe (nano::process_return const & result, std::shared_ptr block) -{ - nano::unique_lock lock{ mutex }; - auto existing = blocking.find (block); - if (existing != blocking.end ()) - { - auto promise = std::move (existing->second); - blocking.erase (existing); - // Signal promise outside of lock - lock.unlock (); - promise.set_value (result); - } -} - -std::future nano::blocking_observer::insert (std::shared_ptr block) -{ - nano::lock_guard lock{ mutex }; - if (stopped) - { - std::promise promise; - return promise.get_future (); // ~promise future_error - } - auto iterator = blocking.emplace (block, std::promise{}); - return iterator->second.get_future (); -} - -bool nano::blocking_observer::exists (std::shared_ptr block) -{ - nano::lock_guard lock{ mutex }; - auto existing = blocking.find (block); - return existing != blocking.end (); -} - -void nano::blocking_observer::erase (std::shared_ptr block) -{ - nano::lock_guard lock{ mutex }; - auto existing = blocking.find (block); - if (existing != blocking.end ()) - { - blocking.erase (existing); - } -} \ No newline at end of file diff --git a/nano/node/blocking_observer.hpp b/nano/node/blocking_observer.hpp deleted file mode 100644 index bd971412..00000000 --- a/nano/node/blocking_observer.hpp +++ /dev/null @@ -1,32 +0,0 @@ -#pragma once - -#include -#include - -#include -#include -#include - -namespace nano -{ -class block; -class block_processor; -// Observer that facilitates a blocking call to block processing which is done asynchronosly by the block_processor -class blocking_observer -{ -public: - void connect (nano::block_processor & block_processor); - // Stop the observer and trigger broken promise exceptions - void stop (); - // Block processor observer - void observe (nano::process_return const & result, std::shared_ptr block); - [[nodiscard]] std::future insert (std::shared_ptr block); - bool exists (std::shared_ptr block); - void erase (std::shared_ptr block); - -private: - std::unordered_multimap, std::promise> blocking; - bool stopped{ false }; - nano::mutex mutex; -}; -} diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index c0477b4e..dc8c68eb 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -6,20 +6,45 @@ #include +#include + +/* + * block_processor::context + */ + +nano::block_processor::context::context (std::shared_ptr block, nano::block_source source_a) : + block{ block }, + source{ source_a } +{ + debug_assert (source != nano::block_source::unknown); +} + +auto nano::block_processor::context::get_future () -> std::future +{ + return promise.get_future (); +} + +void nano::block_processor::context::set_result (result_t const & result) +{ + promise.set_value (result); +} + +/* + * block_processor + */ + nano::block_processor::block_processor (nano::node & node_a, nano::write_database_queue & write_database_queue_a) : - next_log (std::chrono::steady_clock::now ()), node (node_a), - write_database_queue (write_database_queue_a) + write_database_queue (write_database_queue_a), + next_log (std::chrono::steady_clock::now ()) { batch_processed.add ([this] (auto const & items) { // For every batch item: notify the 'processed' observer. - for (auto const & item : items) + for (auto const & [result, context] : items) { - auto const & [result, block] = item; - processed.notify (result, block); + block_processed.notify (result, context); } }); - blocking.connect (*this); processing_thread = std::thread ([this] () { nano::thread_role::set (nano::thread_role::name::block_processing); this->process_blocks (); @@ -33,7 +58,6 @@ void nano::block_processor::stop () stopped = true; } condition.notify_all (); - blocking.stop (); nano::join_or_pass (processing_thread); } @@ -53,7 +77,7 @@ bool nano::block_processor::half_full () return size () >= node.flags.block_processor_full_size / 2; } -void nano::block_processor::add (std::shared_ptr const & block) +void nano::block_processor::add (std::shared_ptr const & block, block_source const source) { if (full ()) { @@ -65,33 +89,50 @@ void nano::block_processor::add (std::shared_ptr const & block) node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::insufficient_work); return; } - add_impl (block); - return; + + node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process); + node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {})", block->hash ().to_string (), to_string (source)); + + add_impl (context{ block, source }); } -std::optional nano::block_processor::add_blocking (std::shared_ptr const & block) +std::optional nano::block_processor::add_blocking (std::shared_ptr const & block, block_source const source) { - auto future = blocking.insert (block); - add_impl (block); - condition.notify_all (); - std::optional result; + node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking); + node.logger.debug (nano::log::type::blockprocessor, "Processing block (blocking): {} (source: {})", block->hash ().to_string (), to_string (source)); + + context ctx{ block, source }; + auto future = ctx.get_future (); + add_impl (std::move (ctx)); + try { auto status = future.wait_for (node.config.block_process_timeout); debug_assert (status != std::future_status::deferred); if (status == std::future_status::ready) { - result = future.get (); - } - else - { - blocking.erase (block); + return future.get (); } } catch (std::future_error const &) { + node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking_timeout); + node.logger.error (nano::log::type::blockprocessor, "Timeout processing block: {}", block->hash ().to_string ()); } - return result; + + return std::nullopt; +} + +void nano::block_processor::force (std::shared_ptr const & block_a) +{ + node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::force); + node.logger.debug (nano::log::type::blockprocessor, "Forcing block: {}", block_a->hash ().to_string ()); + + { + nano::lock_guard lock{ mutex }; + forced.emplace_back (context{ block_a, block_source::forced }); + } + condition.notify_all (); } void nano::block_processor::rollback_competitor (store::write_transaction const & transaction, nano::block const & block) @@ -126,15 +167,6 @@ void nano::block_processor::rollback_competitor (store::write_transaction const } } -void nano::block_processor::force (std::shared_ptr const & block_a) -{ - { - nano::lock_guard lock{ mutex }; - forced.push_back (block_a); - } - condition.notify_all (); -} - void nano::block_processor::process_blocks () { nano::unique_lock lock{ mutex }; @@ -144,8 +176,18 @@ void nano::block_processor::process_blocks () { active = true; lock.unlock (); + auto processed = process_batch (lock); + debug_assert (!lock.owns_lock ()); + + // Set results for futures when not holding the lock + for (auto & [result, context] : processed) + { + context.set_result (result); + } + batch_processed.notify (processed); + lock.lock (); active = false; } @@ -181,28 +223,57 @@ bool nano::block_processor::have_blocks () return have_blocks_ready (); } -void nano::block_processor::add_impl (std::shared_ptr block) +void nano::block_processor::add_impl (context ctx) { + release_assert (ctx.source != nano::block_source::forced); { nano::lock_guard guard{ mutex }; - blocks.emplace_back (block); + blocks.emplace_back (std::move (ctx)); } condition.notify_all (); } -auto nano::block_processor::process_batch (nano::unique_lock & lock_a) -> std::deque +auto nano::block_processor::next () -> context { - std::deque processed; + debug_assert (!mutex.try_lock ()); + debug_assert (!blocks.empty () || !forced.empty ()); // This should be checked before calling next + + if (!forced.empty ()) + { + auto entry = std::move (forced.front ()); + release_assert (entry.source == nano::block_source::forced); + forced.pop_front (); + return entry; + } + + if (!blocks.empty ()) + { + auto entry = std::move (blocks.front ()); + release_assert (entry.source != nano::block_source::forced); + blocks.pop_front (); + return entry; + } + + release_assert (false, "next() called when no blocks are ready"); +} + +auto nano::block_processor::process_batch (nano::unique_lock & lock_a) -> processed_batch_t +{ + processed_batch_t processed; + auto scoped_write_guard = write_database_queue.wait (nano::writer::process_batch); auto transaction (node.store.tx_begin_write ({ tables::accounts, tables::blocks, tables::frontiers, tables::pending })); nano::timer timer_l; + lock_a.lock (); + timer_l.start (); // Processing blocks unsigned number_of_blocks_processed (0), number_of_forced_processed (0); auto deadline_reached = [&timer_l, deadline = node.config.block_processor_batch_max_time] { return timer_l.after_deadline (deadline); }; auto processor_batch_reached = [&number_of_blocks_processed, max = node.flags.block_processor_batch_size] { return number_of_blocks_processed >= max; }; auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; }; + while (have_blocks_ready () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ()) { // TODO: Cleaner periodical logging @@ -211,33 +282,26 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock node.logger.debug (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", blocks.size (), forced.size ()); } - std::shared_ptr block; - nano::block_hash hash (0); - bool force (false); - if (forced.empty ()) - { - block = blocks.front (); - blocks.pop_front (); - hash = block->hash (); - } - else - { - block = forced.front (); - forced.pop_front (); - hash = block->hash (); - force = true; - number_of_forced_processed++; - } + auto ctx = next (); + auto const hash = ctx.block->hash (); + bool const force = ctx.source == nano::block_source::forced; + lock_a.unlock (); + if (force) { - rollback_competitor (transaction, *block); + number_of_forced_processed++; + rollback_competitor (transaction, *ctx.block); } + number_of_blocks_processed++; - auto result = process_one (transaction, block, force); - processed.emplace_back (result, block); + + auto result = process_one (transaction, ctx, force); + processed.emplace_back (result, std::move (ctx)); + lock_a.lock (); } + lock_a.unlock (); if (number_of_blocks_processed != 0 && timer_l.stop () > std::chrono::milliseconds (100)) @@ -248,15 +312,18 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock return processed; } -nano::process_return nano::block_processor::process_one (store::write_transaction const & transaction_a, std::shared_ptr block, bool const forced_a) +nano::process_return nano::block_processor::process_one (store::write_transaction const & transaction_a, context const & context, bool const forced_a) { - nano::process_return result; - auto hash (block->hash ()); - result = node.ledger.process (transaction_a, *block); + auto const & block = context.block; + auto const hash = block->hash (); + nano::process_return result = node.ledger.process (transaction_a, *block); - node.stats.inc (nano::stat::type::blockprocessor, to_stat_detail (result.code)); + node.stats.inc (nano::stat::type::blockprocessor_result, to_stat_detail (result.code)); + node.stats.inc (nano::stat::type::blockprocessor_source, to_stat_detail (context.source)); node.logger.trace (nano::log::type::blockprocessor, nano::log::detail::block_processed, nano::log::arg{ "result", result.code }, + nano::log::arg{ "source", context.source }, + nano::log::arg{ "arrival", nano::log::microseconds (context.arrival) }, nano::log::arg{ "forced", forced_a }, nano::log::arg{ "block", block }); @@ -361,3 +428,15 @@ std::unique_ptr nano::collect_container_info (bl composite->add_component (std::make_unique (container_info{ "forced", forced_count, sizeof (decltype (block_processor.forced)::value_type) })); return composite; } + +std::string_view nano::to_string (nano::block_source source) +{ + return magic_enum::enum_name (source); +} + +nano::stat::detail nano::to_stat_detail (nano::block_source type) +{ + auto value = magic_enum::enum_cast (magic_enum::enum_name (type)); + debug_assert (value); + return value.value_or (nano::stat::detail{}); +} diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index a53fa807..5130942c 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -2,12 +2,12 @@ #include #include -#include #include #include #include #include +#include #include namespace nano::store @@ -20,20 +20,56 @@ namespace nano class node; class write_database_queue; +enum class block_source +{ + unknown = 0, + live, + bootstrap, + bootstrap_legacy, + unchecked, + local, + forced, +}; + +std::string_view to_string (block_source); +nano::stat::detail to_stat_detail (block_source); + /** * Processing blocks is a potentially long IO operation. * This class isolates block insertion from other operations like servicing network operations */ class block_processor final { +public: // Context + class context + { + public: + context (std::shared_ptr block, block_source source); + + std::shared_ptr const block; + block_source const source; + std::chrono::steady_clock::time_point const arrival{ std::chrono::steady_clock::now () }; + + public: + using result_t = nano::process_return; + std::future get_future (); + + private: + void set_result (result_t const &); + std::promise promise; + + friend class block_processor; + }; + public: - explicit block_processor (nano::node &, nano::write_database_queue &); + block_processor (nano::node &, nano::write_database_queue &); + void stop (); std::size_t size (); bool full (); bool half_full (); - void add (std::shared_ptr const &); - std::optional add_blocking (std::shared_ptr const & block); + void add (std::shared_ptr const &, block_source = block_source::live); + std::optional add_blocking (std::shared_ptr const & block, block_source); void force (std::shared_ptr const &); bool should_log (); bool have_blocks_ready (); @@ -43,34 +79,38 @@ public: std::atomic flushing{ false }; public: // Events - using processed_t = std::pair>; - nano::observer_set> processed; + using processed_t = std::tuple; + using processed_batch_t = std::deque; - // The batch observer feeds the processed obsever - nano::observer_set const &> batch_processed; - -private: - blocking_observer blocking; + // The batch observer feeds the processed observer + nano::observer_set block_processed; + nano::observer_set batch_processed; private: // Roll back block in the ledger that conflicts with 'block' - void rollback_competitor (store::write_transaction const & transaction, nano::block const & block); - nano::process_return process_one (store::write_transaction const &, std::shared_ptr block, bool const = false); + void rollback_competitor (store::write_transaction const &, nano::block const & block); + nano::process_return process_one (store::write_transaction const &, context const &, bool forced = false); void queue_unchecked (store::write_transaction const &, nano::hash_or_account const &); - std::deque process_batch (nano::unique_lock &); - void add_impl (std::shared_ptr block); - bool stopped{ false }; - bool active{ false }; - std::chrono::steady_clock::time_point next_log; - std::deque> blocks; - std::deque> forced; - nano::condition_variable condition; + processed_batch_t process_batch (nano::unique_lock &); + context next (); + void add_impl (context); + +private: // Dependencies nano::node & node; nano::write_database_queue & write_database_queue; + +private: + bool stopped{ false }; + bool active{ false }; + + std::deque blocks; + std::deque forced; + + std::chrono::steady_clock::time_point next_log; + nano::condition_variable condition; nano::mutex mutex{ mutex_identifier (mutexes::block_processor) }; std::thread processing_thread; friend std::unique_ptr collect_container_info (block_processor & block_processor, std::string const & name); }; -std::unique_ptr collect_container_info (block_processor & block_processor, std::string const & name); } diff --git a/nano/node/bootstrap/bootstrap_attempt.cpp b/nano/node/bootstrap/bootstrap_attempt.cpp index 8d892db2..3f78beff 100644 --- a/nano/node/bootstrap/bootstrap_attempt.cpp +++ b/nano/node/bootstrap/bootstrap_attempt.cpp @@ -133,7 +133,7 @@ bool nano::bootstrap_attempt::process_block (std::shared_ptr const } else { - node_l->block_processor.add (block_a); + node_l->block_processor.add (block_a, nano::block_source::bootstrap_legacy); } return stop_pull; } diff --git a/nano/node/bootstrap/bootstrap_lazy.cpp b/nano/node/bootstrap/bootstrap_lazy.cpp index 0d3dbc94..d69916a9 100644 --- a/nano/node/bootstrap/bootstrap_lazy.cpp +++ b/nano/node/bootstrap/bootstrap_lazy.cpp @@ -311,7 +311,7 @@ bool nano::bootstrap_attempt_lazy::process_block_lazy (std::shared_ptrblock_processor.add (block_a); + node->block_processor.add (block_a, nano::block_source::bootstrap_legacy); } // Force drop lazy bootstrap connection for long bulk_pull if (pull_blocks_processed > max_blocks) diff --git a/nano/node/bootstrap_ascending/service.cpp b/nano/node/bootstrap_ascending/service.cpp index 76e2aac3..14430c09 100644 --- a/nano/node/bootstrap_ascending/service.cpp +++ b/nano/node/bootstrap_ascending/service.cpp @@ -34,11 +34,10 @@ nano::bootstrap_ascending::service::service (nano::node_config & config_a, nano: nano::lock_guard lock{ mutex }; auto transaction = ledger.store.tx_begin_read (); - for (auto const & [result, block] : batch) + for (auto const & [result, context] : batch) { - debug_assert (block != nullptr); - - inspect (transaction, result, *block); + debug_assert (context.block != nullptr); + inspect (transaction, result, *context.block); } } @@ -388,7 +387,7 @@ void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::bloc for (auto & block : response.blocks) { - block_processor.add (block); + block_processor.add (block, nano::block_source::bootstrap); } nano::lock_guard lock{ mutex }; throttle.add (true); diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 58af00f1..91034d05 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -1214,7 +1214,6 @@ void nano::json_handler::block_confirm () nano::election_status status{ block_l, 0, 0, std::chrono::duration_cast (std::chrono::system_clock::now ().time_since_epoch ()), std::chrono::duration_values::zero (), 0, 1, 0, nano::election_status_type::active_confirmation_height }; node.active.recently_cemented.put (status); // Trigger callback for confirmed block - node.block_arrival.add (hash); auto account (node.ledger.account (transaction, hash)); bool error_or_pruned (false); auto amount (node.ledger.amount_safe (transaction, hash, error_or_pruned)); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 64bf414b..884a0ae8 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -197,7 +197,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons epoch_upgrader{ *this, ledger, store, network_params, logger }, startup_time (std::chrono::steady_clock::now ()), node_seq (seq), - block_broadcast{ network, block_arrival, !flags.disable_block_processor_republishing }, + block_broadcast{ network, !flags.disable_block_processor_republishing }, process_live_dispatcher{ ledger, scheduler.priority, vote_cache, websocket } { logger.debug (nano::log::type::node, "Constructing node..."); @@ -206,7 +206,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons process_live_dispatcher.connect (block_processor); unchecked.satisfied.add ([this] (nano::unchecked_info const & info) { - this->block_processor.add (info.block); + block_processor.add (info.block, nano::block_source::unchecked); }); vote_cache.rep_weight_query = [this] (nano::account const & rep) { @@ -241,7 +241,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons { observers.blocks.add ([this] (nano::election_status const & status_a, std::vector const & votes_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a, bool is_state_epoch_a) { auto block_a (status_a.winner); - if ((status_a.type == nano::election_status_type::active_confirmed_quorum || status_a.type == nano::election_status_type::active_confirmation_height) && this->block_arrival.recent (block_a->hash ())) + if ((status_a.type == nano::election_status_type::active_confirmed_quorum || status_a.type == nano::election_status_type::active_confirmation_height)) { auto node_l (shared_from_this ()); background ([node_l, block_a, account_a, amount_a, is_state_send_a, is_state_epoch_a] () { @@ -538,7 +538,6 @@ std::unique_ptr nano::collect_container_info (no composite->add_component (collect_container_info (node.vote_processor, "vote_processor")); composite->add_component (collect_container_info (node.rep_crawler, "rep_crawler")); composite->add_component (collect_container_info (node.block_processor, "block_processor")); - composite->add_component (collect_container_info (node.block_arrival, "block_arrival")); composite->add_component (collect_container_info (node.online_reps, "online_reps")); composite->add_component (collect_container_info (node.history, "history")); composite->add_component (node.block_uniquer.collect_container_info ("block_uniquer")); @@ -557,7 +556,6 @@ std::unique_ptr nano::collect_container_info (no void nano::node::process_active (std::shared_ptr const & incoming) { - block_arrival.add (incoming->hash ()); block_processor.add (incoming); } @@ -574,18 +572,12 @@ nano::process_return nano::node::process (nano::block & block) std::optional nano::node::process_local (std::shared_ptr const & block_a) { - // Add block hash as recently arrived to trigger automatic rebroadcast and election - block_arrival.add (block_a->hash ()); - block_broadcast.set_local (block_a); - return block_processor.add_blocking (block_a); + return block_processor.add_blocking (block_a, nano::block_source::local); } void nano::node::process_local_async (std::shared_ptr const & block_a) { - // Add block hash as recently arrived to trigger automatic rebroadcast and election - block_arrival.add (block_a->hash ()); - // Set current time to trigger automatic rebroadcast and election - block_processor.add (block_a); + block_processor.add (block_a, nano::block_source::local); } void nano::node::start () diff --git a/nano/node/node.hpp b/nano/node/node.hpp index d5eae059..19e5a7a2 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -8,7 +8,6 @@ #include #include #include -#include #include #include #include @@ -168,7 +167,6 @@ public: nano::vote_processor vote_processor; unsigned warmed_up; nano::block_processor block_processor; - nano::block_arrival block_arrival; nano::local_vote_history history; nano::block_uniquer block_uniquer; nano::vote_uniquer vote_uniquer; diff --git a/nano/node/process_live_dispatcher.cpp b/nano/node/process_live_dispatcher.cpp index c7a6d16d..9dfe769c 100644 --- a/nano/node/process_live_dispatcher.cpp +++ b/nano/node/process_live_dispatcher.cpp @@ -20,10 +20,10 @@ void nano::process_live_dispatcher::connect (nano::block_processor & block_proce { block_processor.batch_processed.add ([this] (auto const & batch) { auto const transaction = ledger.store.tx_begin_read (); - for (auto const & [result, block] : batch) + for (auto const & [result, context] : batch) { - debug_assert (block != nullptr); - inspect (result, *block, transaction); + debug_assert (context.block != nullptr); + inspect (result, *context.block, transaction); } }); } diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 291e0036..3c4d2337 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -4431,7 +4431,6 @@ TEST (rpc, populate_backlog) .work (*node->work_generate_blocking (latest)) .build (); ASSERT_EQ (nano::process_result::progress, node->process (*send).code); - ASSERT_FALSE (node->block_arrival.recent (send->hash ())); auto const rpc_ctx = add_rpc (system, node); boost::property_tree::ptree request; diff --git a/nano/secure/common.cpp b/nano/secure/common.cpp index c9ae8a7d..4d86b31a 100644 --- a/nano/secure/common.cpp +++ b/nano/secure/common.cpp @@ -15,6 +15,7 @@ #include #include +#include size_t constexpr nano::send_block::size; size_t constexpr nano::receive_block::size; @@ -496,6 +497,11 @@ void nano::generate_cache::enable_all () account_count = true; } +std::string_view nano::to_string (nano::process_result process_result) +{ + return magic_enum::enum_name (process_result); +} + nano::stat::detail nano::to_stat_detail (nano::process_result process_result) { auto value = magic_enum::enum_cast (magic_enum::enum_name (process_result)); diff --git a/nano/secure/common.hpp b/nano/secure/common.hpp index 503d4d12..190b8561 100644 --- a/nano/secure/common.hpp +++ b/nano/secure/common.hpp @@ -270,6 +270,10 @@ enum class process_result block_position, // This block cannot follow the previous block insufficient_work // Insufficient work for this block, even though it passed the minimal validation }; + +std::string_view to_string (process_result); +nano::stat::detail to_stat_detail (process_result); + class process_return final { public: @@ -282,8 +286,6 @@ enum class tally_result confirm }; -nano::stat::detail to_stat_detail (process_result); - class network_params; /** Genesis keys and ledger constants for network variants */