Merge pull request #4424 from pwojcikdev/blockprocessor-context

Blockprocessor context
This commit is contained in:
Piotr Wójcik 2024-02-14 10:30:41 +01:00 committed by GitHub
commit 6c0c27a871
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 260 additions and 388 deletions

View file

@ -190,6 +190,12 @@ TEST (active_transactions, keep_local)
auto const send4 = wallet.send_action (nano::dev::genesis_key.pub, key4.pub, node.config.receive_minimum.number ());
auto const send5 = wallet.send_action (nano::dev::genesis_key.pub, key5.pub, node.config.receive_minimum.number ());
auto const send6 = wallet.send_action (nano::dev::genesis_key.pub, key6.pub, node.config.receive_minimum.number ());
ASSERT_NE (nullptr, send1);
ASSERT_NE (nullptr, send2);
ASSERT_NE (nullptr, send3);
ASSERT_NE (nullptr, send4);
ASSERT_NE (nullptr, send5);
ASSERT_NE (nullptr, send6);
// force-confirm blocks
for (auto const & block : { send1, send2, send3, send4, send5, send6 })

View file

@ -2111,53 +2111,6 @@ TEST (node, block_confirm)
ASSERT_TIMELY_EQ (10s, node1.active.recently_cemented.list ().size (), 1);
}
TEST (node, block_arrival)
{
nano::test::system system (1);
auto & node (*system.nodes[0]);
ASSERT_EQ (0, node.block_arrival.arrival.size ());
nano::block_hash hash1 (1);
node.block_arrival.add (hash1);
ASSERT_EQ (1, node.block_arrival.arrival.size ());
node.block_arrival.add (hash1);
ASSERT_EQ (1, node.block_arrival.arrival.size ());
nano::block_hash hash2 (2);
node.block_arrival.add (hash2);
ASSERT_EQ (2, node.block_arrival.arrival.size ());
}
TEST (node, block_arrival_size)
{
nano::test::system system (1);
auto & node (*system.nodes[0]);
auto time (std::chrono::steady_clock::now () - nano::block_arrival::arrival_time_min - std::chrono::seconds (5));
nano::block_hash hash (0);
for (auto i (0); i < nano::block_arrival::arrival_size_min * 2; ++i)
{
node.block_arrival.arrival.push_back (nano::block_arrival_info{ time, hash });
++hash.qwords[0];
}
ASSERT_EQ (nano::block_arrival::arrival_size_min * 2, node.block_arrival.arrival.size ());
node.block_arrival.recent (0);
ASSERT_EQ (nano::block_arrival::arrival_size_min, node.block_arrival.arrival.size ());
}
TEST (node, block_arrival_time)
{
nano::test::system system (1);
auto & node (*system.nodes[0]);
auto time (std::chrono::steady_clock::now ());
nano::block_hash hash (0);
for (auto i (0); i < nano::block_arrival::arrival_size_min * 2; ++i)
{
node.block_arrival.arrival.push_back (nano::block_arrival_info{ time, hash });
++hash.qwords[0];
}
ASSERT_EQ (nano::block_arrival::arrival_size_min * 2, node.block_arrival.arrival.size ());
node.block_arrival.recent (0);
ASSERT_EQ (nano::block_arrival::arrival_size_min * 2, node.block_arrival.arrival.size ());
}
TEST (node, confirm_quorum)
{
nano::test::system system (1);
@ -2958,7 +2911,7 @@ TEST (node, block_processor_reject_state)
send1->signature.bytes[0] ^= 1;
ASSERT_FALSE (node.ledger.block_or_pruned_exists (send1->hash ()));
node.process_active (send1);
ASSERT_TIMELY_EQ (5s, 1, node.stats.count (nano::stat::type::blockprocessor, nano::stat::detail::bad_signature));
ASSERT_TIMELY_EQ (5s, 1, node.stats.count (nano::stat::type::blockprocessor_result, nano::stat::detail::bad_signature));
ASSERT_FALSE (node.ledger.block_or_pruned_exists (send1->hash ()));
auto send2 = builder.make_block ()
.account (nano::dev::genesis_key.pub)

View file

@ -251,8 +251,6 @@ char const * nano::mutex_identifier (mutexes mutex)
{
case mutexes::active:
return "active";
case mutexes::block_arrival:
return "block_arrival";
case mutexes::block_processor:
return "block_processor";
case mutexes::block_uniquer:

View file

@ -20,7 +20,6 @@ bool any_filters_registered ();
enum class mutexes
{
active,
block_arrival,
block_processor,
block_uniquer,
blockstore_cache,

View file

@ -36,6 +36,8 @@ enum class type : uint8_t
vote_cache,
hinting,
blockprocessor,
blockprocessor_source,
blockprocessor_result,
bootstrap_server,
active,
active_started,
@ -71,6 +73,7 @@ enum class detail : uint8_t
top,
none,
success,
unknown,
// processing queue
queue,
@ -110,6 +113,19 @@ enum class detail : uint8_t
representative_mismatch,
block_position,
// blockprocessor
process_blocking,
process_blocking_timeout,
force,
// block source
live,
bootstrap,
bootstrap_legacy,
unchecked,
local,
forced,
// message specific
not_a_type,
invalid,

View file

@ -20,12 +20,8 @@ add_library(
backlog_population.cpp
bandwidth_limiter.hpp
bandwidth_limiter.cpp
block_arrival.hpp
block_arrival.cpp
block_broadcast.cpp
block_broadcast.hpp
blocking_observer.cpp
blocking_observer.hpp
blockprocessor.hpp
blockprocessor.cpp
bootstrap/block_deserializer.hpp

View file

@ -34,11 +34,11 @@ nano::active_transactions::active_transactions (nano::node & node_a, nano::confi
});
// Notify elections about alternative (forked) blocks
block_processor.processed.add ([this] (auto const & result, auto const & block) {
block_processor.block_processed.add ([this] (auto const & result, auto const & context) {
switch (result.code)
{
case nano::process_result::fork:
publish (block);
publish (context.block);
break;
default:
break;

View file

@ -1,35 +0,0 @@
#include <nano/node/block_arrival.hpp>
bool nano::block_arrival::add (nano::block_hash const & hash_a)
{
nano::lock_guard<nano::mutex> lock{ mutex };
auto now (std::chrono::steady_clock::now ());
auto inserted (arrival.get<tag_sequence> ().emplace_back (nano::block_arrival_info{ now, hash_a }));
auto result (!inserted.second);
return result;
}
bool nano::block_arrival::recent (nano::block_hash const & hash_a)
{
nano::lock_guard<nano::mutex> lock{ mutex };
auto now (std::chrono::steady_clock::now ());
while (arrival.size () > arrival_size_min && arrival.get<tag_sequence> ().front ().arrival + arrival_time_min < now)
{
arrival.get<tag_sequence> ().pop_front ();
}
return arrival.get<tag_hash> ().find (hash_a) != arrival.get<tag_hash> ().end ();
}
std::unique_ptr<nano::container_info_component> nano::collect_container_info (block_arrival & block_arrival, std::string const & name)
{
std::size_t count = 0;
{
nano::lock_guard<nano::mutex> guard{ block_arrival.mutex };
count = block_arrival.arrival.size ();
}
auto sizeof_element = sizeof (decltype (block_arrival.arrival)::value_type);
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "arrival", count, sizeof_element }));
return composite;
}

View file

@ -1,49 +0,0 @@
#pragma once
#include <nano/secure/common.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <boost/multi_index_container.hpp>
#include <chrono>
namespace nano
{
class block_arrival_info final
{
public:
std::chrono::steady_clock::time_point arrival;
nano::block_hash hash;
};
// This class tracks blocks that are probably live because they arrived in a UDP packet
// This gives a fairly reliable way to differentiate between blocks being inserted via bootstrap or new, live blocks.
class block_arrival final
{
public:
// Return `true' to indicate an error if the block has already been inserted
bool add (nano::block_hash const &);
bool recent (nano::block_hash const &);
// clang-format off
class tag_sequence {};
class tag_hash {};
boost::multi_index_container<nano::block_arrival_info,
boost::multi_index::indexed_by<
boost::multi_index::sequenced<boost::multi_index::tag<tag_sequence>>,
boost::multi_index::hashed_unique<boost::multi_index::tag<tag_hash>,
boost::multi_index::member<nano::block_arrival_info, nano::block_hash, &nano::block_arrival_info::hash>>>>
arrival;
// clang-format on
nano::mutex mutex{ mutex_identifier (mutexes::block_arrival) };
static std::size_t constexpr arrival_size_min = 8 * 1024;
static std::chrono::seconds constexpr arrival_time_min = std::chrono::seconds (300);
};
std::unique_ptr<container_info_component> collect_container_info (block_arrival & block_arrival, std::string const & name);
}

View file

@ -1,11 +1,9 @@
#include <nano/node/block_arrival.hpp>
#include <nano/node/block_broadcast.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/network.hpp>
nano::block_broadcast::block_broadcast (nano::network & network, nano::block_arrival & block_arrival, bool enabled) :
nano::block_broadcast::block_broadcast (nano::network & network, bool enabled) :
network{ network },
block_arrival{ block_arrival },
enabled{ enabled }
{
}
@ -16,26 +14,22 @@ void nano::block_broadcast::connect (nano::block_processor & block_processor)
{
return;
}
block_processor.processed.add ([this] (auto const & result, auto const & block) {
block_processor.block_processed.add ([this] (auto const & result, auto const & context) {
switch (result.code)
{
case nano::process_result::progress:
observe (block);
observe (context);
break;
default:
break;
}
erase (block);
});
}
void nano::block_broadcast::observe (std::shared_ptr<nano::block> block)
void nano::block_broadcast::observe (nano::block_processor::context const & context)
{
nano::unique_lock<nano::mutex> lock{ mutex };
auto existing = local.find (block);
auto local_l = existing != local.end ();
lock.unlock ();
if (local_l)
auto const & block = context.block;
if (context.source == nano::block_source::local)
{
// Block created on this node
// Perform more aggressive initial flooding
@ -43,7 +37,7 @@ void nano::block_broadcast::observe (std::shared_ptr<nano::block> block)
}
else
{
if (block_arrival.recent (block->hash ()))
if (context.source != nano::block_source::bootstrap && context.source != nano::block_source::bootstrap_legacy)
{
// Block arrived from realtime traffic, do normal gossip.
network.flood_block (block, nano::transport::buffer_drop_policy::limiter);
@ -55,23 +49,3 @@ void nano::block_broadcast::observe (std::shared_ptr<nano::block> block)
}
}
}
void nano::block_broadcast::set_local (std::shared_ptr<nano::block> block)
{
if (!enabled)
{
return;
}
nano::lock_guard<nano::mutex> lock{ mutex };
local.insert (block);
}
void nano::block_broadcast::erase (std::shared_ptr<nano::block> block)
{
if (!enabled)
{
return;
}
nano::lock_guard<nano::mutex> lock{ mutex };
local.erase (block);
}

View file

@ -1,34 +1,28 @@
#pragma once
#include <nano/lib/blocks.hpp>
#include <nano/node/blockprocessor.hpp>
#include <memory>
#include <unordered_set>
namespace nano
{
class block_arrival;
class block_processor;
class network;
// This class tracks blocks that originated from this node.
class block_broadcast
{
public:
block_broadcast (nano::network & network, nano::block_arrival & block_arrival, bool enabled = false);
block_broadcast (nano::network & network, bool enabled = false);
// Add batch_processed observer to block_processor if enabled
void connect (nano::block_processor & block_processor);
// Mark a block as originating locally
void set_local (std::shared_ptr<nano::block> block);
void erase (std::shared_ptr<nano::block> block);
private:
// Block_processor observer
void observe (std::shared_ptr<nano::block> block);
void observe (nano::block_processor::context const &);
nano::network & network;
nano::block_arrival & block_arrival;
std::unordered_set<std::shared_ptr<nano::block>> local; // Blocks originated on this node
nano::mutex mutex;
bool enabled;
};
}

View file

@ -1,62 +0,0 @@
#include <nano/node/blocking_observer.hpp>
#include <nano/node/blockprocessor.hpp>
void nano::blocking_observer::connect (nano::block_processor & block_processor)
{
block_processor.processed.add ([this] (auto const & result, auto const & block) {
observe (result, block);
});
}
void nano::blocking_observer::stop ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
stopped = true;
auto discard = std::move (blocking);
// Signal broken promises outside lock
lock.unlock ();
discard.clear (); // ~promise future_error
}
void nano::blocking_observer::observe (nano::process_return const & result, std::shared_ptr<nano::block> block)
{
nano::unique_lock<nano::mutex> lock{ mutex };
auto existing = blocking.find (block);
if (existing != blocking.end ())
{
auto promise = std::move (existing->second);
blocking.erase (existing);
// Signal promise outside of lock
lock.unlock ();
promise.set_value (result);
}
}
std::future<nano::process_return> nano::blocking_observer::insert (std::shared_ptr<nano::block> block)
{
nano::lock_guard<nano::mutex> lock{ mutex };
if (stopped)
{
std::promise<nano::process_return> promise;
return promise.get_future (); // ~promise future_error
}
auto iterator = blocking.emplace (block, std::promise<nano::process_return>{});
return iterator->second.get_future ();
}
bool nano::blocking_observer::exists (std::shared_ptr<nano::block> block)
{
nano::lock_guard<nano::mutex> lock{ mutex };
auto existing = blocking.find (block);
return existing != blocking.end ();
}
void nano::blocking_observer::erase (std::shared_ptr<nano::block> block)
{
nano::lock_guard<nano::mutex> lock{ mutex };
auto existing = blocking.find (block);
if (existing != blocking.end ())
{
blocking.erase (existing);
}
}

View file

@ -1,32 +0,0 @@
#pragma once
#include <nano/lib/locks.hpp>
#include <nano/secure/common.hpp>
#include <future>
#include <memory>
#include <unordered_map>
namespace nano
{
class block;
class block_processor;
// Observer that facilitates a blocking call to block processing which is done asynchronously by the block_processor
class blocking_observer
{
public:
void connect (nano::block_processor & block_processor);
// Stop the observer and trigger broken promise exceptions
void stop ();
// Block processor observer
void observe (nano::process_return const & result, std::shared_ptr<nano::block> block);
[[nodiscard]] std::future<nano::process_return> insert (std::shared_ptr<nano::block> block);
bool exists (std::shared_ptr<nano::block> block);
void erase (std::shared_ptr<nano::block> block);
private:
std::unordered_multimap<std::shared_ptr<nano::block>, std::promise<nano::process_return>> blocking;
bool stopped{ false };
nano::mutex mutex;
};
}

View file

@ -6,20 +6,45 @@
#include <boost/format.hpp>
#include <magic_enum.hpp>
/*
* block_processor::context
*/
nano::block_processor::context::context (std::shared_ptr<nano::block> block, nano::block_source source_a) :
block{ block },
source{ source_a }
{
debug_assert (source != nano::block_source::unknown);
}
auto nano::block_processor::context::get_future () -> std::future<result_t>
{
return promise.get_future ();
}
void nano::block_processor::context::set_result (result_t const & result)
{
promise.set_value (result);
}
/*
* block_processor
*/
nano::block_processor::block_processor (nano::node & node_a, nano::write_database_queue & write_database_queue_a) :
next_log (std::chrono::steady_clock::now ()),
node (node_a),
write_database_queue (write_database_queue_a)
write_database_queue (write_database_queue_a),
next_log (std::chrono::steady_clock::now ())
{
batch_processed.add ([this] (auto const & items) {
// For every batch item: notify the 'processed' observer.
for (auto const & item : items)
for (auto const & [result, context] : items)
{
auto const & [result, block] = item;
processed.notify (result, block);
block_processed.notify (result, context);
}
});
blocking.connect (*this);
processing_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::block_processing);
this->process_blocks ();
@ -33,7 +58,6 @@ void nano::block_processor::stop ()
stopped = true;
}
condition.notify_all ();
blocking.stop ();
nano::join_or_pass (processing_thread);
}
@ -53,7 +77,7 @@ bool nano::block_processor::half_full ()
return size () >= node.flags.block_processor_full_size / 2;
}
void nano::block_processor::add (std::shared_ptr<nano::block> const & block)
void nano::block_processor::add (std::shared_ptr<nano::block> const & block, block_source const source)
{
if (full ())
{
@ -65,33 +89,50 @@ void nano::block_processor::add (std::shared_ptr<nano::block> const & block)
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::insufficient_work);
return;
}
add_impl (block);
return;
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process);
node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {})", block->hash ().to_string (), to_string (source));
add_impl (context{ block, source });
}
std::optional<nano::process_return> nano::block_processor::add_blocking (std::shared_ptr<nano::block> const & block)
std::optional<nano::process_return> nano::block_processor::add_blocking (std::shared_ptr<nano::block> const & block, block_source const source)
{
auto future = blocking.insert (block);
add_impl (block);
condition.notify_all ();
std::optional<nano::process_return> result;
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking);
node.logger.debug (nano::log::type::blockprocessor, "Processing block (blocking): {} (source: {})", block->hash ().to_string (), to_string (source));
context ctx{ block, source };
auto future = ctx.get_future ();
add_impl (std::move (ctx));
try
{
auto status = future.wait_for (node.config.block_process_timeout);
debug_assert (status != std::future_status::deferred);
if (status == std::future_status::ready)
{
result = future.get ();
}
else
{
blocking.erase (block);
return future.get ();
}
}
catch (std::future_error const &)
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process_blocking_timeout);
node.logger.error (nano::log::type::blockprocessor, "Timeout processing block: {}", block->hash ().to_string ());
}
return result;
return std::nullopt;
}
void nano::block_processor::force (std::shared_ptr<nano::block> const & block_a)
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::force);
node.logger.debug (nano::log::type::blockprocessor, "Forcing block: {}", block_a->hash ().to_string ());
{
nano::lock_guard<nano::mutex> lock{ mutex };
forced.emplace_back (context{ block_a, block_source::forced });
}
condition.notify_all ();
}
void nano::block_processor::rollback_competitor (store::write_transaction const & transaction, nano::block const & block)
@ -126,15 +167,6 @@ void nano::block_processor::rollback_competitor (store::write_transaction const
}
}
void nano::block_processor::force (std::shared_ptr<nano::block> const & block_a)
{
{
nano::lock_guard<nano::mutex> lock{ mutex };
forced.push_back (block_a);
}
condition.notify_all ();
}
void nano::block_processor::process_blocks ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
@ -144,8 +176,18 @@ void nano::block_processor::process_blocks ()
{
active = true;
lock.unlock ();
auto processed = process_batch (lock);
debug_assert (!lock.owns_lock ());
// Set results for futures when not holding the lock
for (auto & [result, context] : processed)
{
context.set_result (result);
}
batch_processed.notify (processed);
lock.lock ();
active = false;
}
@ -181,28 +223,57 @@ bool nano::block_processor::have_blocks ()
return have_blocks_ready ();
}
void nano::block_processor::add_impl (std::shared_ptr<nano::block> block)
void nano::block_processor::add_impl (context ctx)
{
release_assert (ctx.source != nano::block_source::forced);
{
nano::lock_guard<nano::mutex> guard{ mutex };
blocks.emplace_back (block);
blocks.emplace_back (std::move (ctx));
}
condition.notify_all ();
}
auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock_a) -> std::deque<processed_t>
auto nano::block_processor::next () -> context
{
std::deque<processed_t> processed;
debug_assert (!mutex.try_lock ());
debug_assert (!blocks.empty () || !forced.empty ()); // This should be checked before calling next
if (!forced.empty ())
{
auto entry = std::move (forced.front ());
release_assert (entry.source == nano::block_source::forced);
forced.pop_front ();
return entry;
}
if (!blocks.empty ())
{
auto entry = std::move (blocks.front ());
release_assert (entry.source != nano::block_source::forced);
blocks.pop_front ();
return entry;
}
release_assert (false, "next() called when no blocks are ready");
}
auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock_a) -> processed_batch_t
{
processed_batch_t processed;
auto scoped_write_guard = write_database_queue.wait (nano::writer::process_batch);
auto transaction (node.store.tx_begin_write ({ tables::accounts, tables::blocks, tables::frontiers, tables::pending }));
nano::timer<std::chrono::milliseconds> timer_l;
lock_a.lock ();
timer_l.start ();
// Processing blocks
unsigned number_of_blocks_processed (0), number_of_forced_processed (0);
auto deadline_reached = [&timer_l, deadline = node.config.block_processor_batch_max_time] { return timer_l.after_deadline (deadline); };
auto processor_batch_reached = [&number_of_blocks_processed, max = node.flags.block_processor_batch_size] { return number_of_blocks_processed >= max; };
auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; };
while (have_blocks_ready () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ())
{
// TODO: Cleaner periodical logging
@ -211,33 +282,26 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
node.logger.debug (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", blocks.size (), forced.size ());
}
std::shared_ptr<nano::block> block;
nano::block_hash hash (0);
bool force (false);
if (forced.empty ())
{
block = blocks.front ();
blocks.pop_front ();
hash = block->hash ();
}
else
{
block = forced.front ();
forced.pop_front ();
hash = block->hash ();
force = true;
number_of_forced_processed++;
}
auto ctx = next ();
auto const hash = ctx.block->hash ();
bool const force = ctx.source == nano::block_source::forced;
lock_a.unlock ();
if (force)
{
rollback_competitor (transaction, *block);
number_of_forced_processed++;
rollback_competitor (transaction, *ctx.block);
}
number_of_blocks_processed++;
auto result = process_one (transaction, block, force);
processed.emplace_back (result, block);
auto result = process_one (transaction, ctx, force);
processed.emplace_back (result, std::move (ctx));
lock_a.lock ();
}
lock_a.unlock ();
if (number_of_blocks_processed != 0 && timer_l.stop () > std::chrono::milliseconds (100))
@ -248,15 +312,18 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
return processed;
}
nano::process_return nano::block_processor::process_one (store::write_transaction const & transaction_a, std::shared_ptr<nano::block> block, bool const forced_a)
nano::process_return nano::block_processor::process_one (store::write_transaction const & transaction_a, context const & context, bool const forced_a)
{
nano::process_return result;
auto hash (block->hash ());
result = node.ledger.process (transaction_a, *block);
auto const & block = context.block;
auto const hash = block->hash ();
nano::process_return result = node.ledger.process (transaction_a, *block);
node.stats.inc (nano::stat::type::blockprocessor, to_stat_detail (result.code));
node.stats.inc (nano::stat::type::blockprocessor_result, to_stat_detail (result.code));
node.stats.inc (nano::stat::type::blockprocessor_source, to_stat_detail (context.source));
node.logger.trace (nano::log::type::blockprocessor, nano::log::detail::block_processed,
nano::log::arg{ "result", result.code },
nano::log::arg{ "source", context.source },
nano::log::arg{ "arrival", nano::log::microseconds (context.arrival) },
nano::log::arg{ "forced", forced_a },
nano::log::arg{ "block", block });
@ -361,3 +428,15 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (bl
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "forced", forced_count, sizeof (decltype (block_processor.forced)::value_type) }));
return composite;
}
std::string_view nano::to_string (nano::block_source source)
{
return magic_enum::enum_name (source);
}
nano::stat::detail nano::to_stat_detail (nano::block_source type)
{
auto value = magic_enum::enum_cast<nano::stat::detail> (magic_enum::enum_name (type));
debug_assert (value);
return value.value_or (nano::stat::detail{});
}

View file

@ -2,12 +2,12 @@
#include <nano/lib/blocks.hpp>
#include <nano/lib/logging.hpp>
#include <nano/node/blocking_observer.hpp>
#include <nano/secure/common.hpp>
#include <chrono>
#include <future>
#include <memory>
#include <optional>
#include <thread>
namespace nano::store
@ -20,20 +20,56 @@ namespace nano
class node;
class write_database_queue;
enum class block_source
{
unknown = 0,
live,
bootstrap,
bootstrap_legacy,
unchecked,
local,
forced,
};
std::string_view to_string (block_source);
nano::stat::detail to_stat_detail (block_source);
/**
* Processing blocks is a potentially long IO operation.
* This class isolates block insertion from other operations like servicing network operations
*/
class block_processor final
{
public: // Context
class context
{
public:
context (std::shared_ptr<nano::block> block, block_source source);
std::shared_ptr<nano::block> const block;
block_source const source;
std::chrono::steady_clock::time_point const arrival{ std::chrono::steady_clock::now () };
public:
using result_t = nano::process_return;
std::future<result_t> get_future ();
private:
void set_result (result_t const &);
std::promise<result_t> promise;
friend class block_processor;
};
public:
explicit block_processor (nano::node &, nano::write_database_queue &);
block_processor (nano::node &, nano::write_database_queue &);
void stop ();
std::size_t size ();
bool full ();
bool half_full ();
void add (std::shared_ptr<nano::block> const &);
std::optional<nano::process_return> add_blocking (std::shared_ptr<nano::block> const & block);
void add (std::shared_ptr<nano::block> const &, block_source = block_source::live);
std::optional<nano::process_return> add_blocking (std::shared_ptr<nano::block> const & block, block_source);
void force (std::shared_ptr<nano::block> const &);
bool should_log ();
bool have_blocks_ready ();
@ -43,34 +79,38 @@ public:
std::atomic<bool> flushing{ false };
public: // Events
using processed_t = std::pair<nano::process_return, std::shared_ptr<nano::block>>;
nano::observer_set<nano::process_return const &, std::shared_ptr<nano::block>> processed;
using processed_t = std::tuple<nano::process_return, context>;
using processed_batch_t = std::deque<processed_t>;
// The batch observer feeds the processed observer
nano::observer_set<std::deque<processed_t> const &> batch_processed;
private:
blocking_observer blocking;
// The batch observer feeds the processed observer
nano::observer_set<nano::process_return const &, context const &> block_processed;
nano::observer_set<processed_batch_t const &> batch_processed;
private:
// Roll back block in the ledger that conflicts with 'block'
void rollback_competitor (store::write_transaction const & transaction, nano::block const & block);
nano::process_return process_one (store::write_transaction const &, std::shared_ptr<nano::block> block, bool const = false);
void rollback_competitor (store::write_transaction const &, nano::block const & block);
nano::process_return process_one (store::write_transaction const &, context const &, bool forced = false);
void queue_unchecked (store::write_transaction const &, nano::hash_or_account const &);
std::deque<processed_t> process_batch (nano::unique_lock<nano::mutex> &);
void add_impl (std::shared_ptr<nano::block> block);
bool stopped{ false };
bool active{ false };
std::chrono::steady_clock::time_point next_log;
std::deque<std::shared_ptr<nano::block>> blocks;
std::deque<std::shared_ptr<nano::block>> forced;
nano::condition_variable condition;
processed_batch_t process_batch (nano::unique_lock<nano::mutex> &);
context next ();
void add_impl (context);
private: // Dependencies
nano::node & node;
nano::write_database_queue & write_database_queue;
private:
bool stopped{ false };
bool active{ false };
std::deque<context> blocks;
std::deque<context> forced;
std::chrono::steady_clock::time_point next_log;
nano::condition_variable condition;
nano::mutex mutex{ mutex_identifier (mutexes::block_processor) };
std::thread processing_thread;
friend std::unique_ptr<container_info_component> collect_container_info (block_processor & block_processor, std::string const & name);
};
std::unique_ptr<nano::container_info_component> collect_container_info (block_processor & block_processor, std::string const & name);
}

View file

@ -133,7 +133,7 @@ bool nano::bootstrap_attempt::process_block (std::shared_ptr<nano::block> const
}
else
{
node_l->block_processor.add (block_a);
node_l->block_processor.add (block_a, nano::block_source::bootstrap_legacy);
}
return stop_pull;
}

View file

@ -311,7 +311,7 @@ bool nano::bootstrap_attempt_lazy::process_block_lazy (std::shared_ptr<nano::blo
}
lazy_block_state_backlog_check (block_a, hash);
lock.unlock ();
node->block_processor.add (block_a);
node->block_processor.add (block_a, nano::block_source::bootstrap_legacy);
}
// Force drop lazy bootstrap connection for long bulk_pull
if (pull_blocks_processed > max_blocks)

View file

@ -34,11 +34,10 @@ nano::bootstrap_ascending::service::service (nano::node_config & config_a, nano:
nano::lock_guard<nano::mutex> lock{ mutex };
auto transaction = ledger.store.tx_begin_read ();
for (auto const & [result, block] : batch)
for (auto const & [result, context] : batch)
{
debug_assert (block != nullptr);
inspect (transaction, result, *block);
debug_assert (context.block != nullptr);
inspect (transaction, result, *context.block);
}
}
@ -388,7 +387,7 @@ void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::bloc
for (auto & block : response.blocks)
{
block_processor.add (block);
block_processor.add (block, nano::block_source::bootstrap);
}
nano::lock_guard<nano::mutex> lock{ mutex };
throttle.add (true);

View file

@ -1214,7 +1214,6 @@ void nano::json_handler::block_confirm ()
nano::election_status status{ block_l, 0, 0, std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now ().time_since_epoch ()), std::chrono::duration_values<std::chrono::milliseconds>::zero (), 0, 1, 0, nano::election_status_type::active_confirmation_height };
node.active.recently_cemented.put (status);
// Trigger callback for confirmed block
node.block_arrival.add (hash);
auto account (node.ledger.account (transaction, hash));
bool error_or_pruned (false);
auto amount (node.ledger.amount_safe (transaction, hash, error_or_pruned));

View file

@ -197,7 +197,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons
epoch_upgrader{ *this, ledger, store, network_params, logger },
startup_time (std::chrono::steady_clock::now ()),
node_seq (seq),
block_broadcast{ network, block_arrival, !flags.disable_block_processor_republishing },
block_broadcast{ network, !flags.disable_block_processor_republishing },
process_live_dispatcher{ ledger, scheduler.priority, vote_cache, websocket }
{
logger.debug (nano::log::type::node, "Constructing node...");
@ -206,7 +206,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons
process_live_dispatcher.connect (block_processor);
unchecked.satisfied.add ([this] (nano::unchecked_info const & info) {
this->block_processor.add (info.block);
block_processor.add (info.block, nano::block_source::unchecked);
});
vote_cache.rep_weight_query = [this] (nano::account const & rep) {
@ -241,7 +241,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons
{
observers.blocks.add ([this] (nano::election_status const & status_a, std::vector<nano::vote_with_weight_info> const & votes_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a, bool is_state_epoch_a) {
auto block_a (status_a.winner);
if ((status_a.type == nano::election_status_type::active_confirmed_quorum || status_a.type == nano::election_status_type::active_confirmation_height) && this->block_arrival.recent (block_a->hash ()))
if ((status_a.type == nano::election_status_type::active_confirmed_quorum || status_a.type == nano::election_status_type::active_confirmation_height))
{
auto node_l (shared_from_this ());
background ([node_l, block_a, account_a, amount_a, is_state_send_a, is_state_epoch_a] () {
@ -538,7 +538,6 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
composite->add_component (collect_container_info (node.vote_processor, "vote_processor"));
composite->add_component (collect_container_info (node.rep_crawler, "rep_crawler"));
composite->add_component (collect_container_info (node.block_processor, "block_processor"));
composite->add_component (collect_container_info (node.block_arrival, "block_arrival"));
composite->add_component (collect_container_info (node.online_reps, "online_reps"));
composite->add_component (collect_container_info (node.history, "history"));
composite->add_component (node.block_uniquer.collect_container_info ("block_uniquer"));
@ -557,7 +556,6 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
void nano::node::process_active (std::shared_ptr<nano::block> const & incoming)
{
block_arrival.add (incoming->hash ());
block_processor.add (incoming);
}
@ -574,18 +572,12 @@ nano::process_return nano::node::process (nano::block & block)
std::optional<nano::process_return> nano::node::process_local (std::shared_ptr<nano::block> const & block_a)
{
// Add block hash as recently arrived to trigger automatic rebroadcast and election
block_arrival.add (block_a->hash ());
block_broadcast.set_local (block_a);
return block_processor.add_blocking (block_a);
return block_processor.add_blocking (block_a, nano::block_source::local);
}
void nano::node::process_local_async (std::shared_ptr<nano::block> const & block_a)
{
// Add block hash as recently arrived to trigger automatic rebroadcast and election
block_arrival.add (block_a->hash ());
// Set current time to trigger automatic rebroadcast and election
block_processor.add (block_a);
block_processor.add (block_a, nano::block_source::local);
}
void nano::node::start ()

View file

@ -8,7 +8,6 @@
#include <nano/node/active_transactions.hpp>
#include <nano/node/backlog_population.hpp>
#include <nano/node/bandwidth_limiter.hpp>
#include <nano/node/block_arrival.hpp>
#include <nano/node/block_broadcast.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/bootstrap/bootstrap.hpp>
@ -168,7 +167,6 @@ public:
nano::vote_processor vote_processor;
unsigned warmed_up;
nano::block_processor block_processor;
nano::block_arrival block_arrival;
nano::local_vote_history history;
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer;

View file

@ -20,10 +20,10 @@ void nano::process_live_dispatcher::connect (nano::block_processor & block_proce
{
block_processor.batch_processed.add ([this] (auto const & batch) {
auto const transaction = ledger.store.tx_begin_read ();
for (auto const & [result, block] : batch)
for (auto const & [result, context] : batch)
{
debug_assert (block != nullptr);
inspect (result, *block, transaction);
debug_assert (context.block != nullptr);
inspect (result, *context.block, transaction);
}
});
}

View file

@ -4431,7 +4431,6 @@ TEST (rpc, populate_backlog)
.work (*node->work_generate_blocking (latest))
.build ();
ASSERT_EQ (nano::process_result::progress, node->process (*send).code);
ASSERT_FALSE (node->block_arrival.recent (send->hash ()));
auto const rpc_ctx = add_rpc (system, node);
boost::property_tree::ptree request;

View file

@ -15,6 +15,7 @@
#include <crypto/ed25519-donna/ed25519.h>
#include <cryptopp/words.h>
#include <magic_enum.hpp>
size_t constexpr nano::send_block::size;
size_t constexpr nano::receive_block::size;
@ -496,6 +497,11 @@ void nano::generate_cache::enable_all ()
account_count = true;
}
std::string_view nano::to_string (nano::process_result process_result)
{
return magic_enum::enum_name (process_result);
}
nano::stat::detail nano::to_stat_detail (nano::process_result process_result)
{
auto value = magic_enum::enum_cast<nano::stat::detail> (magic_enum::enum_name (process_result));

View file

@ -270,6 +270,10 @@ enum class process_result
block_position, // This block cannot follow the previous block
insufficient_work // Insufficient work for this block, even though it passed the minimal validation
};
std::string_view to_string (process_result);
nano::stat::detail to_stat_detail (process_result);
class process_return final
{
public:
@ -282,8 +286,6 @@ enum class tally_result
confirm
};
nano::stat::detail to_stat_detail (process_result);
class network_params;
/** Genesis keys and ledger constants for network variants */