diff --git a/nano/node/active_elections.cpp b/nano/node/active_elections.cpp index 669ce6df..bd29634b 100644 --- a/nano/node/active_elections.cpp +++ b/nano/node/active_elections.cpp @@ -65,10 +65,13 @@ nano::active_elections::active_elections (nano::node & node_a, nano::confirming_ }); // Stop all rolled back active transactions except initial - block_processor.rolled_back.add ([this] (auto const & block, auto const & rollback_root) { - if (block->qualified_root () != rollback_root) + block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) { + for (auto const & block : blocks) { - erase (block->qualified_root ()); + if (block->qualified_root () != rollback_root) + { + erase (block->qualified_root ()); + } } }); } diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index a8336f7c..de314b20 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -27,14 +27,6 @@ nano::block_processor::block_processor (nano::node_config const & node_config, n logger{ logger_a }, workers{ 1, nano::thread_role::name::block_processing_notifications } { - batch_processed.add ([this] (auto const & items) { - // For every batch item: notify the 'processed' observer. - for (auto const & [result, context] : items) - { - block_processed.notify (result, context); - } - }); - queue.max_size_query = [this] (auto const & origin) { switch (origin.source) { @@ -193,7 +185,7 @@ void nano::block_processor::rollback_competitor (secure::write_transaction const // Replace our block with the winner and roll back any dependent blocks logger.debug (nano::log::type::blockprocessor, "Rolling back: {} and replacing with: {}", successor->hash ().to_string (), hash.to_string ()); - std::vector> rollback_list; + std::deque> rollback_list; if (ledger.rollback (transaction, successor->hash (), rollback_list)) { stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback_failed); @@ -205,11 +197,10 @@ void nano::block_processor::rollback_competitor (secure::write_transaction const logger.debug (nano::log::type::blockprocessor, "Blocks rolled back: {}", rollback_list.size ()); } - // Notify observers of the rolled back blocks - for (auto const & block : rollback_list) - { - rolled_back.notify (block, fork_block.qualified_root ()); - } + // Notify observers of the rolled back blocks on a background thread while not holding the ledger write lock + workers.post ([this, rollback_list = std::move (rollback_list), root = fork_block.qualified_root ()] () { + rolled_back.notify (rollback_list, root); + }); } } diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index a1fcaa09..50ae78c2 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -99,15 +99,14 @@ public: std::atomic flushing{ false }; public: // Events - using processed_t = std::tuple; - using processed_batch_t = std::deque; + // All processed blocks including forks, rejected etc + using processed_batch_t = std::deque>; + using processed_batch_event_t = nano::observer_set; + processed_batch_event_t batch_processed; - // The batch observer feeds the processed observer - nano::observer_set block_processed; - nano::observer_set batch_processed; - - // Rolled back blocks - nano::observer_set const &, nano::qualified_root const &> rolled_back; + // Rolled back blocks + using rolled_back_event_t = nano::observer_set>, nano::qualified_root>; + rolled_back_event_t rolled_back; private: // Dependencies block_processor_config const & config; diff --git a/nano/node/local_block_broadcaster.cpp b/nano/node/local_block_broadcaster.cpp index c2e80c86..b2a41a74 100644 --- a/nano/node/local_block_broadcaster.cpp +++ b/nano/node/local_block_broadcaster.cpp @@ -56,10 +56,13 @@ nano::local_block_broadcaster::local_block_broadcaster (local_block_broadcaster_ } }); - block_processor.rolled_back.add ([this] (auto const & block, auto const & rollback_root) { + block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) { nano::lock_guard guard{ mutex }; - auto erased = local_blocks.get ().erase (block->hash ()); - stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::rollback, erased); + for (auto const & block : blocks) + { + auto erased = local_blocks.get ().erase (block->hash ()); + stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::rollback, erased); + } }); confirming_set.cemented_observers.add ([this] (auto const & block) { diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 16d5a6f8..34c72739 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -201,8 +201,11 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy }); // Do some cleanup of rolled back blocks - block_processor.rolled_back.add ([this] (auto const & block, auto const & rollback_root) { - history.erase (block->root ()); + block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) { + for (auto const & block : blocks) + { + history.erase (block->root ()); + } }); if (!init_error ()) diff --git a/nano/secure/ledger.cpp b/nano/secure/ledger.cpp index 23550e74..1700fd17 100644 --- a/nano/secure/ledger.cpp +++ b/nano/secure/ledger.cpp @@ -36,7 +36,7 @@ namespace class rollback_visitor : public nano::block_visitor { public: - rollback_visitor (nano::secure::write_transaction const & transaction_a, nano::ledger & ledger_a, std::vector> & list_a) : + rollback_visitor (nano::secure::write_transaction const & transaction_a, nano::ledger & ledger_a, std::deque> & list_a) : transaction (transaction_a), ledger (ledger_a), list (list_a) @@ -179,7 +179,7 @@ public: } nano::secure::write_transaction const & transaction; nano::ledger & ledger; - std::vector> & list; + std::deque> & list; bool error{ false }; }; @@ -984,7 +984,7 @@ nano::uint128_t nano::ledger::weight_exact (secure::transaction const & txn_a, n } // Rollback blocks until `block_a' doesn't exist or it tries to penetrate the confirmation height -bool nano::ledger::rollback (secure::write_transaction const & transaction_a, nano::block_hash const & block_a, std::vector> & list_a) +bool nano::ledger::rollback (secure::write_transaction const & transaction_a, nano::block_hash const & block_a, std::deque> & list_a) { debug_assert (any.block_exists (transaction_a, block_a)); auto account_l = any.block_account (transaction_a, block_a).value (); @@ -1018,7 +1018,7 @@ bool nano::ledger::rollback (secure::write_transaction const & transaction_a, na bool nano::ledger::rollback (secure::write_transaction const & transaction_a, nano::block_hash const & block_a) { - std::vector> rollback_list; + std::deque> rollback_list; return rollback (transaction_a, block_a, rollback_list); } diff --git a/nano/secure/ledger.hpp b/nano/secure/ledger.hpp index 69c86378..48d71386 100644 --- a/nano/secure/ledger.hpp +++ b/nano/secure/ledger.hpp @@ -62,7 +62,7 @@ public: std::optional pending_info (secure::transaction const &, nano::pending_key const & key) const; std::deque> confirm (secure::write_transaction &, nano::block_hash const & hash, size_t max_blocks = 1024 * 128); nano::block_status process (secure::write_transaction const &, std::shared_ptr block); - bool rollback (secure::write_transaction const &, nano::block_hash const &, std::vector> &); + bool rollback (secure::write_transaction const &, nano::block_hash const &, std::deque> & rollback_list); bool rollback (secure::write_transaction const &, nano::block_hash const &); void update_account (secure::write_transaction const &, nano::account const &, nano::account_info const &, nano::account_info const &); uint64_t pruning_action (secure::write_transaction &, nano::block_hash const &, uint64_t const); diff --git a/nano/test_common/system.cpp b/nano/test_common/system.cpp index 4071ea46..2a61ca26 100644 --- a/nano/test_common/system.cpp +++ b/nano/test_common/system.cpp @@ -448,7 +448,7 @@ void nano::test::system::generate_rollback (nano::node & node_a, std::vector> rollback_list; + std::deque> rollback_list; auto error = node_a.ledger.rollback (transaction, hash, rollback_list); (void)error; debug_assert (!error);