diff --git a/nano/core_test/confirming_set.cpp b/nano/core_test/confirming_set.cpp index 6e8d0787..f412b093 100644 --- a/nano/core_test/confirming_set.cpp +++ b/nano/core_test/confirming_set.cpp @@ -1,9 +1,11 @@ #include #include #include +#include #include #include #include +#include #include #include #include @@ -16,45 +18,70 @@ using namespace std::chrono_literals; +namespace +{ +struct confirming_set_context +{ + nano::logger & logger; + nano::stats & stats; + nano::ledger & ledger; + + nano::unchecked_map unchecked; + nano::block_processor block_processor; + nano::confirming_set confirming_set; + + explicit confirming_set_context (nano::test::ledger_context & ledger_context, nano::node_config node_config = {}) : + logger{ ledger_context.logger () }, + stats{ ledger_context.stats () }, + ledger{ ledger_context.ledger () }, + unchecked{ 0, stats, false }, + block_processor{ node_config, ledger, unchecked, stats, logger }, + confirming_set{ node_config.confirming_set, ledger, block_processor, stats, logger } + { + } +}; +} + TEST (confirming_set, construction) { - auto ctx = nano::test::ledger_empty (); - nano::confirming_set_config config{}; - nano::confirming_set confirming_set{ config, ctx.ledger (), ctx.stats (), ctx.logger () }; + auto ledger_ctx = nano::test::ledger_empty (); + confirming_set_context ctx{ ledger_ctx }; } TEST (confirming_set, add_exists) { - auto ctx = nano::test::ledger_send_receive (); - nano::confirming_set_config config{}; - nano::confirming_set confirming_set{ config, ctx.ledger (), ctx.stats (), ctx.logger () }; - auto send = ctx.blocks ()[0]; + auto ledger_ctx = nano::test::ledger_send_receive (); + confirming_set_context ctx{ ledger_ctx }; + nano::confirming_set & confirming_set = ctx.confirming_set; + auto send = ledger_ctx.blocks ()[0]; confirming_set.add (send->hash ()); ASSERT_TRUE (confirming_set.contains (send->hash ())); } TEST (confirming_set, process_one) { - auto ctx = nano::test::ledger_send_receive (); - nano::confirming_set_config config{}; - nano::confirming_set confirming_set{ config, ctx.ledger (), ctx.stats (), ctx.logger () }; + auto ledger_ctx = nano::test::ledger_send_receive (); + confirming_set_context ctx{ ledger_ctx }; + nano::confirming_set & confirming_set = ctx.confirming_set; std::atomic count = 0; std::mutex mutex; std::condition_variable condition; confirming_set.cemented_observers.add ([&] (auto const &) { ++count; condition.notify_all (); }); - confirming_set.add (ctx.blocks ()[0]->hash ()); + confirming_set.add (ledger_ctx.blocks ()[0]->hash ()); nano::test::start_stop_guard guard{ confirming_set }; std::unique_lock lock{ mutex }; ASSERT_TRUE (condition.wait_for (lock, 5s, [&] () { return count == 1; })); - ASSERT_EQ (1, ctx.stats ().count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in)); - ASSERT_EQ (2, ctx.ledger ().cemented_count ()); + ASSERT_EQ (1, ctx.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in)); + ASSERT_EQ (2, ctx.ledger.cemented_count ()); } TEST (confirming_set, process_multiple) { + nano::test::system system; + auto & node = *system.add_node (); auto ctx = nano::test::ledger_send_receive (); nano::confirming_set_config config{}; - nano::confirming_set confirming_set{ config, ctx.ledger (), ctx.stats (), ctx.logger () }; + nano::confirming_set confirming_set{ config, ctx.ledger (), node.block_processor, ctx.stats (), ctx.logger () }; std::atomic count = 0; std::mutex mutex; std::condition_variable condition; diff --git a/nano/core_test/election_scheduler.cpp b/nano/core_test/election_scheduler.cpp index 14f840d4..422df997 100644 --- a/nano/core_test/election_scheduler.cpp +++ b/nano/core_test/election_scheduler.cpp @@ -195,7 +195,7 @@ TEST (election_scheduler, no_vacancy) .work (*system.work.generate (nano::dev::genesis->hash ())) .build (); ASSERT_EQ (nano::block_status::progress, node.process (send)); - node.process_confirmed (send->hash ()); + node.confirming_set.add (send->hash ()); auto receive = builder.make_block () .account (key.pub) @@ -207,7 +207,7 @@ TEST (election_scheduler, no_vacancy) .work (*system.work.generate (key.pub)) .build (); ASSERT_EQ (nano::block_status::progress, node.process (receive)); - node.process_confirmed (receive->hash ()); + node.confirming_set.add (receive->hash ()); ASSERT_TIMELY (5s, nano::test::confirmed (node, { send, receive })); diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 7a50183c..2db65d6a 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -3446,9 +3446,9 @@ TEST (node, pruning_automatic) ASSERT_TIMELY (5s, node1.block (send2->hash ()) != nullptr); // Force-confirm both blocks - node1.process_confirmed (send1->hash ()); + node1.confirming_set.add (send1->hash ()); ASSERT_TIMELY (5s, node1.block_confirmed (send1->hash ())); - node1.process_confirmed (send2->hash ()); + node1.confirming_set.add (send2->hash ()); ASSERT_TIMELY (5s, node1.block_confirmed (send2->hash ())); // Check pruning result @@ -3497,9 +3497,9 @@ TEST (node, DISABLED_pruning_age) node1.process_active (send2); // Force-confirm both blocks - node1.process_confirmed (send1->hash ()); + node1.confirming_set.add (send1->hash ()); ASSERT_TIMELY (5s, node1.block_confirmed (send1->hash ())); - node1.process_confirmed (send2->hash ()); + node1.confirming_set.add (send2->hash ()); ASSERT_TIMELY (5s, node1.block_confirmed (send2->hash ())); // Three blocks in total, nothing pruned yet @@ -3558,9 +3558,9 @@ TEST (node, DISABLED_pruning_depth) node1.process_active (send2); // Force-confirm both blocks - node1.process_confirmed (send1->hash ()); + node1.confirming_set.add (send1->hash ()); ASSERT_TIMELY (5s, node1.block_confirmed (send1->hash ())); - node1.process_confirmed (send2->hash ()); + node1.confirming_set.add (send2->hash ()); ASSERT_TIMELY (5s, node1.block_confirmed (send2->hash ())); // Three blocks in total, nothing pruned yet diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index ab514680..b9ce858a 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -147,6 +147,8 @@ enum class detail prioritized, pending, sync, + requeued, + evicted, // processing queue queue, @@ -541,6 +543,7 @@ enum class detail cementing, cemented_hash, cementing_failed, + deferred_failed, // election_state passive, diff --git a/nano/node/active_elections.cpp b/nano/node/active_elections.cpp index 758752a0..669ce6df 100644 --- a/nano/node/active_elections.cpp +++ b/nano/node/active_elections.cpp @@ -63,6 +63,14 @@ nano::active_elections::active_elections (nano::node & node_a, nano::confirming_ } } }); + + // Stop all rolled back active transactions except initial + block_processor.rolled_back.add ([this] (auto const & block, auto const & rollback_root) { + if (block->qualified_root () != rollback_root) + { + erase (block->qualified_root ()); + } + }); } nano::active_elections::~active_elections () diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 7f47f79d..a8336f7c 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -17,9 +18,13 @@ * block_processor */ -nano::block_processor::block_processor (nano::node & node_a) : - config{ node_a.config.block_processor }, - node{ node_a }, +nano::block_processor::block_processor (nano::node_config const & node_config, nano::ledger & ledger_a, nano::unchecked_map & unchecked_a, nano::stats & stats_a, nano::logger & logger_a) : + config{ node_config.block_processor }, + network_params{ node_config.network_params }, + ledger{ ledger_a }, + unchecked{ unchecked_a }, + stats{ stats_a }, + logger{ logger_a }, workers{ 1, nano::thread_role::name::block_processing_notifications } { batch_processed.add ([this] (auto const & items) { @@ -57,6 +62,11 @@ nano::block_processor::block_processor (nano::node & node_a) : return 1; } }; + + // Requeue blocks that could not be immediately processed + unchecked.satisfied.add ([this] (nano::unchecked_info const & info) { + add (info.block, nano::block_source::unchecked); + }); } nano::block_processor::~block_processor () @@ -107,14 +117,14 @@ std::size_t nano::block_processor::size (nano::block_source source) const bool nano::block_processor::add (std::shared_ptr const & block, block_source const source, std::shared_ptr const & channel, std::function callback) { - if (node.network_params.work.validate_entry (*block)) // true => error + if (network_params.work.validate_entry (*block)) // true => error { - node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::insufficient_work); + stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::insufficient_work); return false; // Not added } - node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process); - node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {} {})", + stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process); + logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {} {})", block->hash ().to_string (), to_string (source), channel ? channel->to_string () : ""); // TODO: Lazy eval @@ -124,8 +134,8 @@ bool nano::block_processor::add (std::shared_ptr const & block, blo std::optional nano::block_processor::add_blocking (std::shared_ptr const & block, block_source const source) { - node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking); - node.logger.debug (nano::log::type::blockprocessor, "Processing block (blocking): {} (source: {})", block->hash ().to_string (), to_string (source)); + stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking); + logger.debug (nano::log::type::blockprocessor, "Processing block (blocking): {} (source: {})", block->hash ().to_string (), to_string (source)); context ctx{ block, source }; auto future = ctx.get_future (); @@ -138,8 +148,8 @@ std::optional nano::block_processor::add_blocking (std::shar } catch (std::future_error const &) { - node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking_timeout); - node.logger.error (nano::log::type::blockprocessor, "Block dropped when processing: {}", block->hash ().to_string ()); + stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking_timeout); + logger.error (nano::log::type::blockprocessor, "Block dropped when processing: {}", block->hash ().to_string ()); } return std::nullopt; @@ -147,8 +157,8 @@ std::optional nano::block_processor::add_blocking (std::shar void nano::block_processor::force (std::shared_ptr const & block_a) { - node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::force); - node.logger.debug (nano::log::type::blockprocessor, "Forcing block: {}", block_a->hash ().to_string ()); + stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::force); + logger.debug (nano::log::type::blockprocessor, "Forcing block: {}", block_a->hash ().to_string ()); add_impl (context{ block_a, block_source::forced }); } @@ -167,45 +177,38 @@ bool nano::block_processor::add_impl (context ctx, std::shared_ptrhash () != hash) { // Replace our block with the winner and roll back any dependent blocks - node.logger.debug (nano::log::type::blockprocessor, "Rolling back: {} and replacing with: {}", successor->hash ().to_string (), hash.to_string ()); + logger.debug (nano::log::type::blockprocessor, "Rolling back: {} and replacing with: {}", successor->hash ().to_string (), hash.to_string ()); std::vector> rollback_list; - if (node.ledger.rollback (transaction, successor->hash (), rollback_list)) + if (ledger.rollback (transaction, successor->hash (), rollback_list)) { - node.stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback_failed); - node.logger.error (nano::log::type::blockprocessor, "Failed to roll back: {} because it or a successor was confirmed", successor->hash ().to_string ()); + stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback_failed); + logger.error (nano::log::type::blockprocessor, "Failed to roll back: {} because it or a successor was confirmed", successor->hash ().to_string ()); } else { - node.stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback); - node.logger.debug (nano::log::type::blockprocessor, "Blocks rolled back: {}", rollback_list.size ()); + stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback); + logger.debug (nano::log::type::blockprocessor, "Blocks rolled back: {}", rollback_list.size ()); } - // Deleting from votes cache, stop active transaction - for (auto & i : rollback_list) + // Notify observers of the rolled back blocks + for (auto const & block : rollback_list) { - rolled_back.notify (i); - - node.history.erase (i->root ()); - // Stop all rolled back active transactions except initial - if (i->hash () != successor->hash ()) - { - node.active.erase (*i); - } + rolled_back.notify (block, fork_block.qualified_root ()); } } } @@ -221,7 +224,7 @@ void nano::block_processor::run () // It's possible that ledger processing happens faster than the notifications can be processed by other components, cooldown here while (workers.queued_tasks () >= config.max_queued_notifications) { - node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::cooldown); + stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::cooldown); condition.wait_for (lock, 100ms, [this] { return stopped; }); if (stopped) { @@ -231,7 +234,7 @@ void nano::block_processor::run () if (log_interval.elapsed (15s)) { - node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", + logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", queue.size (), queue.size ({ nano::block_source::forced })); } @@ -242,7 +245,7 @@ void nano::block_processor::run () // Queue notifications to be dispatched in the background workers.post ([this, processed = std::move (processed)] () mutable { - node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::notify); + stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::notify); // Set results for futures when not holding the lock for (auto & [result, context] : processed) { @@ -304,7 +307,7 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock lock.unlock (); - auto transaction = node.ledger.tx_begin_write (nano::store::writer::blockprocessor); + auto transaction = ledger.tx_begin_write (nano::store::writer::blockprocessor); nano::timer timer; timer.start (); @@ -335,7 +338,7 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock if (number_of_blocks_processed != 0 && timer.stop () > std::chrono::milliseconds (100)) { - node.logger.debug (nano::log::type::blockprocessor, "Processed {} blocks ({} forced) in {} {}", number_of_blocks_processed, number_of_forced_processed, timer.value ().count (), timer.unit ()); + logger.debug (nano::log::type::blockprocessor, "Processed {} blocks ({} forced) in {} {}", number_of_blocks_processed, number_of_forced_processed, timer.value ().count (), timer.unit ()); } return processed; @@ -345,12 +348,12 @@ nano::block_status nano::block_processor::process_one (secure::write_transaction { auto block = context.block; auto const hash = block->hash (); - nano::block_status result = node.ledger.process (transaction_a, block); + nano::block_status result = ledger.process (transaction_a, block); - node.stats.inc (nano::stat::type::blockprocessor_result, to_stat_detail (result)); - node.stats.inc (nano::stat::type::blockprocessor_source, to_stat_detail (context.source)); + stats.inc (nano::stat::type::blockprocessor_result, to_stat_detail (result)); + stats.inc (nano::stat::type::blockprocessor_source, to_stat_detail (context.source)); - node.logger.trace (nano::log::type::blockprocessor, nano::log::detail::block_processed, + logger.trace (nano::log::type::blockprocessor, nano::log::detail::block_processed, nano::log::arg{ "result", result }, nano::log::arg{ "source", context.source }, nano::log::arg{ "arrival", nano::log::microseconds (context.arrival) }, @@ -361,40 +364,41 @@ nano::block_status nano::block_processor::process_one (secure::write_transaction { case nano::block_status::progress: { - queue_unchecked (transaction_a, hash); - /* For send blocks check epoch open unchecked (gap pending). - For state blocks check only send subtype and only if block epoch is not last epoch. - If epoch is last, then pending entry shouldn't trigger same epoch open block for destination account. */ + unchecked.trigger (hash); + + /* + * For send blocks check epoch open unchecked (gap pending). + * For state blocks check only send subtype and only if block epoch is not last epoch. + * If epoch is last, then pending entry shouldn't trigger same epoch open block for destination account. + */ if (block->type () == nano::block_type::send || (block->type () == nano::block_type::state && block->is_send () && std::underlying_type_t (block->sideband ().details.epoch) < std::underlying_type_t (nano::epoch::max))) { - /* block->destination () for legacy send blocks - block->link () for state blocks (send subtype) */ - queue_unchecked (transaction_a, block->destination ()); + unchecked.trigger (block->destination ()); } break; } case nano::block_status::gap_previous: { - node.unchecked.put (block->previous (), block); - node.stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_previous); + unchecked.put (block->previous (), block); + stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_previous); break; } case nano::block_status::gap_source: { release_assert (block->source_field () || block->link_field ()); - node.unchecked.put (block->source_field ().value_or (block->link_field ().value_or (0).as_block_hash ()), block); - node.stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_source); + unchecked.put (block->source_field ().value_or (block->link_field ().value_or (0).as_block_hash ()), block); + stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_source); break; } case nano::block_status::gap_epoch_open_pending: { - node.unchecked.put (block->account_field ().value_or (0), block); // Specific unchecked key starting with epoch open block account public key - node.stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_source); + unchecked.put (block->account_field ().value_or (0), block); // Specific unchecked key starting with epoch open block account public key + stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_source); break; } case nano::block_status::old: { - node.stats.inc (nano::stat::type::ledger, nano::stat::detail::old); + stats.inc (nano::stat::type::ledger, nano::stat::detail::old); break; } case nano::block_status::bad_signature: @@ -411,7 +415,7 @@ nano::block_status nano::block_processor::process_one (secure::write_transaction } case nano::block_status::fork: { - node.stats.inc (nano::stat::type::ledger, nano::stat::detail::fork); + stats.inc (nano::stat::type::ledger, nano::stat::detail::fork); break; } case nano::block_status::opened_burn_account: @@ -438,11 +442,6 @@ nano::block_status nano::block_processor::process_one (secure::write_transaction return result; } -void nano::block_processor::queue_unchecked (secure::write_transaction const & transaction_a, nano::hash_or_account const & hash_or_account_a) -{ - node.unchecked.trigger (hash_or_account_a); -} - nano::container_info nano::block_processor::container_info () const { nano::lock_guard guard{ mutex }; diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index cb54711b..a1fcaa09 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -82,7 +82,7 @@ public: // Context }; public: - explicit block_processor (nano::node &); + block_processor (nano::node_config const &, nano::ledger &, nano::unchecked_map &, nano::stats &, nano::logger &); ~block_processor (); void start (); @@ -105,23 +105,28 @@ public: // Events // The batch observer feeds the processed observer nano::observer_set block_processed; nano::observer_set batch_processed; - nano::observer_set const &> rolled_back; + + // Rolled back blocks + nano::observer_set const &, nano::qualified_root const &> rolled_back; + +private: // Dependencies + block_processor_config const & config; + nano::network_params const & network_params; + nano::ledger & ledger; + nano::unchecked_map & unchecked; + nano::stats & stats; + nano::logger & logger; private: void run (); // Roll back block in the ledger that conflicts with 'block' void rollback_competitor (secure::write_transaction const &, nano::block const & block); nano::block_status process_one (secure::write_transaction const &, context const &, bool forced = false); - void queue_unchecked (secure::write_transaction const &, nano::hash_or_account const &); processed_batch_t process_batch (nano::unique_lock &); std::deque next_batch (size_t max_count); context next (); bool add_impl (context, std::shared_ptr const & channel = nullptr); -private: // Dependencies - block_processor_config const & config; - nano::node & node; - private: nano::fair_queue queue; diff --git a/nano/node/confirming_set.cpp b/nano/node/confirming_set.cpp index 3de7e7dc..c9e79444 100644 --- a/nano/node/confirming_set.cpp +++ b/nano/node/confirming_set.cpp @@ -1,6 +1,8 @@ +#include #include #include #include +#include #include #include #include @@ -8,9 +10,10 @@ #include #include -nano::confirming_set::confirming_set (confirming_set_config const & config_a, nano::ledger & ledger_a, nano::stats & stats_a, nano::logger & logger_a) : +nano::confirming_set::confirming_set (confirming_set_config const & config_a, nano::ledger & ledger_a, nano::block_processor & block_processor_a, nano::stats & stats_a, nano::logger & logger_a) : config{ config_a }, ledger{ ledger_a }, + block_processor{ block_processor_a }, stats{ stats_a }, logger{ logger_a }, workers{ 1, nano::thread_role::name::confirmation_height_notifications } @@ -21,6 +24,28 @@ nano::confirming_set::confirming_set (confirming_set_config const & config_a, na cemented_observers.notify (context.block); } }); + + // Requeue blocks that failed to cement immediately due to missing ledger blocks + block_processor.batch_processed.add ([this] (auto const & batch) { + bool should_notify = false; + { + std::lock_guard lock{ mutex }; + for (auto const & [result, context] : batch) + { + if (auto it = deferred.get ().find (context.block->hash ()); it != deferred.get ().end ()) + { + stats.inc (nano::stat::type::confirming_set, nano::stat::detail::requeued); + set.push_back (*it); + deferred.get ().erase (it); + should_notify = true; + } + } + } + if (should_notify) + { + condition.notify_all (); + } + }); } nano::confirming_set::~confirming_set () @@ -81,13 +106,14 @@ void nano::confirming_set::stop () bool nano::confirming_set::contains (nano::block_hash const & hash) const { std::lock_guard lock{ mutex }; - return set.get ().contains (hash) || current.contains (hash); + return set.get ().contains (hash) || deferred.get ().contains (hash) || current.contains (hash); } std::size_t nano::confirming_set::size () const { + // Do not report deferred blocks, as they are not currently being processed (and might never be requeued) std::lock_guard lock{ mutex }; - return set.size (); + return set.size () + current.size (); } void nano::confirming_set::run () @@ -97,6 +123,9 @@ void nano::confirming_set::run () { stats.inc (nano::stat::type::confirming_set, nano::stat::detail::loop); + cleanup (lock); + debug_assert (lock.owns_lock ()); + if (!set.empty ()) { run_batch (lock); @@ -137,9 +166,9 @@ void nano::confirming_set::run_batch (std::unique_lock & lock) // Keep track of the blocks we're currently cementing, so that the .contains (...) check is accurate debug_assert (current.empty ()); - for (auto const & [hash, election] : batch) + for (auto const & entry : batch) { - current.insert (hash); + current.insert (entry.hash); } lock.unlock (); @@ -180,8 +209,11 @@ void nano::confirming_set::run_batch (std::unique_lock & lock) { auto transaction = ledger.tx_begin_write (nano::store::writer::confirmation_height); - for (auto const & [hash, election] : batch) + for (auto const & entry : batch) { + auto const & hash = entry.hash; + auto const & election = entry.election; + size_t cemented_count = 0; bool success = false; do @@ -236,6 +268,12 @@ void nano::confirming_set::run_batch (std::unique_lock & lock) { stats.inc (nano::stat::type::confirming_set, nano::stat::detail::cementing_failed); logger.debug (nano::log::type::confirming_set, "Failed to cement block: {}", hash.to_string ()); + + // Requeue failed blocks for processing later + // Add them to the deferred set while still holding the exclusive database write transaction to avoid block processor races + lock.lock (); + deferred.push_back (entry); + lock.unlock (); } } } @@ -245,18 +283,59 @@ void nano::confirming_set::run_batch (std::unique_lock & lock) already_cemented.notify (already); + // Clear current set only after the transaction is committed lock.lock (); current.clear (); lock.unlock (); } +void nano::confirming_set::cleanup (std::unique_lock & lock) +{ + debug_assert (lock.owns_lock ()); + debug_assert (!mutex.try_lock ()); + + auto const cutoff = std::chrono::steady_clock::now () - config.deferred_age_cutoff; + std::deque evicted; + + auto should_evict = [&] (entry const & entry) { + return entry.timestamp < cutoff; + }; + + // Iterate in sequenced (insertion) order + for (auto it = deferred.begin (), end = deferred.end (); it != end;) + { + if (should_evict (*it) || deferred.size () > config.max_deferred) + { + stats.inc (nano::stat::type::confirming_set, nano::stat::detail::evicted); + debug_assert (ledger.any.block_exists (ledger.tx_begin_read (), it->hash)); + evicted.push_back (*it); + it = deferred.erase (it); + } + else + { + break; // Entries are sequenced, so we can stop here and avoid unnecessary iteration + } + } + + // Notify about evicted blocks so that other components can perform necessary cleanup + if (!evicted.empty ()) + { + lock.unlock (); + for (auto const & entry : evicted) + { + cementing_failed.notify (entry.hash); + } + lock.lock (); + } +} + nano::container_info nano::confirming_set::container_info () const { std::lock_guard guard{ mutex }; nano::container_info info; info.put ("set", set); - info.put ("notifications", workers.queued_tasks ()); + info.put ("deferred", deferred); info.add ("workers", workers.container_info ()); return info; } diff --git a/nano/node/confirming_set.hpp b/nano/node/confirming_set.hpp index 50ca6565..908b08c9 100644 --- a/nano/node/confirming_set.hpp +++ b/nano/node/confirming_set.hpp @@ -36,6 +36,11 @@ public: /** Maximum number of dependent blocks to be stored in memory during processing */ size_t max_blocks{ 128 * 1024 }; size_t max_queued_notifications{ 8 }; + + /** Maximum number of failed blocks to wait for requeuing */ + size_t max_deferred{ 16 * 1024 }; + /** Max age of deferred blocks before they are dropped */ + std::chrono::seconds deferred_age_cutoff{ 15min }; }; /** @@ -47,7 +52,7 @@ class confirming_set final friend class confirmation_height_pruned_source_Test; public: - confirming_set (confirming_set_config const &, nano::ledger &, nano::stats &, nano::logger &); + confirming_set (confirming_set_config const &, nano::ledger &, nano::block_processor &, nano::stats &, nano::logger &); ~confirming_set (); void start (); @@ -71,12 +76,14 @@ public: // Events nano::observer_set const &> batch_cemented; nano::observer_set const &> already_cemented; + nano::observer_set cementing_failed; nano::observer_set> cemented_observers; private: // Dependencies confirming_set_config const & config; nano::ledger & ledger; + nano::block_processor & block_processor; nano::stats & stats; nano::logger & logger; @@ -85,11 +92,13 @@ private: { nano::block_hash hash; std::shared_ptr election; + std::chrono::steady_clock::time_point timestamp{ std::chrono::steady_clock::now () }; }; void run (); void run_batch (std::unique_lock &); std::deque next_batch (size_t max_count); + void cleanup (std::unique_lock &); private: // clang-format off @@ -104,7 +113,11 @@ private: >>; // clang-format on + // Blocks that are ready to be cemented ordered_entries set; + // Blocks that could not be cemented immediately (e.g. waiting for rollbacks to complete) + ordered_entries deferred; + // Blocks that are being cemented in the current batch std::unordered_set current; std::atomic stopped{ false }; diff --git a/nano/node/election.cpp b/nano/node/election.cpp index 7cd964aa..1e760bb9 100644 --- a/nano/node/election.cpp +++ b/nano/node/election.cpp @@ -55,19 +55,28 @@ void nano::election::confirm_once (nano::unique_lock & lock) node.active.recently_confirmed.put (qualified_root, status_l.winner->hash ()); + auto const extended_status = current_status_locked (); + node.stats.inc (nano::stat::type::election, nano::stat::detail::confirm_once); node.logger.trace (nano::log::type::election, nano::log::detail::election_confirmed, nano::log::arg{ "id", id }, nano::log::arg{ "qualified_root", qualified_root }, - nano::log::arg{ "status", current_status_locked () }); + nano::log::arg{ "status", extended_status }); + + node.logger.debug (nano::log::type::election, "Election confirmed with winner: {} (behavior: {}, state: {}, voters: {}, blocks: {}, duration: {}ms, confirmation requests: {})", + status_l.winner->hash ().to_string (), + to_string (behavior_m), + to_string (state_m), + extended_status.status.voter_count, + extended_status.status.block_count, + extended_status.status.election_duration.count (), + extended_status.status.confirmation_request_count); + + node.confirming_set.add (status_l.winner->hash (), shared_from_this ()); lock.unlock (); - node.election_workers.post ([this_l = shared_from_this (), status_l, confirmation_action_l = confirmation_action] () { - // This is necessary if the winner of the election is one of the forks. - // In that case the winning block is not yet in the ledger and cementing needs to wait for rollbacks to complete. - this_l->node.process_confirmed (status_l.winner->hash (), this_l); - + node.election_workers.post ([status_l, confirmation_action_l = confirmation_action] () { if (confirmation_action_l) { confirmation_action_l (status_l.winner); diff --git a/nano/node/fwd.hpp b/nano/node/fwd.hpp index 0e5c2f6c..b7b3740e 100644 --- a/nano/node/fwd.hpp +++ b/nano/node/fwd.hpp @@ -29,6 +29,8 @@ class recently_cemented_cache; class recently_confirmed_cache; class rep_crawler; class rep_tiers; +class telemetry; +class unchecked_map; class stats; class vote_cache; enum class vote_code; diff --git a/nano/node/local_block_broadcaster.cpp b/nano/node/local_block_broadcaster.cpp index f136bd03..c2e80c86 100644 --- a/nano/node/local_block_broadcaster.cpp +++ b/nano/node/local_block_broadcaster.cpp @@ -56,7 +56,7 @@ nano::local_block_broadcaster::local_block_broadcaster (local_block_broadcaster_ } }); - block_processor.rolled_back.add ([this] (auto const & block) { + block_processor.rolled_back.add ([this] (auto const & block, auto const & rollback_root) { nano::lock_guard guard{ mutex }; auto erased = local_blocks.get ().erase (block->hash ()); stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::rollback, erased); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 8ec67d59..16d5a6f8 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -125,8 +125,9 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy application_path (application_path_a), port_mapping_impl{ std::make_unique (*this) }, port_mapping{ *port_mapping_impl }, - block_processor (*this), - confirming_set_impl{ std::make_unique (config.confirming_set, ledger, stats, logger) }, + block_processor_impl{ std::make_unique (config, ledger, unchecked, stats, logger) }, + block_processor{ *block_processor_impl }, + confirming_set_impl{ std::make_unique (config.confirming_set, ledger, block_processor, stats, logger) }, confirming_set{ *confirming_set_impl }, active_impl{ std::make_unique (*this, confirming_set, block_processor) }, active{ *active_impl }, @@ -175,10 +176,6 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy process_live_dispatcher.connect (block_processor); - unchecked.satisfied.add ([this] (nano::unchecked_info const & info) { - block_processor.add (info.block, nano::block_source::unchecked); - }); - vote_cache.rep_weight_query = [this] (nano::account const & rep) { return ledger.weight (rep); }; @@ -198,6 +195,16 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy } }); + // Do some cleanup due to this block never being processed by confirmation height processor + confirming_set.cementing_failed.add ([this] (auto const & hash) { + active.recently_confirmed.erase (hash); + }); + + // Do some cleanup of rolled back blocks + block_processor.rolled_back.add ([this] (auto const & block, auto const & rollback_root) { + history.erase (block->root ()); + }); + if (!init_error ()) { wallets.observer = [this] (bool active) { @@ -1057,39 +1064,6 @@ void nano::node::ongoing_online_weight_calculation () ongoing_online_weight_calculation_queue (); } -// TODO: Replace this with a queue of some sort. Blocks submitted here could be in a limbo for a while: neither part of an active election nor cemented -void nano::node::process_confirmed (nano::block_hash hash, std::shared_ptr election, uint64_t iteration) -{ - stats.inc (nano::stat::type::process_confirmed, nano::stat::detail::initiate); - - // Limit the maximum number of iterations to avoid getting stuck - uint64_t const max_iterations = (config.block_processor_batch_max_time / network_params.node.process_confirmed_interval) * 4; - - if (auto block = ledger.any.block_get (ledger.tx_begin_read (), hash)) - { - stats.inc (nano::stat::type::process_confirmed, nano::stat::detail::done); - logger.trace (nano::log::type::node, nano::log::detail::process_confirmed, nano::log::arg{ "block", block }); - - confirming_set.add (block->hash (), election); - } - else if (iteration < max_iterations) - { - stats.inc (nano::stat::type::process_confirmed, nano::stat::detail::retry); - - // Try again later - election_workers.post_delayed (network_params.node.process_confirmed_interval, [this, hash, election, iteration] () { - process_confirmed (hash, election, iteration + 1); - }); - } - else - { - stats.inc (nano::stat::type::process_confirmed, nano::stat::detail::timeout); - - // Do some cleanup due to this block never being processed by confirmation height processor - active.recently_confirmed.erase (hash); - } -} - std::shared_ptr nano::node::shared () { return shared_from_this (); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index e6e284e8..2d4f4a35 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -5,7 +5,6 @@ #include #include #include -#include #include #include #include @@ -87,7 +86,6 @@ public: void keepalive (std::string const &, uint16_t); int store_version (); void inbound (nano::message const &, std::shared_ptr const &); - void process_confirmed (nano::block_hash, std::shared_ptr = nullptr, uint64_t iteration = 0); void process_active (std::shared_ptr const &); std::optional process_local (std::shared_ptr const &); void process_local_async (std::shared_ptr const &); @@ -177,7 +175,8 @@ public: nano::node_observers observers; std::unique_ptr port_mapping_impl; nano::port_mapping & port_mapping; - nano::block_processor block_processor; + std::unique_ptr block_processor_impl; + nano::block_processor & block_processor; std::unique_ptr confirming_set_impl; nano::confirming_set & confirming_set; std::unique_ptr active_impl; @@ -229,20 +228,14 @@ public: std::atomic stopped{ false }; static double constexpr price_max = 16.0; static double constexpr free_cutoff = 1024.0; - // For tests only + +public: // For tests only unsigned node_seq; - // For tests only std::optional work_generate_blocking (nano::block &); - // For tests only std::optional work_generate_blocking (nano::root const &, uint64_t); - // For tests only std::optional work_generate_blocking (nano::root const &); public: // Testing convenience functions - /** - Creates a new write transaction and inserts `block' and returns result - Transaction is comitted before function return - */ [[nodiscard]] nano::block_status process (std::shared_ptr block); [[nodiscard]] nano::block_status process (secure::write_transaction const &, std::shared_ptr block); nano::block_hash latest (nano::account const &); diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index f583c842..2736d7dd 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -1121,6 +1121,7 @@ TEST (confirmation_height, many_accounts_send_receive_self) // as opposed to active transactions which implicitly calls confirmation height processor. TEST (confirmation_height, many_accounts_send_receive_self_no_elections) { + nano::test::system system; if (nano::rocksdb_config::using_rocksdb_in_tests ()) { // Don't test this in rocksdb mode @@ -1139,8 +1140,12 @@ TEST (confirmation_height, many_accounts_send_receive_self_no_elections) nano::block_hash block_hash_being_processed{ 0 }; nano::store::write_queue write_queue; + + nano::node_config node_config; + nano::unchecked_map unchecked{ 0, stats, false }; + nano::block_processor block_processor{ node_config, ledger, unchecked, stats, logger }; nano::confirming_set_config confirming_set_config{}; - nano::confirming_set confirming_set{ confirming_set_config, ledger, stats, logger }; + nano::confirming_set confirming_set{ confirming_set_config, ledger, block_processor, stats, logger }; auto const num_accounts = 100000; @@ -1149,7 +1154,6 @@ TEST (confirmation_height, many_accounts_send_receive_self_no_elections) std::vector> open_blocks; nano::block_builder builder; - nano::test::system system; { auto transaction = ledger.tx_begin_write ();