Move block in to the block_processor::context object.

This commit is contained in:
Colin LeMahieu 2024-02-13 17:23:24 +00:00
commit 11552cb16f
No known key found for this signature in database
GPG key ID: 43708520C8DFB938
7 changed files with 45 additions and 50 deletions

View file

@ -34,11 +34,11 @@ nano::active_transactions::active_transactions (nano::node & node_a, nano::confi
});
// Notify elections about alternative (forked) blocks
block_processor.block_processed.add ([this] (auto const & result, auto const & block, auto const & context) {
block_processor.block_processed.add ([this] (auto const & result, auto const & context) {
switch (result.code)
{
case nano::process_result::fork:
publish (block);
publish (context.block);
break;
default:
break;

View file

@ -14,11 +14,11 @@ void nano::block_broadcast::connect (nano::block_processor & block_processor)
{
return;
}
block_processor.block_processed.add ([this] (auto const & result, auto const & block, auto const & context) {
block_processor.block_processed.add ([this] (auto const & result, auto const & context) {
switch (result.code)
{
case nano::process_result::progress:
observe (block, context);
observe (context);
break;
default:
break;
@ -26,8 +26,9 @@ void nano::block_broadcast::connect (nano::block_processor & block_processor)
});
}
void nano::block_broadcast::observe (std::shared_ptr<nano::block> const & block, nano::block_processor::context const & context)
void nano::block_broadcast::observe (nano::block_processor::context const & context)
{
auto const & block = context.block;
if (context.source == nano::block_source::local)
{
// Block created on this node

View file

@ -20,7 +20,7 @@ public:
private:
// Block_processor observer
void observe (std::shared_ptr<nano::block> const & block, nano::block_processor::context const &);
void observe (nano::block_processor::context const &);
nano::network & network;
bool enabled;

View file

@ -12,7 +12,8 @@
* block_processor::context
*/
nano::block_processor::context::context (nano::block_source source_a) :
nano::block_processor::context::context (std::shared_ptr<nano::block> block, nano::block_source source_a) :
block{ block },
source{ source_a }
{
debug_assert (source != nano::block_source::unknown);
@ -39,9 +40,9 @@ nano::block_processor::block_processor (nano::node & node_a, nano::write_databas
{
batch_processed.add ([this] (auto const & items) {
// For every batch item: notify the 'processed' observer.
for (auto const & [result, block, context] : items)
for (auto const & [result, context] : items)
{
block_processed.notify (result, block, context);
block_processed.notify (result, context);
}
});
processing_thread = std::thread ([this] () {
@ -92,7 +93,7 @@ void nano::block_processor::add (std::shared_ptr<nano::block> const & block, blo
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process);
node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {})", block->hash ().to_string (), to_string (source));
add_impl (block, context{ source });
add_impl (context{ block, source });
}
std::optional<nano::process_return> nano::block_processor::add_blocking (std::shared_ptr<nano::block> const & block, block_source const source)
@ -100,9 +101,9 @@ std::optional<nano::process_return> nano::block_processor::add_blocking (std::sh
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking);
node.logger.debug (nano::log::type::blockprocessor, "Processing block (blocking): {} (source: {})", block->hash ().to_string (), to_string (source));
context ctx{ source };
context ctx{ block, source };
auto future = ctx.get_future ();
add_impl (block, std::move (ctx));
add_impl (std::move (ctx));
try
{
@ -129,7 +130,7 @@ void nano::block_processor::force (std::shared_ptr<nano::block> const & block_a)
{
nano::lock_guard<nano::mutex> lock{ mutex };
forced.emplace_back (entry{ block_a, context{ block_source::forced } });
forced.emplace_back (context{ block_a, block_source::forced });
}
condition.notify_all ();
}
@ -180,7 +181,7 @@ void nano::block_processor::process_blocks ()
debug_assert (!lock.owns_lock ());
// Set results for futures when not holding the lock
for (auto & [result, block, context] : processed)
for (auto & [result, context] : processed)
{
context.set_result (result);
}
@ -222,33 +223,33 @@ bool nano::block_processor::have_blocks ()
return have_blocks_ready ();
}
void nano::block_processor::add_impl (std::shared_ptr<nano::block> const & block, context ctx)
void nano::block_processor::add_impl (context ctx)
{
release_assert (ctx.source != nano::block_source::forced);
{
nano::lock_guard<nano::mutex> guard{ mutex };
blocks.emplace_back (entry{ block, std::move (ctx) });
blocks.emplace_back (std::move (ctx));
}
condition.notify_all ();
}
auto nano::block_processor::next () -> entry
auto nano::block_processor::next () -> context
{
debug_assert (!mutex.try_lock ());
debug_assert (!blocks.empty () || !forced.empty ()); // This should be checked before calling next
if (!forced.empty ())
{
entry entry = std::move (forced.front ());
release_assert (entry.ctx.source == nano::block_source::forced);
auto entry = std::move (forced.front ());
release_assert (entry.source == nano::block_source::forced);
forced.pop_front ();
return entry;
}
if (!blocks.empty ())
{
entry entry = std::move (blocks.front ());
release_assert (entry.ctx.source != nano::block_source::forced);
auto entry = std::move (blocks.front ());
release_assert (entry.source != nano::block_source::forced);
blocks.pop_front ();
return entry;
}
@ -281,10 +282,8 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
node.logger.debug (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", blocks.size (), forced.size ());
}
entry entry = next ();
context ctx = std::move (entry.ctx);
auto const block = entry.block;
auto const hash = block->hash ();
auto ctx = next ();
auto const hash = ctx.block->hash ();
bool const force = ctx.source == nano::block_source::forced;
lock_a.unlock ();
@ -292,13 +291,13 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
if (force)
{
number_of_forced_processed++;
rollback_competitor (transaction, *block);
rollback_competitor (transaction, *ctx.block);
}
number_of_blocks_processed++;
auto result = process_one (transaction, block, ctx, force);
processed.emplace_back (result, block, std::move (ctx));
auto result = process_one (transaction, ctx, force);
processed.emplace_back (result, std::move (ctx));
lock_a.lock ();
}
@ -313,8 +312,9 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
return processed;
}
nano::process_return nano::block_processor::process_one (store::write_transaction const & transaction_a, std::shared_ptr<nano::block> block, context const & context, bool const forced_a)
nano::process_return nano::block_processor::process_one (store::write_transaction const & transaction_a, context const & context, bool const forced_a)
{
auto const & block = context.block;
auto const hash = block->hash ();
nano::process_return result = node.ledger.process (transaction_a, *block);

View file

@ -44,8 +44,9 @@ public: // Context
class context
{
public:
explicit context (block_source);
context (std::shared_ptr<block> block, block_source source);
std::shared_ptr<block> block;
block_source const source;
std::chrono::steady_clock::time_point const arrival{ std::chrono::steady_clock::now () };
@ -60,13 +61,6 @@ public: // Context
friend class block_processor;
};
private:
struct entry
{
std::shared_ptr<nano::block> block;
block_processor::context ctx;
};
public:
block_processor (nano::node &, nano::write_database_queue &);
@ -85,21 +79,21 @@ public:
std::atomic<bool> flushing{ false };
public: // Events
using processed_t = std::tuple<nano::process_return, std::shared_ptr<nano::block>, context>;
using processed_t = std::tuple<nano::process_return, context>;
using processed_batch_t = std::deque<processed_t>;
// The batch observer feeds the processed observer
nano::observer_set<nano::process_return const &, std::shared_ptr<nano::block> const &, context const &> block_processed;
nano::observer_set<nano::process_return const &, context const &> block_processed;
nano::observer_set<processed_batch_t const &> batch_processed;
private:
// Roll back block in the ledger that conflicts with 'block'
void rollback_competitor (store::write_transaction const &, nano::block const & block);
nano::process_return process_one (store::write_transaction const &, std::shared_ptr<nano::block> block, context const &, bool forced = false);
nano::process_return process_one (store::write_transaction const &, context const &, bool forced = false);
void queue_unchecked (store::write_transaction const &, nano::hash_or_account const &);
processed_batch_t process_batch (nano::unique_lock<nano::mutex> &);
entry next ();
void add_impl (std::shared_ptr<nano::block> const & block, context);
context next ();
void add_impl (context);
private: // Dependencies
nano::node & node;
@ -109,8 +103,8 @@ private:
bool stopped{ false };
bool active{ false };
std::deque<entry> blocks;
std::deque<entry> forced;
std::deque<context> blocks;
std::deque<context> forced;
std::chrono::steady_clock::time_point next_log;
nano::condition_variable condition;

View file

@ -34,10 +34,10 @@ nano::bootstrap_ascending::service::service (nano::node_config & config_a, nano:
nano::lock_guard<nano::mutex> lock{ mutex };
auto transaction = ledger.store.tx_begin_read ();
for (auto const & [result, block, context] : batch)
for (auto const & [result, context] : batch)
{
debug_assert (block != nullptr);
inspect (transaction, result, *block);
debug_assert (context.block != nullptr);
inspect (transaction, result, *context.block);
}
}

View file

@ -20,10 +20,10 @@ void nano::process_live_dispatcher::connect (nano::block_processor & block_proce
{
block_processor.batch_processed.add ([this] (auto const & batch) {
auto const transaction = ledger.store.tx_begin_read ();
for (auto const & [result, block, context] : batch)
for (auto const & [result, context] : batch)
{
debug_assert (block != nullptr);
inspect (result, *block, transaction);
debug_assert (context.block != nullptr);
inspect (result, *context.block, transaction);
}
});
}