Merge pull request #4791 from pwojcikdev/block-processor-rolled-back-notif

Offload rollback notifications to background thread
This commit is contained in:
Piotr Wójcik 2024-11-26 21:34:59 +01:00 committed by GitHub
commit 1727e0dd16
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 35 additions and 36 deletions

View file

@ -65,10 +65,13 @@ nano::active_elections::active_elections (nano::node & node_a, nano::confirming_
});
// Stop all rolled back active transactions except initial
block_processor.rolled_back.add ([this] (auto const & block, auto const & rollback_root) {
if (block->qualified_root () != rollback_root)
block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
for (auto const & block : blocks)
{
erase (block->qualified_root ());
if (block->qualified_root () != rollback_root)
{
erase (block->qualified_root ());
}
}
});
}

View file

@ -27,14 +27,6 @@ nano::block_processor::block_processor (nano::node_config const & node_config, n
logger{ logger_a },
workers{ 1, nano::thread_role::name::block_processing_notifications }
{
batch_processed.add ([this] (auto const & items) {
// For every batch item: notify the 'processed' observer.
for (auto const & [result, context] : items)
{
block_processed.notify (result, context);
}
});
queue.max_size_query = [this] (auto const & origin) {
switch (origin.source)
{
@ -193,7 +185,7 @@ void nano::block_processor::rollback_competitor (secure::write_transaction const
// Replace our block with the winner and roll back any dependent blocks
logger.debug (nano::log::type::blockprocessor, "Rolling back: {} and replacing with: {}", successor->hash ().to_string (), hash.to_string ());
std::vector<std::shared_ptr<nano::block>> rollback_list;
std::deque<std::shared_ptr<nano::block>> rollback_list;
if (ledger.rollback (transaction, successor->hash (), rollback_list))
{
stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback_failed);
@ -205,11 +197,10 @@ void nano::block_processor::rollback_competitor (secure::write_transaction const
logger.debug (nano::log::type::blockprocessor, "Blocks rolled back: {}", rollback_list.size ());
}
// Notify observers of the rolled back blocks
for (auto const & block : rollback_list)
{
rolled_back.notify (block, fork_block.qualified_root ());
}
// Notify observers of the rolled back blocks on a background thread while not holding the ledger write lock
workers.post ([this, rollback_list = std::move (rollback_list), root = fork_block.qualified_root ()] () {
rolled_back.notify (rollback_list, root);
});
}
}

View file

@ -99,15 +99,14 @@ public:
std::atomic<bool> flushing{ false };
public: // Events
using processed_t = std::tuple<nano::block_status, context>;
using processed_batch_t = std::deque<processed_t>;
// All processed blocks including forks, rejected etc
using processed_batch_t = std::deque<std::pair<nano::block_status, context>>;
using processed_batch_event_t = nano::observer_set<processed_batch_t>;
processed_batch_event_t batch_processed;
// The batch observer feeds the processed observer
nano::observer_set<nano::block_status const &, context const &> block_processed;
nano::observer_set<processed_batch_t const &> batch_processed;
// Rolled back blocks <rolled back block, root of rollback>
nano::observer_set<std::shared_ptr<nano::block> const &, nano::qualified_root const &> rolled_back;
// Rolled back blocks <rolled back blocks, root of rollback>
using rolled_back_event_t = nano::observer_set<std::deque<std::shared_ptr<nano::block>>, nano::qualified_root>;
rolled_back_event_t rolled_back;
private: // Dependencies
block_processor_config const & config;

View file

@ -56,10 +56,13 @@ nano::local_block_broadcaster::local_block_broadcaster (local_block_broadcaster_
}
});
block_processor.rolled_back.add ([this] (auto const & block, auto const & rollback_root) {
block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
nano::lock_guard<nano::mutex> guard{ mutex };
auto erased = local_blocks.get<tag_hash> ().erase (block->hash ());
stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::rollback, erased);
for (auto const & block : blocks)
{
auto erased = local_blocks.get<tag_hash> ().erase (block->hash ());
stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::rollback, erased);
}
});
confirming_set.cemented_observers.add ([this] (auto const & block) {

View file

@ -201,8 +201,11 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
});
// Do some cleanup of rolled back blocks
block_processor.rolled_back.add ([this] (auto const & block, auto const & rollback_root) {
history.erase (block->root ());
block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
for (auto const & block : blocks)
{
history.erase (block->root ());
}
});
if (!init_error ())

View file

@ -36,7 +36,7 @@ namespace
class rollback_visitor : public nano::block_visitor
{
public:
rollback_visitor (nano::secure::write_transaction const & transaction_a, nano::ledger & ledger_a, std::vector<std::shared_ptr<nano::block>> & list_a) :
rollback_visitor (nano::secure::write_transaction const & transaction_a, nano::ledger & ledger_a, std::deque<std::shared_ptr<nano::block>> & list_a) :
transaction (transaction_a),
ledger (ledger_a),
list (list_a)
@ -179,7 +179,7 @@ public:
}
nano::secure::write_transaction const & transaction;
nano::ledger & ledger;
std::vector<std::shared_ptr<nano::block>> & list;
std::deque<std::shared_ptr<nano::block>> & list;
bool error{ false };
};
@ -984,7 +984,7 @@ nano::uint128_t nano::ledger::weight_exact (secure::transaction const & txn_a, n
}
// Rollback blocks until `block_a' doesn't exist or it tries to penetrate the confirmation height
bool nano::ledger::rollback (secure::write_transaction const & transaction_a, nano::block_hash const & block_a, std::vector<std::shared_ptr<nano::block>> & list_a)
bool nano::ledger::rollback (secure::write_transaction const & transaction_a, nano::block_hash const & block_a, std::deque<std::shared_ptr<nano::block>> & list_a)
{
debug_assert (any.block_exists (transaction_a, block_a));
auto account_l = any.block_account (transaction_a, block_a).value ();
@ -1018,7 +1018,7 @@ bool nano::ledger::rollback (secure::write_transaction const & transaction_a, na
bool nano::ledger::rollback (secure::write_transaction const & transaction_a, nano::block_hash const & block_a)
{
std::vector<std::shared_ptr<nano::block>> rollback_list;
std::deque<std::shared_ptr<nano::block>> rollback_list;
return rollback (transaction_a, block_a, rollback_list);
}

View file

@ -62,7 +62,7 @@ public:
std::optional<nano::pending_info> pending_info (secure::transaction const &, nano::pending_key const & key) const;
std::deque<std::shared_ptr<nano::block>> confirm (secure::write_transaction &, nano::block_hash const & hash, size_t max_blocks = 1024 * 128);
nano::block_status process (secure::write_transaction const &, std::shared_ptr<nano::block> block);
bool rollback (secure::write_transaction const &, nano::block_hash const &, std::vector<std::shared_ptr<nano::block>> &);
bool rollback (secure::write_transaction const &, nano::block_hash const &, std::deque<std::shared_ptr<nano::block>> & rollback_list);
bool rollback (secure::write_transaction const &, nano::block_hash const &);
void update_account (secure::write_transaction const &, nano::account const &, nano::account_info const &, nano::account_info const &);
uint64_t pruning_action (secure::write_transaction &, nano::block_hash const &, uint64_t const);

View file

@ -448,7 +448,7 @@ void nano::test::system::generate_rollback (nano::node & node_a, std::vector<nan
{
accounts_a[index] = accounts_a[accounts_a.size () - 1];
accounts_a.pop_back ();
std::vector<std::shared_ptr<nano::block>> rollback_list;
std::deque<std::shared_ptr<nano::block>> rollback_list;
auto error = node_a.ledger.rollback (transaction, hash, rollback_list);
(void)error;
debug_assert (!error);