Merge pull request #4758 from pwojcikdev/remove-process-confirmed

Remove `node::process_confirmed (...)`
This commit is contained in:
Piotr Wójcik 2024-11-24 03:25:51 +01:00 committed by GitHub
commit b00064b5a7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 276 additions and 160 deletions

View file

@ -1,9 +1,11 @@
#include <nano/lib/blocks.hpp>
#include <nano/lib/logging.hpp>
#include <nano/node/active_elections.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/node/election.hpp>
#include <nano/node/make_store.hpp>
#include <nano/node/unchecked_map.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/secure/ledger_set_confirmed.hpp>
#include <nano/test_common/ledger_context.hpp>
@ -16,45 +18,70 @@
using namespace std::chrono_literals;
namespace
{
struct confirming_set_context
{
nano::logger & logger;
nano::stats & stats;
nano::ledger & ledger;
nano::unchecked_map unchecked;
nano::block_processor block_processor;
nano::confirming_set confirming_set;
explicit confirming_set_context (nano::test::ledger_context & ledger_context, nano::node_config node_config = {}) :
logger{ ledger_context.logger () },
stats{ ledger_context.stats () },
ledger{ ledger_context.ledger () },
unchecked{ 0, stats, false },
block_processor{ node_config, ledger, unchecked, stats, logger },
confirming_set{ node_config.confirming_set, ledger, block_processor, stats, logger }
{
}
};
}
TEST (confirming_set, construction)
{
auto ctx = nano::test::ledger_empty ();
nano::confirming_set_config config{};
nano::confirming_set confirming_set{ config, ctx.ledger (), ctx.stats (), ctx.logger () };
auto ledger_ctx = nano::test::ledger_empty ();
confirming_set_context ctx{ ledger_ctx };
}
TEST (confirming_set, add_exists)
{
auto ctx = nano::test::ledger_send_receive ();
nano::confirming_set_config config{};
nano::confirming_set confirming_set{ config, ctx.ledger (), ctx.stats (), ctx.logger () };
auto send = ctx.blocks ()[0];
auto ledger_ctx = nano::test::ledger_send_receive ();
confirming_set_context ctx{ ledger_ctx };
nano::confirming_set & confirming_set = ctx.confirming_set;
auto send = ledger_ctx.blocks ()[0];
confirming_set.add (send->hash ());
ASSERT_TRUE (confirming_set.contains (send->hash ()));
}
TEST (confirming_set, process_one)
{
auto ctx = nano::test::ledger_send_receive ();
nano::confirming_set_config config{};
nano::confirming_set confirming_set{ config, ctx.ledger (), ctx.stats (), ctx.logger () };
auto ledger_ctx = nano::test::ledger_send_receive ();
confirming_set_context ctx{ ledger_ctx };
nano::confirming_set & confirming_set = ctx.confirming_set;
std::atomic<int> count = 0;
std::mutex mutex;
std::condition_variable condition;
confirming_set.cemented_observers.add ([&] (auto const &) { ++count; condition.notify_all (); });
confirming_set.add (ctx.blocks ()[0]->hash ());
confirming_set.add (ledger_ctx.blocks ()[0]->hash ());
nano::test::start_stop_guard guard{ confirming_set };
std::unique_lock lock{ mutex };
ASSERT_TRUE (condition.wait_for (lock, 5s, [&] () { return count == 1; }));
ASSERT_EQ (1, ctx.stats ().count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (2, ctx.ledger ().cemented_count ());
ASSERT_EQ (1, ctx.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (2, ctx.ledger.cemented_count ());
}
TEST (confirming_set, process_multiple)
{
nano::test::system system;
auto & node = *system.add_node ();
auto ctx = nano::test::ledger_send_receive ();
nano::confirming_set_config config{};
nano::confirming_set confirming_set{ config, ctx.ledger (), ctx.stats (), ctx.logger () };
nano::confirming_set confirming_set{ config, ctx.ledger (), node.block_processor, ctx.stats (), ctx.logger () };
std::atomic<int> count = 0;
std::mutex mutex;
std::condition_variable condition;

View file

@ -195,7 +195,7 @@ TEST (election_scheduler, no_vacancy)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build ();
ASSERT_EQ (nano::block_status::progress, node.process (send));
node.process_confirmed (send->hash ());
node.confirming_set.add (send->hash ());
auto receive = builder.make_block ()
.account (key.pub)
@ -207,7 +207,7 @@ TEST (election_scheduler, no_vacancy)
.work (*system.work.generate (key.pub))
.build ();
ASSERT_EQ (nano::block_status::progress, node.process (receive));
node.process_confirmed (receive->hash ());
node.confirming_set.add (receive->hash ());
ASSERT_TIMELY (5s, nano::test::confirmed (node, { send, receive }));

View file

@ -3446,9 +3446,9 @@ TEST (node, pruning_automatic)
ASSERT_TIMELY (5s, node1.block (send2->hash ()) != nullptr);
// Force-confirm both blocks
node1.process_confirmed (send1->hash ());
node1.confirming_set.add (send1->hash ());
ASSERT_TIMELY (5s, node1.block_confirmed (send1->hash ()));
node1.process_confirmed (send2->hash ());
node1.confirming_set.add (send2->hash ());
ASSERT_TIMELY (5s, node1.block_confirmed (send2->hash ()));
// Check pruning result
@ -3497,9 +3497,9 @@ TEST (node, DISABLED_pruning_age)
node1.process_active (send2);
// Force-confirm both blocks
node1.process_confirmed (send1->hash ());
node1.confirming_set.add (send1->hash ());
ASSERT_TIMELY (5s, node1.block_confirmed (send1->hash ()));
node1.process_confirmed (send2->hash ());
node1.confirming_set.add (send2->hash ());
ASSERT_TIMELY (5s, node1.block_confirmed (send2->hash ()));
// Three blocks in total, nothing pruned yet
@ -3558,9 +3558,9 @@ TEST (node, DISABLED_pruning_depth)
node1.process_active (send2);
// Force-confirm both blocks
node1.process_confirmed (send1->hash ());
node1.confirming_set.add (send1->hash ());
ASSERT_TIMELY (5s, node1.block_confirmed (send1->hash ()));
node1.process_confirmed (send2->hash ());
node1.confirming_set.add (send2->hash ());
ASSERT_TIMELY (5s, node1.block_confirmed (send2->hash ()));
// Three blocks in total, nothing pruned yet

View file

@ -147,6 +147,8 @@ enum class detail
prioritized,
pending,
sync,
requeued,
evicted,
// processing queue
queue,
@ -541,6 +543,7 @@ enum class detail
cementing,
cemented_hash,
cementing_failed,
deferred_failed,
// election_state
passive,

View file

@ -63,6 +63,14 @@ nano::active_elections::active_elections (nano::node & node_a, nano::confirming_
}
}
});
// Stop all rolled back active transactions except initial
block_processor.rolled_back.add ([this] (auto const & block, auto const & rollback_root) {
if (block->qualified_root () != rollback_root)
{
erase (block->qualified_root ());
}
});
}
nano::active_elections::~active_elections ()

View file

@ -7,6 +7,7 @@
#include <nano/node/blockprocessor.hpp>
#include <nano/node/local_vote_history.hpp>
#include <nano/node/node.hpp>
#include <nano/node/unchecked_map.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/secure/ledger_set_any.hpp>
#include <nano/store/component.hpp>
@ -17,9 +18,13 @@
* block_processor
*/
nano::block_processor::block_processor (nano::node & node_a) :
config{ node_a.config.block_processor },
node{ node_a },
nano::block_processor::block_processor (nano::node_config const & node_config, nano::ledger & ledger_a, nano::unchecked_map & unchecked_a, nano::stats & stats_a, nano::logger & logger_a) :
config{ node_config.block_processor },
network_params{ node_config.network_params },
ledger{ ledger_a },
unchecked{ unchecked_a },
stats{ stats_a },
logger{ logger_a },
workers{ 1, nano::thread_role::name::block_processing_notifications }
{
batch_processed.add ([this] (auto const & items) {
@ -57,6 +62,11 @@ nano::block_processor::block_processor (nano::node & node_a) :
return 1;
}
};
// Requeue blocks that could not be immediately processed
unchecked.satisfied.add ([this] (nano::unchecked_info const & info) {
add (info.block, nano::block_source::unchecked);
});
}
nano::block_processor::~block_processor ()
@ -107,14 +117,14 @@ std::size_t nano::block_processor::size (nano::block_source source) const
bool nano::block_processor::add (std::shared_ptr<nano::block> const & block, block_source const source, std::shared_ptr<nano::transport::channel> const & channel, std::function<void (nano::block_status)> callback)
{
if (node.network_params.work.validate_entry (*block)) // true => error
if (network_params.work.validate_entry (*block)) // true => error
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::insufficient_work);
stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::insufficient_work);
return false; // Not added
}
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process);
node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {} {})",
stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process);
logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {} {})",
block->hash ().to_string (),
to_string (source),
channel ? channel->to_string () : "<unknown>"); // TODO: Lazy eval
@ -124,8 +134,8 @@ bool nano::block_processor::add (std::shared_ptr<nano::block> const & block, blo
std::optional<nano::block_status> nano::block_processor::add_blocking (std::shared_ptr<nano::block> const & block, block_source const source)
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking);
node.logger.debug (nano::log::type::blockprocessor, "Processing block (blocking): {} (source: {})", block->hash ().to_string (), to_string (source));
stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking);
logger.debug (nano::log::type::blockprocessor, "Processing block (blocking): {} (source: {})", block->hash ().to_string (), to_string (source));
context ctx{ block, source };
auto future = ctx.get_future ();
@ -138,8 +148,8 @@ std::optional<nano::block_status> nano::block_processor::add_blocking (std::shar
}
catch (std::future_error const &)
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking_timeout);
node.logger.error (nano::log::type::blockprocessor, "Block dropped when processing: {}", block->hash ().to_string ());
stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking_timeout);
logger.error (nano::log::type::blockprocessor, "Block dropped when processing: {}", block->hash ().to_string ());
}
return std::nullopt;
@ -147,8 +157,8 @@ std::optional<nano::block_status> nano::block_processor::add_blocking (std::shar
void nano::block_processor::force (std::shared_ptr<nano::block> const & block_a)
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::force);
node.logger.debug (nano::log::type::blockprocessor, "Forcing block: {}", block_a->hash ().to_string ());
stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::force);
logger.debug (nano::log::type::blockprocessor, "Forcing block: {}", block_a->hash ().to_string ());
add_impl (context{ block_a, block_source::forced });
}
@ -167,45 +177,38 @@ bool nano::block_processor::add_impl (context ctx, std::shared_ptr<nano::transpo
}
else
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::overfill);
node.stats.inc (nano::stat::type::blockprocessor_overfill, to_stat_detail (source));
stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::overfill);
stats.inc (nano::stat::type::blockprocessor_overfill, to_stat_detail (source));
}
return added;
}
void nano::block_processor::rollback_competitor (secure::write_transaction const & transaction, nano::block const & block)
void nano::block_processor::rollback_competitor (secure::write_transaction const & transaction, nano::block const & fork_block)
{
auto hash = block.hash ();
auto successor_hash = node.ledger.any.block_successor (transaction, block.qualified_root ());
auto successor = successor_hash ? node.ledger.any.block_get (transaction, successor_hash.value ()) : nullptr;
auto const hash = fork_block.hash ();
auto const successor_hash = ledger.any.block_successor (transaction, fork_block.qualified_root ());
auto const successor = successor_hash ? ledger.any.block_get (transaction, successor_hash.value ()) : nullptr;
if (successor != nullptr && successor->hash () != hash)
{
// Replace our block with the winner and roll back any dependent blocks
node.logger.debug (nano::log::type::blockprocessor, "Rolling back: {} and replacing with: {}", successor->hash ().to_string (), hash.to_string ());
logger.debug (nano::log::type::blockprocessor, "Rolling back: {} and replacing with: {}", successor->hash ().to_string (), hash.to_string ());
std::vector<std::shared_ptr<nano::block>> rollback_list;
if (node.ledger.rollback (transaction, successor->hash (), rollback_list))
if (ledger.rollback (transaction, successor->hash (), rollback_list))
{
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback_failed);
node.logger.error (nano::log::type::blockprocessor, "Failed to roll back: {} because it or a successor was confirmed", successor->hash ().to_string ());
stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback_failed);
logger.error (nano::log::type::blockprocessor, "Failed to roll back: {} because it or a successor was confirmed", successor->hash ().to_string ());
}
else
{
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback);
node.logger.debug (nano::log::type::blockprocessor, "Blocks rolled back: {}", rollback_list.size ());
stats.inc (nano::stat::type::ledger, nano::stat::detail::rollback);
logger.debug (nano::log::type::blockprocessor, "Blocks rolled back: {}", rollback_list.size ());
}
// Deleting from votes cache, stop active transaction
for (auto & i : rollback_list)
// Notify observers of the rolled back blocks
for (auto const & block : rollback_list)
{
rolled_back.notify (i);
node.history.erase (i->root ());
// Stop all rolled back active transactions except initial
if (i->hash () != successor->hash ())
{
node.active.erase (*i);
}
rolled_back.notify (block, fork_block.qualified_root ());
}
}
}
@ -221,7 +224,7 @@ void nano::block_processor::run ()
// It's possible that ledger processing happens faster than the notifications can be processed by other components, cooldown here
while (workers.queued_tasks () >= config.max_queued_notifications)
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::cooldown);
stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::cooldown);
condition.wait_for (lock, 100ms, [this] { return stopped; });
if (stopped)
{
@ -231,7 +234,7 @@ void nano::block_processor::run ()
if (log_interval.elapsed (15s))
{
node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue",
logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue",
queue.size (),
queue.size ({ nano::block_source::forced }));
}
@ -242,7 +245,7 @@ void nano::block_processor::run ()
// Queue notifications to be dispatched in the background
workers.post ([this, processed = std::move (processed)] () mutable {
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::notify);
stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::notify);
// Set results for futures when not holding the lock
for (auto & [result, context] : processed)
{
@ -304,7 +307,7 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
lock.unlock ();
auto transaction = node.ledger.tx_begin_write (nano::store::writer::blockprocessor);
auto transaction = ledger.tx_begin_write (nano::store::writer::blockprocessor);
nano::timer<std::chrono::milliseconds> timer;
timer.start ();
@ -335,7 +338,7 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
if (number_of_blocks_processed != 0 && timer.stop () > std::chrono::milliseconds (100))
{
node.logger.debug (nano::log::type::blockprocessor, "Processed {} blocks ({} forced) in {} {}", number_of_blocks_processed, number_of_forced_processed, timer.value ().count (), timer.unit ());
logger.debug (nano::log::type::blockprocessor, "Processed {} blocks ({} forced) in {} {}", number_of_blocks_processed, number_of_forced_processed, timer.value ().count (), timer.unit ());
}
return processed;
@ -345,12 +348,12 @@ nano::block_status nano::block_processor::process_one (secure::write_transaction
{
auto block = context.block;
auto const hash = block->hash ();
nano::block_status result = node.ledger.process (transaction_a, block);
nano::block_status result = ledger.process (transaction_a, block);
node.stats.inc (nano::stat::type::blockprocessor_result, to_stat_detail (result));
node.stats.inc (nano::stat::type::blockprocessor_source, to_stat_detail (context.source));
stats.inc (nano::stat::type::blockprocessor_result, to_stat_detail (result));
stats.inc (nano::stat::type::blockprocessor_source, to_stat_detail (context.source));
node.logger.trace (nano::log::type::blockprocessor, nano::log::detail::block_processed,
logger.trace (nano::log::type::blockprocessor, nano::log::detail::block_processed,
nano::log::arg{ "result", result },
nano::log::arg{ "source", context.source },
nano::log::arg{ "arrival", nano::log::microseconds (context.arrival) },
@ -361,40 +364,41 @@ nano::block_status nano::block_processor::process_one (secure::write_transaction
{
case nano::block_status::progress:
{
queue_unchecked (transaction_a, hash);
/* For send blocks check epoch open unchecked (gap pending).
For state blocks check only send subtype and only if block epoch is not last epoch.
If epoch is last, then pending entry shouldn't trigger same epoch open block for destination account. */
unchecked.trigger (hash);
/*
* For send blocks check epoch open unchecked (gap pending).
* For state blocks check only send subtype and only if block epoch is not last epoch.
* If epoch is last, then pending entry shouldn't trigger same epoch open block for destination account.
*/
if (block->type () == nano::block_type::send || (block->type () == nano::block_type::state && block->is_send () && std::underlying_type_t<nano::epoch> (block->sideband ().details.epoch) < std::underlying_type_t<nano::epoch> (nano::epoch::max)))
{
/* block->destination () for legacy send blocks
block->link () for state blocks (send subtype) */
queue_unchecked (transaction_a, block->destination ());
unchecked.trigger (block->destination ());
}
break;
}
case nano::block_status::gap_previous:
{
node.unchecked.put (block->previous (), block);
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_previous);
unchecked.put (block->previous (), block);
stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_previous);
break;
}
case nano::block_status::gap_source:
{
release_assert (block->source_field () || block->link_field ());
node.unchecked.put (block->source_field ().value_or (block->link_field ().value_or (0).as_block_hash ()), block);
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_source);
unchecked.put (block->source_field ().value_or (block->link_field ().value_or (0).as_block_hash ()), block);
stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_source);
break;
}
case nano::block_status::gap_epoch_open_pending:
{
node.unchecked.put (block->account_field ().value_or (0), block); // Specific unchecked key starting with epoch open block account public key
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_source);
unchecked.put (block->account_field ().value_or (0), block); // Specific unchecked key starting with epoch open block account public key
stats.inc (nano::stat::type::ledger, nano::stat::detail::gap_source);
break;
}
case nano::block_status::old:
{
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::old);
stats.inc (nano::stat::type::ledger, nano::stat::detail::old);
break;
}
case nano::block_status::bad_signature:
@ -411,7 +415,7 @@ nano::block_status nano::block_processor::process_one (secure::write_transaction
}
case nano::block_status::fork:
{
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::fork);
stats.inc (nano::stat::type::ledger, nano::stat::detail::fork);
break;
}
case nano::block_status::opened_burn_account:
@ -438,11 +442,6 @@ nano::block_status nano::block_processor::process_one (secure::write_transaction
return result;
}
void nano::block_processor::queue_unchecked (secure::write_transaction const & transaction_a, nano::hash_or_account const & hash_or_account_a)
{
node.unchecked.trigger (hash_or_account_a);
}
nano::container_info nano::block_processor::container_info () const
{
nano::lock_guard<nano::mutex> guard{ mutex };

View file

@ -82,7 +82,7 @@ public: // Context
};
public:
explicit block_processor (nano::node &);
block_processor (nano::node_config const &, nano::ledger &, nano::unchecked_map &, nano::stats &, nano::logger &);
~block_processor ();
void start ();
@ -105,23 +105,28 @@ public: // Events
// The batch observer feeds the processed observer
nano::observer_set<nano::block_status const &, context const &> block_processed;
nano::observer_set<processed_batch_t const &> batch_processed;
nano::observer_set<std::shared_ptr<nano::block> const &> rolled_back;
// Rolled back blocks <rolled back block, root of rollback>
nano::observer_set<std::shared_ptr<nano::block> const &, nano::qualified_root const &> rolled_back;
private: // Dependencies
block_processor_config const & config;
nano::network_params const & network_params;
nano::ledger & ledger;
nano::unchecked_map & unchecked;
nano::stats & stats;
nano::logger & logger;
private:
void run ();
// Roll back block in the ledger that conflicts with 'block'
void rollback_competitor (secure::write_transaction const &, nano::block const & block);
nano::block_status process_one (secure::write_transaction const &, context const &, bool forced = false);
void queue_unchecked (secure::write_transaction const &, nano::hash_or_account const &);
processed_batch_t process_batch (nano::unique_lock<nano::mutex> &);
std::deque<context> next_batch (size_t max_count);
context next ();
bool add_impl (context, std::shared_ptr<nano::transport::channel> const & channel = nullptr);
private: // Dependencies
block_processor_config const & config;
nano::node & node;
private:
nano::fair_queue<context, nano::block_source> queue;

View file

@ -1,6 +1,8 @@
#include <nano/lib/blocks.hpp>
#include <nano/lib/logging.hpp>
#include <nano/lib/stats.hpp>
#include <nano/lib/thread_roles.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/secure/ledger_set_any.hpp>
@ -8,9 +10,10 @@
#include <nano/store/component.hpp>
#include <nano/store/write_queue.hpp>
nano::confirming_set::confirming_set (confirming_set_config const & config_a, nano::ledger & ledger_a, nano::stats & stats_a, nano::logger & logger_a) :
nano::confirming_set::confirming_set (confirming_set_config const & config_a, nano::ledger & ledger_a, nano::block_processor & block_processor_a, nano::stats & stats_a, nano::logger & logger_a) :
config{ config_a },
ledger{ ledger_a },
block_processor{ block_processor_a },
stats{ stats_a },
logger{ logger_a },
workers{ 1, nano::thread_role::name::confirmation_height_notifications }
@ -21,6 +24,28 @@ nano::confirming_set::confirming_set (confirming_set_config const & config_a, na
cemented_observers.notify (context.block);
}
});
// Requeue blocks that failed to cement immediately due to missing ledger blocks
block_processor.batch_processed.add ([this] (auto const & batch) {
bool should_notify = false;
{
std::lock_guard lock{ mutex };
for (auto const & [result, context] : batch)
{
if (auto it = deferred.get<tag_hash> ().find (context.block->hash ()); it != deferred.get<tag_hash> ().end ())
{
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::requeued);
set.push_back (*it);
deferred.get<tag_hash> ().erase (it);
should_notify = true;
}
}
}
if (should_notify)
{
condition.notify_all ();
}
});
}
nano::confirming_set::~confirming_set ()
@ -81,13 +106,14 @@ void nano::confirming_set::stop ()
bool nano::confirming_set::contains (nano::block_hash const & hash) const
{
std::lock_guard lock{ mutex };
return set.get<tag_hash> ().contains (hash) || current.contains (hash);
return set.get<tag_hash> ().contains (hash) || deferred.get<tag_hash> ().contains (hash) || current.contains (hash);
}
std::size_t nano::confirming_set::size () const
{
// Do not report deferred blocks, as they are not currently being processed (and might never be requeued)
std::lock_guard lock{ mutex };
return set.size ();
return set.size () + current.size ();
}
void nano::confirming_set::run ()
@ -97,6 +123,9 @@ void nano::confirming_set::run ()
{
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::loop);
cleanup (lock);
debug_assert (lock.owns_lock ());
if (!set.empty ())
{
run_batch (lock);
@ -137,9 +166,9 @@ void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)
// Keep track of the blocks we're currently cementing, so that the .contains (...) check is accurate
debug_assert (current.empty ());
for (auto const & [hash, election] : batch)
for (auto const & entry : batch)
{
current.insert (hash);
current.insert (entry.hash);
}
lock.unlock ();
@ -180,8 +209,11 @@ void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)
{
auto transaction = ledger.tx_begin_write (nano::store::writer::confirmation_height);
for (auto const & [hash, election] : batch)
for (auto const & entry : batch)
{
auto const & hash = entry.hash;
auto const & election = entry.election;
size_t cemented_count = 0;
bool success = false;
do
@ -236,6 +268,12 @@ void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)
{
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::cementing_failed);
logger.debug (nano::log::type::confirming_set, "Failed to cement block: {}", hash.to_string ());
// Requeue failed blocks for processing later
// Add them to the deferred set while still holding the exclusive database write transaction to avoid block processor races
lock.lock ();
deferred.push_back (entry);
lock.unlock ();
}
}
}
@ -245,18 +283,59 @@ void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)
already_cemented.notify (already);
// Clear current set only after the transaction is committed
lock.lock ();
current.clear ();
lock.unlock ();
}
void nano::confirming_set::cleanup (std::unique_lock<std::mutex> & lock)
{
debug_assert (lock.owns_lock ());
debug_assert (!mutex.try_lock ());
auto const cutoff = std::chrono::steady_clock::now () - config.deferred_age_cutoff;
std::deque<entry> evicted;
auto should_evict = [&] (entry const & entry) {
return entry.timestamp < cutoff;
};
// Iterate in sequenced (insertion) order
for (auto it = deferred.begin (), end = deferred.end (); it != end;)
{
if (should_evict (*it) || deferred.size () > config.max_deferred)
{
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::evicted);
debug_assert (ledger.any.block_exists (ledger.tx_begin_read (), it->hash));
evicted.push_back (*it);
it = deferred.erase (it);
}
else
{
break; // Entries are sequenced, so we can stop here and avoid unnecessary iteration
}
}
// Notify about evicted blocks so that other components can perform necessary cleanup
if (!evicted.empty ())
{
lock.unlock ();
for (auto const & entry : evicted)
{
cementing_failed.notify (entry.hash);
}
lock.lock ();
}
}
nano::container_info nano::confirming_set::container_info () const
{
std::lock_guard guard{ mutex };
nano::container_info info;
info.put ("set", set);
info.put ("notifications", workers.queued_tasks ());
info.put ("deferred", deferred);
info.add ("workers", workers.container_info ());
return info;
}

View file

@ -36,6 +36,11 @@ public:
/** Maximum number of dependent blocks to be stored in memory during processing */
size_t max_blocks{ 128 * 1024 };
size_t max_queued_notifications{ 8 };
/** Maximum number of failed blocks to wait for requeuing */
size_t max_deferred{ 16 * 1024 };
/** Max age of deferred blocks before they are dropped */
std::chrono::seconds deferred_age_cutoff{ 15min };
};
/**
@ -47,7 +52,7 @@ class confirming_set final
friend class confirmation_height_pruned_source_Test;
public:
confirming_set (confirming_set_config const &, nano::ledger &, nano::stats &, nano::logger &);
confirming_set (confirming_set_config const &, nano::ledger &, nano::block_processor &, nano::stats &, nano::logger &);
~confirming_set ();
void start ();
@ -71,12 +76,14 @@ public: // Events
nano::observer_set<std::deque<context> const &> batch_cemented;
nano::observer_set<std::deque<nano::block_hash> const &> already_cemented;
nano::observer_set<nano::block_hash> cementing_failed;
nano::observer_set<std::shared_ptr<nano::block>> cemented_observers;
private: // Dependencies
confirming_set_config const & config;
nano::ledger & ledger;
nano::block_processor & block_processor;
nano::stats & stats;
nano::logger & logger;
@ -85,11 +92,13 @@ private:
{
nano::block_hash hash;
std::shared_ptr<nano::election> election;
std::chrono::steady_clock::time_point timestamp{ std::chrono::steady_clock::now () };
};
void run ();
void run_batch (std::unique_lock<std::mutex> &);
std::deque<entry> next_batch (size_t max_count);
void cleanup (std::unique_lock<std::mutex> &);
private:
// clang-format off
@ -104,7 +113,11 @@ private:
>>;
// clang-format on
// Blocks that are ready to be cemented
ordered_entries set;
// Blocks that could not be cemented immediately (e.g. waiting for rollbacks to complete)
ordered_entries deferred;
// Blocks that are being cemented in the current batch
std::unordered_set<nano::block_hash> current;
std::atomic<bool> stopped{ false };

View file

@ -55,19 +55,28 @@ void nano::election::confirm_once (nano::unique_lock<nano::mutex> & lock)
node.active.recently_confirmed.put (qualified_root, status_l.winner->hash ());
auto const extended_status = current_status_locked ();
node.stats.inc (nano::stat::type::election, nano::stat::detail::confirm_once);
node.logger.trace (nano::log::type::election, nano::log::detail::election_confirmed,
nano::log::arg{ "id", id },
nano::log::arg{ "qualified_root", qualified_root },
nano::log::arg{ "status", current_status_locked () });
nano::log::arg{ "status", extended_status });
node.logger.debug (nano::log::type::election, "Election confirmed with winner: {} (behavior: {}, state: {}, voters: {}, blocks: {}, duration: {}ms, confirmation requests: {})",
status_l.winner->hash ().to_string (),
to_string (behavior_m),
to_string (state_m),
extended_status.status.voter_count,
extended_status.status.block_count,
extended_status.status.election_duration.count (),
extended_status.status.confirmation_request_count);
node.confirming_set.add (status_l.winner->hash (), shared_from_this ());
lock.unlock ();
node.election_workers.post ([this_l = shared_from_this (), status_l, confirmation_action_l = confirmation_action] () {
// This is necessary if the winner of the election is one of the forks.
// In that case the winning block is not yet in the ledger and cementing needs to wait for rollbacks to complete.
this_l->node.process_confirmed (status_l.winner->hash (), this_l);
node.election_workers.post ([status_l, confirmation_action_l = confirmation_action] () {
if (confirmation_action_l)
{
confirmation_action_l (status_l.winner);

View file

@ -29,6 +29,8 @@ class recently_cemented_cache;
class recently_confirmed_cache;
class rep_crawler;
class rep_tiers;
class telemetry;
class unchecked_map;
class stats;
class vote_cache;
enum class vote_code;

View file

@ -56,7 +56,7 @@ nano::local_block_broadcaster::local_block_broadcaster (local_block_broadcaster_
}
});
block_processor.rolled_back.add ([this] (auto const & block) {
block_processor.rolled_back.add ([this] (auto const & block, auto const & rollback_root) {
nano::lock_guard<nano::mutex> guard{ mutex };
auto erased = local_blocks.get<tag_hash> ().erase (block->hash ());
stats.add (nano::stat::type::local_block_broadcaster, nano::stat::detail::rollback, erased);

View file

@ -125,8 +125,9 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
application_path (application_path_a),
port_mapping_impl{ std::make_unique<nano::port_mapping> (*this) },
port_mapping{ *port_mapping_impl },
block_processor (*this),
confirming_set_impl{ std::make_unique<nano::confirming_set> (config.confirming_set, ledger, stats, logger) },
block_processor_impl{ std::make_unique<nano::block_processor> (config, ledger, unchecked, stats, logger) },
block_processor{ *block_processor_impl },
confirming_set_impl{ std::make_unique<nano::confirming_set> (config.confirming_set, ledger, block_processor, stats, logger) },
confirming_set{ *confirming_set_impl },
active_impl{ std::make_unique<nano::active_elections> (*this, confirming_set, block_processor) },
active{ *active_impl },
@ -175,10 +176,6 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
process_live_dispatcher.connect (block_processor);
unchecked.satisfied.add ([this] (nano::unchecked_info const & info) {
block_processor.add (info.block, nano::block_source::unchecked);
});
vote_cache.rep_weight_query = [this] (nano::account const & rep) {
return ledger.weight (rep);
};
@ -198,6 +195,16 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
}
});
// Do some cleanup due to this block never being processed by confirmation height processor
confirming_set.cementing_failed.add ([this] (auto const & hash) {
active.recently_confirmed.erase (hash);
});
// Do some cleanup of rolled back blocks
block_processor.rolled_back.add ([this] (auto const & block, auto const & rollback_root) {
history.erase (block->root ());
});
if (!init_error ())
{
wallets.observer = [this] (bool active) {
@ -1057,39 +1064,6 @@ void nano::node::ongoing_online_weight_calculation ()
ongoing_online_weight_calculation_queue ();
}
// TODO: Replace this with a queue of some sort. Blocks submitted here could be in a limbo for a while: neither part of an active election nor cemented
void nano::node::process_confirmed (nano::block_hash hash, std::shared_ptr<nano::election> election, uint64_t iteration)
{
stats.inc (nano::stat::type::process_confirmed, nano::stat::detail::initiate);
// Limit the maximum number of iterations to avoid getting stuck
uint64_t const max_iterations = (config.block_processor_batch_max_time / network_params.node.process_confirmed_interval) * 4;
if (auto block = ledger.any.block_get (ledger.tx_begin_read (), hash))
{
stats.inc (nano::stat::type::process_confirmed, nano::stat::detail::done);
logger.trace (nano::log::type::node, nano::log::detail::process_confirmed, nano::log::arg{ "block", block });
confirming_set.add (block->hash (), election);
}
else if (iteration < max_iterations)
{
stats.inc (nano::stat::type::process_confirmed, nano::stat::detail::retry);
// Try again later
election_workers.post_delayed (network_params.node.process_confirmed_interval, [this, hash, election, iteration] () {
process_confirmed (hash, election, iteration + 1);
});
}
else
{
stats.inc (nano::stat::type::process_confirmed, nano::stat::detail::timeout);
// Do some cleanup due to this block never being processed by confirmation height processor
active.recently_confirmed.erase (hash);
}
}
std::shared_ptr<nano::node> nano::node::shared ()
{
return shared_from_this ();

View file

@ -5,7 +5,6 @@
#include <nano/lib/logging.hpp>
#include <nano/lib/stats.hpp>
#include <nano/lib/work.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/distributed_work_factory.hpp>
#include <nano/node/epoch_upgrader.hpp>
#include <nano/node/fwd.hpp>
@ -87,7 +86,6 @@ public:
void keepalive (std::string const &, uint16_t);
int store_version ();
void inbound (nano::message const &, std::shared_ptr<nano::transport::channel> const &);
void process_confirmed (nano::block_hash, std::shared_ptr<nano::election> = nullptr, uint64_t iteration = 0);
void process_active (std::shared_ptr<nano::block> const &);
std::optional<nano::block_status> process_local (std::shared_ptr<nano::block> const &);
void process_local_async (std::shared_ptr<nano::block> const &);
@ -177,7 +175,8 @@ public:
nano::node_observers observers;
std::unique_ptr<nano::port_mapping> port_mapping_impl;
nano::port_mapping & port_mapping;
nano::block_processor block_processor;
std::unique_ptr<nano::block_processor> block_processor_impl;
nano::block_processor & block_processor;
std::unique_ptr<nano::confirming_set> confirming_set_impl;
nano::confirming_set & confirming_set;
std::unique_ptr<nano::active_elections> active_impl;
@ -229,20 +228,14 @@ public:
std::atomic<bool> stopped{ false };
static double constexpr price_max = 16.0;
static double constexpr free_cutoff = 1024.0;
// For tests only
public: // For tests only
unsigned node_seq;
// For tests only
std::optional<uint64_t> work_generate_blocking (nano::block &);
// For tests only
std::optional<uint64_t> work_generate_blocking (nano::root const &, uint64_t);
// For tests only
std::optional<uint64_t> work_generate_blocking (nano::root const &);
public: // Testing convenience functions
/**
Creates a new write transaction and inserts `block' and returns result
Transaction is comitted before function return
*/
[[nodiscard]] nano::block_status process (std::shared_ptr<nano::block> block);
[[nodiscard]] nano::block_status process (secure::write_transaction const &, std::shared_ptr<nano::block> block);
nano::block_hash latest (nano::account const &);

View file

@ -1121,6 +1121,7 @@ TEST (confirmation_height, many_accounts_send_receive_self)
// as opposed to active transactions which implicitly calls confirmation height processor.
TEST (confirmation_height, many_accounts_send_receive_self_no_elections)
{
nano::test::system system;
if (nano::rocksdb_config::using_rocksdb_in_tests ())
{
// Don't test this in rocksdb mode
@ -1139,8 +1140,12 @@ TEST (confirmation_height, many_accounts_send_receive_self_no_elections)
nano::block_hash block_hash_being_processed{ 0 };
nano::store::write_queue write_queue;
nano::node_config node_config;
nano::unchecked_map unchecked{ 0, stats, false };
nano::block_processor block_processor{ node_config, ledger, unchecked, stats, logger };
nano::confirming_set_config confirming_set_config{};
nano::confirming_set confirming_set{ confirming_set_config, ledger, stats, logger };
nano::confirming_set confirming_set{ confirming_set_config, ledger, block_processor, stats, logger };
auto const num_accounts = 100000;
@ -1149,7 +1154,6 @@ TEST (confirmation_height, many_accounts_send_receive_self_no_elections)
std::vector<std::shared_ptr<nano::open_block>> open_blocks;
nano::block_builder builder;
nano::test::system system;
{
auto transaction = ledger.tx_begin_write ();