Merge pull request #4824 from pwojcikdev/ledger-notifications-3

Fix out of order ledger notifications
This commit is contained in:
Piotr Wójcik 2025-01-17 22:10:10 +01:00 committed by GitHub
commit 0db11e8305
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
35 changed files with 497 additions and 271 deletions

View file

@ -4,6 +4,7 @@
#include <nano/node/block_processor.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/node/election.hpp>
#include <nano/node/ledger_notifications.hpp>
#include <nano/node/make_store.hpp>
#include <nano/node/unchecked_map.hpp>
#include <nano/secure/ledger.hpp>
@ -26,17 +27,15 @@ struct confirming_set_context
nano::stats & stats;
nano::ledger & ledger;
nano::unchecked_map unchecked;
nano::block_processor block_processor;
nano::ledger_notifications ledger_notifications;
nano::confirming_set confirming_set;
explicit confirming_set_context (nano::test::ledger_context & ledger_context, nano::node_config node_config = {}) :
logger{ ledger_context.logger () },
stats{ ledger_context.stats () },
ledger{ ledger_context.ledger () },
unchecked{ 0, stats, false },
block_processor{ node_config, ledger, unchecked, stats, logger },
confirming_set{ node_config.confirming_set, ledger, block_processor, stats, logger }
ledger_notifications{ node_config, stats, logger },
confirming_set{ node_config.confirming_set, ledger, ledger_notifications, stats, logger }
{
}
};
@ -78,21 +77,20 @@ TEST (confirming_set, process_one)
TEST (confirming_set, process_multiple)
{
nano::test::system system;
auto & node = *system.add_node ();
auto ctx = nano::test::ledger_send_receive ();
nano::confirming_set_config config{};
nano::confirming_set confirming_set{ config, ctx.ledger (), node.block_processor, ctx.stats (), ctx.logger () };
auto ledger_ctx = nano::test::ledger_send_receive ();
confirming_set_context ctx{ ledger_ctx };
nano::confirming_set & confirming_set = ctx.confirming_set;
std::atomic<int> count = 0;
std::mutex mutex;
std::condition_variable condition;
confirming_set.cemented_observers.add ([&] (auto const &) { ++count; condition.notify_all (); });
confirming_set.add (ctx.blocks ()[0]->hash ());
confirming_set.add (ctx.blocks ()[1]->hash ());
confirming_set.add (ledger_ctx.blocks ()[0]->hash ());
confirming_set.add (ledger_ctx.blocks ()[1]->hash ());
nano::test::start_stop_guard guard{ confirming_set };
std::unique_lock lock{ mutex };
ASSERT_TRUE (condition.wait_for (lock, 5s, [&] () { return count == 2; }));
ASSERT_EQ (2, ctx.stats ().count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (3, ctx.ledger ().cemented_count ());
ASSERT_EQ (2, ctx.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (3, ctx.ledger.cemented_count ());
}
TEST (confirmation_callback, observer_callbacks)

View file

@ -325,7 +325,6 @@ TEST (toml_config, daemon_config_deserialize_defaults)
ASSERT_EQ (conf.node.bounded_backlog.enable, defaults.node.bounded_backlog.enable);
ASSERT_EQ (conf.node.bounded_backlog.batch_size, defaults.node.bounded_backlog.batch_size);
ASSERT_EQ (conf.node.bounded_backlog.max_queued_notifications, defaults.node.bounded_backlog.max_queued_notifications);
ASSERT_EQ (conf.node.bounded_backlog.scan_rate, defaults.node.bounded_backlog.scan_rate);
ASSERT_EQ (conf.node.websocket_config.enabled, defaults.node.websocket_config.enabled);
@ -743,7 +742,6 @@ TEST (toml_config, daemon_config_deserialize_no_defaults)
ASSERT_NE (conf.node.bounded_backlog.enable, defaults.node.bounded_backlog.enable);
ASSERT_NE (conf.node.bounded_backlog.batch_size, defaults.node.bounded_backlog.batch_size);
ASSERT_NE (conf.node.bounded_backlog.max_queued_notifications, defaults.node.bounded_backlog.max_queued_notifications);
ASSERT_NE (conf.node.bounded_backlog.scan_rate, defaults.node.bounded_backlog.scan_rate);
ASSERT_NE (conf.node.websocket_config.enabled, defaults.node.websocket_config.enabled);

View file

@ -17,6 +17,7 @@ enum class type
message,
block,
ledger,
ledger_notifications,
rollback,
network,
vote,
@ -577,6 +578,10 @@ enum class detail
tier_2,
tier_3,
// ledger_notifications
notify_processed,
notify_rolled_back,
// confirming_set
notify_cemented,
notify_already_cemented,

View file

@ -40,8 +40,8 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::block_processing:
thread_role_name_string = "Blck processing";
break;
case nano::thread_role::name::block_processing_notifications:
thread_role_name_string = "Blck proc notif";
case nano::thread_role::name::ledger_notifications:
thread_role_name_string = "Ledger notif";
break;
case nano::thread_role::name::request_loop:
thread_role_name_string = "Request loop";
@ -106,9 +106,6 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::bounded_backlog_scan:
thread_role_name_string = "Bounded b scan";
break;
case nano::thread_role::name::bounded_backlog_notifications:
thread_role_name_string = "Bounded b notif";
break;
case nano::thread_role::name::vote_generator_queue:
thread_role_name_string = "Voting que";
break;

View file

@ -18,7 +18,7 @@ enum class name
vote_processing,
vote_cache_processing,
block_processing,
block_processing_notifications,
ledger_notifications,
request_loop,
wallet_actions,
bootstrap_initiator,
@ -40,7 +40,6 @@ enum class name
backlog_scan,
bounded_backlog,
bounded_backlog_scan,
bounded_backlog_notifications,
vote_generator_queue,
telemetry,
bootstrap,

View file

@ -20,8 +20,11 @@ add_library(
backlog_scan.cpp
bandwidth_limiter.hpp
bandwidth_limiter.cpp
block_context.hpp
block_processor.hpp
block_processor.cpp
block_source.hpp
block_source.cpp
bucketing.hpp
bucketing.cpp
bounded_backlog.hpp
@ -85,6 +88,8 @@ add_library(
ipc/ipc_server.cpp
json_handler.hpp
json_handler.cpp
ledger_notifications.hpp
ledger_notifications.cpp
local_block_broadcaster.cpp
local_block_broadcaster.hpp
local_vote_history.cpp

View file

@ -7,6 +7,7 @@
#include <nano/node/confirmation_solicitor.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/node/election.hpp>
#include <nano/node/ledger_notifications.hpp>
#include <nano/node/node.hpp>
#include <nano/node/online_reps.hpp>
#include <nano/node/repcrawler.hpp>
@ -21,11 +22,11 @@
using namespace std::chrono;
nano::active_elections::active_elections (nano::node & node_a, nano::confirming_set & confirming_set_a, nano::block_processor & block_processor_a) :
nano::active_elections::active_elections (nano::node & node_a, nano::ledger_notifications & ledger_notifications_a, nano::confirming_set & confirming_set_a) :
config{ node_a.config.active_elections },
node{ node_a },
ledger_notifications{ ledger_notifications_a },
confirming_set{ confirming_set_a },
block_processor{ block_processor_a },
recently_confirmed{ config.confirmation_cache },
recently_cemented{ config.confirmation_history_size }
{
@ -55,7 +56,7 @@ nano::active_elections::active_elections (nano::node & node_a, nano::confirming_
});
// Notify elections about alternative (forked) blocks
block_processor.batch_processed.add ([this] (auto const & batch) {
ledger_notifications.blocks_processed.add ([this] (auto const & batch) {
for (auto const & [result, context] : batch)
{
if (result == nano::block_status::fork)
@ -66,7 +67,7 @@ nano::active_elections::active_elections (nano::node & node_a, nano::confirming_
});
// Stop all rolled back active transactions except initial
block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
ledger_notifications.blocks_rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
for (auto const & block : blocks)
{
if (block->qualified_root () != rollback_root)

View file

@ -90,7 +90,7 @@ private: // Elections
ordered_roots roots;
public:
active_elections (nano::node &, nano::confirming_set &, nano::block_processor &);
active_elections (nano::node &, nano::ledger_notifications &, nano::confirming_set &);
~active_elections ();
void start ();
@ -144,8 +144,8 @@ private:
private: // Dependencies
active_elections_config const & config;
nano::node & node;
nano::ledger_notifications & ledger_notifications;
nano::confirming_set & confirming_set;
nano::block_processor & block_processor;
public:
nano::recently_confirmed_cache recently_confirmed;

View file

@ -0,0 +1,44 @@
#pragma once
#include <nano/node/block_source.hpp>
#include <nano/secure/common.hpp>
#include <future>
namespace nano
{
class block_context
{
public:
using result_t = nano::block_status;
using callback_t = std::function<void (result_t)>;
public: // Keep fields public for simplicity
std::shared_ptr<nano::block> block;
nano::block_source source;
callback_t callback;
std::chrono::steady_clock::time_point arrival{ std::chrono::steady_clock::now () };
public:
block_context (std::shared_ptr<nano::block> block, nano::block_source source, callback_t callback = nullptr) :
block{ std::move (block) },
source{ source },
callback{ std::move (callback) }
{
debug_assert (source != nano::block_source::unknown);
}
std::future<result_t> get_future ()
{
return promise.get_future ();
}
void set_result (result_t result)
{
promise.set_value (result);
}
private:
std::promise<result_t> promise;
};
}

View file

@ -5,6 +5,7 @@
#include <nano/lib/timer.hpp>
#include <nano/node/active_elections.hpp>
#include <nano/node/block_processor.hpp>
#include <nano/node/ledger_notifications.hpp>
#include <nano/node/local_vote_history.hpp>
#include <nano/node/node.hpp>
#include <nano/node/unchecked_map.hpp>
@ -18,14 +19,14 @@
* block_processor
*/
nano::block_processor::block_processor (nano::node_config const & node_config, nano::ledger & ledger_a, nano::unchecked_map & unchecked_a, nano::stats & stats_a, nano::logger & logger_a) :
nano::block_processor::block_processor (nano::node_config const & node_config, nano::ledger & ledger_a, nano::ledger_notifications & ledger_notifications_a, nano::unchecked_map & unchecked_a, nano::stats & stats_a, nano::logger & logger_a) :
config{ node_config.block_processor },
network_params{ node_config.network_params },
ledger{ ledger_a },
ledger_notifications{ ledger_notifications_a },
unchecked{ unchecked_a },
stats{ stats_a },
logger{ logger_a },
workers{ 1, nano::thread_role::name::block_processing_notifications }
logger{ logger_a }
{
queue.max_size_query = [this] (auto const & origin) {
switch (origin.source)
@ -65,15 +66,12 @@ nano::block_processor::~block_processor ()
{
// Thread must be stopped before destruction
debug_assert (!thread.joinable ());
debug_assert (!workers.alive ());
}
void nano::block_processor::start ()
{
debug_assert (!thread.joinable ());
workers.start ();
thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::block_processing);
run ();
@ -91,7 +89,6 @@ void nano::block_processor::stop ()
{
thread.join ();
}
workers.stop ();
}
// TODO: Remove and replace all checks with calls to size (block_source)
@ -121,7 +118,7 @@ bool nano::block_processor::add (std::shared_ptr<nano::block> const & block, blo
to_string (source),
channel ? channel->to_string () : "<unknown>"); // TODO: Lazy eval
return add_impl (context{ block, source, std::move (callback) }, channel);
return add_impl ({ block, source, std::move (callback) }, channel);
}
std::optional<nano::block_status> nano::block_processor::add_blocking (std::shared_ptr<nano::block> const & block, block_source const source)
@ -129,7 +126,7 @@ std::optional<nano::block_status> nano::block_processor::add_blocking (std::shar
stats.inc (nano::stat::type::block_processor, nano::stat::detail::process_blocking);
logger.debug (nano::log::type::block_processor, "Processing block (blocking): {} (source: {})", block->hash ().to_string (), to_string (source));
context ctx{ block, source };
nano::block_context ctx{ block, source };
auto future = ctx.get_future ();
add_impl (std::move (ctx));
@ -152,10 +149,10 @@ void nano::block_processor::force (std::shared_ptr<nano::block> const & block_a)
stats.inc (nano::stat::type::block_processor, nano::stat::detail::force);
logger.debug (nano::log::type::block_processor, "Forcing block: {}", block_a->hash ().to_string ());
add_impl (context{ block_a, block_source::forced });
add_impl ({ block_a, block_source::forced });
}
bool nano::block_processor::add_impl (context ctx, std::shared_ptr<nano::transport::channel> const & channel)
bool nano::block_processor::add_impl (nano::block_context ctx, std::shared_ptr<nano::transport::channel> const & channel)
{
auto const source = ctx.source;
bool added = false;
@ -175,7 +172,7 @@ bool nano::block_processor::add_impl (context ctx, std::shared_ptr<nano::transpo
return added;
}
void nano::block_processor::rollback_competitor (secure::write_transaction const & transaction, nano::block const & fork_block)
void nano::block_processor::rollback_competitor (secure::write_transaction & transaction, nano::block const & fork_block)
{
auto const hash = fork_block.hash ();
auto const successor_hash = ledger.any.block_successor (transaction, fork_block.qualified_root ());
@ -197,10 +194,13 @@ void nano::block_processor::rollback_competitor (secure::write_transaction const
logger.debug (nano::log::type::block_processor, "Blocks rolled back: {}", rollback_list.size ());
}
// Notify observers of the rolled back blocks on a background thread while not holding the ledger write lock
workers.post ([this, rollback_list = std::move (rollback_list), root = fork_block.qualified_root ()] () {
rolled_back.notify (rollback_list, root);
});
if (!rollback_list.empty ())
{
// Notify observers of the rolled back blocks on a background thread while not holding the ledger write lock
ledger_notifications.notify_rolled_back (transaction, std::move (rollback_list), fork_block.qualified_root (), [this] {
stats.inc (nano::stat::type::block_processor, nano::stat::detail::notify_rolled_back);
});
}
}
}
@ -210,19 +210,26 @@ void nano::block_processor::run ()
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
condition.wait (lock, [this] {
return stopped || !queue.empty ();
});
if (stopped)
{
return;
}
lock.unlock ();
// It's possible that ledger processing happens faster than the notifications can be processed by other components, cooldown here
ledger_notifications.wait ([this] {
stats.inc (nano::stat::type::block_processor, nano::stat::detail::cooldown);
});
lock.lock ();
if (!queue.empty ())
{
// It's possible that ledger processing happens faster than the notifications can be processed by other components, cooldown here
while (workers.queued_tasks () >= config.max_queued_notifications)
{
stats.inc (nano::stat::type::block_processor, nano::stat::detail::cooldown);
condition.wait_for (lock, 100ms, [this] { return stopped; });
if (stopped)
{
return;
}
}
if (log_interval.elapse (15s))
{
logger.info (nano::log::type::block_processor, "{} blocks (+ {} forced) in processing queue",
@ -230,35 +237,14 @@ void nano::block_processor::run ()
queue.size ({ nano::block_source::forced }));
}
auto processed = process_batch (lock);
process_batch (lock);
debug_assert (!lock.owns_lock ());
lock.lock ();
// Queue notifications to be dispatched in the background
workers.post ([this, processed = std::move (processed)] () mutable {
stats.inc (nano::stat::type::block_processor, nano::stat::detail::notify);
// Set results for futures when not holding the lock
for (auto & [result, context] : processed)
{
if (context.callback)
{
context.callback (result);
}
context.set_result (result);
}
batch_processed.notify (processed);
});
}
else
{
condition.wait (lock, [this] {
return stopped || !queue.empty ();
});
}
}
}
auto nano::block_processor::next () -> context
auto nano::block_processor::next () -> nano::block_context
{
debug_assert (!mutex.try_lock ());
debug_assert (!queue.empty ()); // This should be checked before calling next
@ -273,14 +259,14 @@ auto nano::block_processor::next () -> context
release_assert (false, "next() called when no blocks are ready");
}
auto nano::block_processor::next_batch (size_t max_count) -> std::deque<context>
auto nano::block_processor::next_batch (size_t max_count) -> std::deque<nano::block_context>
{
debug_assert (!mutex.try_lock ());
debug_assert (!queue.empty ());
queue.periodic_update ();
std::deque<context> results;
std::deque<nano::block_context> results;
while (!queue.empty () && results.size () < max_count)
{
results.push_back (next ());
@ -288,7 +274,7 @@ auto nano::block_processor::next_batch (size_t max_count) -> std::deque<context>
return results;
}
auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock) -> processed_batch_t
void nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock)
{
debug_assert (lock.owns_lock ());
debug_assert (!mutex.try_lock ());
@ -307,7 +293,8 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
size_t number_of_blocks_processed = 0;
size_t number_of_forced_processed = 0;
processed_batch_t processed;
std::deque<std::pair<nano::block_status, nano::block_context>> processed;
for (auto & ctx : batch)
{
auto const hash = ctx.block->hash ();
@ -332,10 +319,13 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
logger.debug (nano::log::type::block_processor, "Processed {} blocks ({} forced) in {} {}", number_of_blocks_processed, number_of_forced_processed, timer.value ().count (), timer.unit ());
}
return processed;
// Queue notifications to be dispatched in the background
ledger_notifications.notify_processed (transaction, std::move (processed), [this] {
stats.inc (nano::stat::type::block_processor, nano::stat::detail::notify_processed);
});
}
nano::block_status nano::block_processor::process_one (secure::write_transaction const & transaction_a, context const & context, bool const forced_a)
nano::block_status nano::block_processor::process_one (secure::write_transaction const & transaction_a, nano::block_context const & context, bool const forced_a)
{
auto block = context.block;
auto const hash = block->hash ();
@ -451,32 +441,9 @@ nano::container_info nano::block_processor::container_info () const
info.put ("blocks", queue.size ());
info.put ("forced", queue.size ({ nano::block_source::forced }));
info.add ("queue", queue.container_info ());
info.add ("workers", workers.container_info ());
return info;
}
/*
* block_processor::context
*/
nano::block_processor::context::context (std::shared_ptr<nano::block> block, nano::block_source source_a, callback_t callback_a) :
block{ std::move (block) },
source{ source_a },
callback{ std::move (callback_a) }
{
debug_assert (source != nano::block_source::unknown);
}
auto nano::block_processor::context::get_future () -> std::future<result_t>
{
return promise.get_future ();
}
void nano::block_processor::context::set_result (result_t const & result)
{
promise.set_value (result);
}
/*
* block_processor_config
*/
@ -506,17 +473,3 @@ nano::error nano::block_processor_config::deserialize (nano::tomlconfig & toml)
return toml.get_error ();
}
/*
*
*/
std::string_view nano::to_string (nano::block_source source)
{
return nano::enum_util::name (source);
}
nano::stat::detail nano::to_stat_detail (nano::block_source type)
{
return nano::enum_util::cast<nano::stat::detail> (type);
}

View file

@ -2,6 +2,8 @@
#include <nano/lib/logging.hpp>
#include <nano/lib/thread_pool.hpp>
#include <nano/node/block_context.hpp>
#include <nano/node/block_source.hpp>
#include <nano/node/fair_queue.hpp>
#include <nano/node/fwd.hpp>
#include <nano/secure/common.hpp>
@ -14,22 +16,6 @@
namespace nano
{
enum class block_source
{
unknown = 0,
live,
live_originator,
bootstrap,
bootstrap_legacy,
unchecked,
local,
forced,
election,
};
std::string_view to_string (block_source);
nano::stat::detail to_stat_detail (block_source);
class block_processor_config final
{
public:
@ -39,6 +25,8 @@ public:
nano::error serialize (nano::tomlconfig & toml) const;
public:
size_t batch_size{ 256 };
// Maximum number of blocks to queue from network peers
size_t max_peer_queue{ 128 };
// Maximum number of blocks to queue from system components (local RPC, bootstrap)
@ -49,9 +37,6 @@ public:
size_t priority_bootstrap{ 8 };
size_t priority_local{ 16 };
size_t priority_system{ 32 };
size_t batch_size{ 256 };
size_t max_queued_notifications{ 8 };
};
/**
@ -60,31 +45,8 @@ public:
*/
class block_processor final
{
public: // Context
class context
{
public:
using result_t = nano::block_status;
using callback_t = std::function<void (result_t)>;
context (std::shared_ptr<nano::block> block, nano::block_source source, callback_t callback = nullptr);
std::shared_ptr<nano::block> block;
nano::block_source source;
callback_t callback;
std::chrono::steady_clock::time_point arrival{ std::chrono::steady_clock::now () };
std::future<result_t> get_future ();
private:
void set_result (result_t const &);
std::promise<result_t> promise;
friend class block_processor;
};
public:
block_processor (nano::node_config const &, nano::ledger &, nano::unchecked_map &, nano::stats &, nano::logger &);
block_processor (nano::node_config const &, nano::ledger &, nano::ledger_notifications &, nano::unchecked_map &, nano::stats &, nano::logger &);
~block_processor ();
void start ();
@ -100,20 +62,11 @@ public:
std::atomic<bool> flushing{ false };
public: // Events
// All processed blocks including forks, rejected etc
using processed_batch_t = std::deque<std::pair<nano::block_status, context>>;
using processed_batch_event_t = nano::observer_set<processed_batch_t>;
processed_batch_event_t batch_processed;
// Rolled back blocks <rolled back blocks, root of rollback>
using rolled_back_event_t = nano::observer_set<std::deque<std::shared_ptr<nano::block>>, nano::qualified_root>;
rolled_back_event_t rolled_back;
private: // Dependencies
block_processor_config const & config;
nano::network_params const & network_params;
nano::ledger & ledger;
nano::ledger_notifications & ledger_notifications;
nano::unchecked_map & unchecked;
nano::stats & stats;
nano::logger & logger;
@ -121,21 +74,19 @@ private: // Dependencies
private:
void run ();
// Roll back block in the ledger that conflicts with 'block'
void rollback_competitor (secure::write_transaction const &, nano::block const & block);
nano::block_status process_one (secure::write_transaction const &, context const &, bool forced = false);
processed_batch_t process_batch (nano::unique_lock<nano::mutex> &);
std::deque<context> next_batch (size_t max_count);
context next ();
bool add_impl (context, std::shared_ptr<nano::transport::channel> const & channel = nullptr);
void rollback_competitor (secure::write_transaction &, nano::block const & block);
nano::block_status process_one (secure::write_transaction const &, nano::block_context const &, bool forced = false);
void process_batch (nano::unique_lock<nano::mutex> &);
std::deque<nano::block_context> next_batch (size_t max_count);
nano::block_context next ();
bool add_impl (nano::block_context, std::shared_ptr<nano::transport::channel> const & channel = nullptr);
private:
nano::fair_queue<context, nano::block_source> queue;
nano::fair_queue<nano::block_context, nano::block_source> queue;
bool stopped{ false };
nano::condition_variable condition;
mutable nano::mutex mutex{ mutex_identifier (mutexes::block_processor) };
std::thread thread;
nano::thread_pool workers;
};
}

View file

@ -0,0 +1,13 @@
#include <nano/lib/enum_util.hpp>
#include <nano/lib/stats_enums.hpp>
#include <nano/node/block_source.hpp>
std::string_view nano::to_string (nano::block_source source)
{
return nano::enum_util::name (source);
}
nano::stat::detail nano::to_stat_detail (nano::block_source type)
{
return nano::enum_util::cast<nano::stat::detail> (type);
}

View file

@ -0,0 +1,24 @@
#pragma once
#include <nano/lib/fwd.hpp>
#include <string_view>
namespace nano
{
enum class block_source
{
unknown = 0,
live,
live_originator,
bootstrap,
bootstrap_legacy,
unchecked,
local,
forced,
election,
};
std::string_view to_string (block_source);
nano::stat::detail to_stat_detail (block_source);
}

View file

@ -6,6 +6,7 @@
#include <nano/node/block_processor.hpp>
#include <nano/node/bootstrap/bootstrap_service.hpp>
#include <nano/node/bootstrap/crawlers.hpp>
#include <nano/node/ledger_notifications.hpp>
#include <nano/node/network.hpp>
#include <nano/node/nodeconfig.hpp>
#include <nano/node/transport/transport.hpp>
@ -18,11 +19,13 @@
using namespace std::chrono_literals;
nano::bootstrap_service::bootstrap_service (nano::node_config const & node_config_a, nano::block_processor & block_processor_a, nano::ledger & ledger_a, nano::network & network_a, nano::stats & stat_a, nano::logger & logger_a) :
nano::bootstrap_service::bootstrap_service (nano::node_config const & node_config_a, nano::ledger & ledger_a, nano::ledger_notifications & ledger_notifications_a,
nano::block_processor & block_processor_a, nano::network & network_a, nano::stats & stat_a, nano::logger & logger_a) :
config{ node_config_a.bootstrap },
network_constants{ node_config_a.network_params.network },
block_processor{ block_processor_a },
ledger{ ledger_a },
ledger_notifications{ ledger_notifications_a },
block_processor{ block_processor_a },
network{ network_a },
stats{ stat_a },
logger{ logger_a },
@ -36,7 +39,8 @@ nano::bootstrap_service::bootstrap_service (nano::node_config const & node_confi
frontiers_limiter{ config.frontier_rate_limit },
workers{ 1, nano::thread_role::name::bootstrap_worker }
{
block_processor.batch_processed.add ([this] (auto const & batch) {
// Inspect all processed blocks
ledger_notifications.blocks_processed.add ([this] (auto const & batch) {
{
nano::lock_guard<nano::mutex> lock{ mutex };
@ -51,7 +55,7 @@ nano::bootstrap_service::bootstrap_service (nano::node_config const & node_confi
});
// Unblock rolled back accounts as the dependency is no longer valid
block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
ledger_notifications.blocks_rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
nano::lock_guard<nano::mutex> lock{ mutex };
for (auto const & block : blocks)
{

View file

@ -30,7 +30,7 @@ namespace nano
class bootstrap_service
{
public:
bootstrap_service (nano::node_config const &, nano::block_processor &, nano::ledger &, nano::network &, nano::stats &, nano::logger &);
bootstrap_service (nano::node_config const &, nano::ledger &, nano::ledger_notifications &, nano::block_processor &, nano::network &, nano::stats &, nano::logger &);
~bootstrap_service ();
void start ();
@ -55,8 +55,9 @@ public:
private: // Dependencies
bootstrap_config const & config;
nano::network_constants const & network_constants;
nano::block_processor & block_processor;
nano::ledger & ledger;
nano::ledger_notifications & ledger_notifications;
nano::block_processor & block_processor;
nano::network & network;
nano::stats & stats;
nano::logger & logger;

View file

@ -4,6 +4,7 @@
#include <nano/node/block_processor.hpp>
#include <nano/node/bounded_backlog.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/node/ledger_notifications.hpp>
#include <nano/node/node.hpp>
#include <nano/node/scheduler/component.hpp>
#include <nano/secure/common.hpp>
@ -12,18 +13,17 @@
#include <nano/secure/ledger_set_confirmed.hpp>
#include <nano/secure/transaction.hpp>
nano::bounded_backlog::bounded_backlog (nano::node_config const & config_a, nano::node & node_a, nano::ledger & ledger_a, nano::bucketing & bucketing_a, nano::backlog_scan & backlog_scan_a, nano::block_processor & block_processor_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a) :
nano::bounded_backlog::bounded_backlog (nano::node_config const & config_a, nano::node & node_a, nano::ledger & ledger_a, nano::ledger_notifications & ledger_notifications_a, nano::bucketing & bucketing_a, nano::backlog_scan & backlog_scan_a, nano::block_processor & block_processor_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a) :
config{ config_a },
node{ node_a },
ledger{ ledger_a },
ledger_notifications{ ledger_notifications_a },
bucketing{ bucketing_a },
backlog_scan{ backlog_scan_a },
block_processor{ block_processor_a },
confirming_set{ confirming_set_a },
stats{ stats_a },
logger{ logger_a },
scan_limiter{ config.bounded_backlog.scan_rate },
workers{ 1, nano::thread_role::name::bounded_backlog_notifications }
scan_limiter{ config.bounded_backlog.scan_rate }
{
// Activate accounts with unconfirmed blocks
backlog_scan.batch_activated.add ([this] (auto const & batch) {
@ -47,7 +47,7 @@ nano::bounded_backlog::bounded_backlog (nano::node_config const & config_a, nano
});
// Track unconfirmed blocks
block_processor.batch_processed.add ([this] (auto const & batch) {
ledger_notifications.blocks_processed.add ([this] (auto const & batch) {
auto transaction = ledger.tx_begin_read ();
for (auto const & [result, context] : batch)
{
@ -60,7 +60,7 @@ nano::bounded_backlog::bounded_backlog (nano::node_config const & config_a, nano
});
// Remove rolled back blocks from the backlog
block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
ledger_notifications.blocks_rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
nano::lock_guard<nano::mutex> guard{ mutex };
for (auto const & block : blocks)
{
@ -83,7 +83,6 @@ nano::bounded_backlog::~bounded_backlog ()
// Thread must be stopped before destruction
debug_assert (!thread.joinable ());
debug_assert (!scan_thread.joinable ());
debug_assert (!workers.alive ());
}
void nano::bounded_backlog::start ()
@ -95,8 +94,6 @@ void nano::bounded_backlog::start ()
return;
}
workers.start ();
thread = std::thread{ [this] () {
nano::thread_role::set (nano::thread_role::name::bounded_backlog);
run ();
@ -123,7 +120,6 @@ void nano::bounded_backlog::stop ()
{
scan_thread.join ();
}
workers.stop ();
}
size_t nano::bounded_backlog::index_size () const
@ -206,16 +202,14 @@ void nano::bounded_backlog::run ()
return;
}
lock.unlock ();
// Wait until all notification about the previous rollbacks are processed
while (workers.queued_tasks () >= config.bounded_backlog.max_queued_notifications)
{
ledger_notifications.wait ([this] {
stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::cooldown);
condition.wait_for (lock, 100ms, [this] { return stopped.load (); });
if (stopped)
{
return;
}
}
});
lock.lock ();
stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::loop);
@ -310,9 +304,8 @@ std::deque<nano::block_hash> nano::bounded_backlog::perform_rollbacks (std::dequ
}
// Notify observers of the rolled back blocks on a background thread, avoid dispatching notifications when holding ledger write transaction
workers.post ([this, rollback_list = std::move (rollback_list), root = block->qualified_root ()] {
// TODO: Calling block_processor's event here is not ideal, but duplicating these events is even worse
block_processor.rolled_back.notify (rollback_list, root);
ledger_notifications.notify_rolled_back (transaction, std::move (rollback_list), block->qualified_root (), [this] {
stats.inc (nano::stat::type::bounded_backlog, nano::stat::detail::notify_rolled_back);
});
// Return early if we reached the maximum number of rollbacks
@ -420,7 +413,6 @@ nano::container_info nano::bounded_backlog::container_info () const
nano::lock_guard<nano::mutex> guard{ mutex };
nano::container_info info;
info.put ("backlog", index.size ());
info.put ("notifications", workers.queued_tasks ());
info.add ("index", index.container_info ());
return info;
}
@ -547,7 +539,6 @@ nano::error nano::bounded_backlog_config::serialize (nano::tomlconfig & toml) co
{
toml.put ("enable", enable, "Enable the bounded backlog. \ntype:bool");
toml.put ("batch_size", batch_size, "Maximum number of blocks to rollback per iteration. \ntype:uint64");
toml.put ("max_queued_notifications", max_queued_notifications, "Maximum number of queued background tasks before cooldown. \ntype:uint64");
toml.put ("scan_rate", scan_rate, "Rate limit for refreshing the backlog index. \ntype:uint64");
return toml.get_error ();
@ -557,7 +548,6 @@ nano::error nano::bounded_backlog_config::deserialize (nano::tomlconfig & toml)
{
toml.get ("enable", enable);
toml.get ("batch_size", batch_size);
toml.get ("max_queued_notifications", max_queued_notifications);
toml.get ("scan_rate", scan_rate);
return toml.get_error ();

View file

@ -101,14 +101,13 @@ public:
public:
bool enable{ true };
size_t batch_size{ 32 };
size_t max_queued_notifications{ 128 };
size_t scan_rate{ 64 };
};
class bounded_backlog
{
public:
bounded_backlog (nano::node_config const &, nano::node &, nano::ledger &, nano::bucketing &, nano::backlog_scan &, nano::block_processor &, nano::confirming_set &, nano::stats &, nano::logger &);
bounded_backlog (nano::node_config const &, nano::node &, nano::ledger &, nano::ledger_notifications &, nano::bucketing &, nano::backlog_scan &, nano::block_processor &, nano::confirming_set &, nano::stats &, nano::logger &);
~bounded_backlog ();
void start ();
@ -124,9 +123,9 @@ private: // Dependencies
nano::node_config const & config;
nano::node & node;
nano::ledger & ledger;
nano::ledger_notifications & ledger_notifications;
nano::bucketing & bucketing;
nano::backlog_scan & backlog_scan;
nano::block_processor & block_processor;
nano::confirming_set & confirming_set;
nano::stats & stats;
nano::logger & logger;
@ -155,7 +154,5 @@ private:
mutable nano::mutex mutex;
std::thread thread;
std::thread scan_thread;
nano::thread_pool workers;
};
}

View file

@ -4,16 +4,17 @@
#include <nano/lib/thread_roles.hpp>
#include <nano/node/block_processor.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/node/ledger_notifications.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/secure/ledger_set_any.hpp>
#include <nano/secure/ledger_set_confirmed.hpp>
#include <nano/store/component.hpp>
#include <nano/store/write_queue.hpp>
nano::confirming_set::confirming_set (confirming_set_config const & config_a, nano::ledger & ledger_a, nano::block_processor & block_processor_a, nano::stats & stats_a, nano::logger & logger_a) :
nano::confirming_set::confirming_set (confirming_set_config const & config_a, nano::ledger & ledger_a, nano::ledger_notifications & ledger_notifications_a, nano::stats & stats_a, nano::logger & logger_a) :
config{ config_a },
ledger{ ledger_a },
block_processor{ block_processor_a },
ledger_notifications{ ledger_notifications_a },
stats{ stats_a },
logger{ logger_a },
workers{ 1, nano::thread_role::name::confirmation_height_notifications }
@ -26,7 +27,7 @@ nano::confirming_set::confirming_set (confirming_set_config const & config_a, na
});
// Requeue blocks that failed to cement immediately due to missing ledger blocks
block_processor.batch_processed.add ([this] (auto const & batch) {
ledger_notifications.blocks_processed.add ([this] (auto const & batch) {
bool should_notify = false;
{
std::lock_guard lock{ mutex };

View file

@ -52,7 +52,7 @@ class confirming_set final
friend class confirmation_height_pruned_source_Test;
public:
confirming_set (confirming_set_config const &, nano::ledger &, nano::block_processor &, nano::stats &, nano::logger &);
confirming_set (confirming_set_config const &, nano::ledger &, nano::ledger_notifications &, nano::stats &, nano::logger &);
~confirming_set ();
void start ();
@ -83,7 +83,7 @@ public: // Events
private: // Dependencies
confirming_set_config const & config;
nano::ledger & ledger;
nano::block_processor & block_processor;
nano::ledger_notifications & ledger_notifications;
nano::stats & stats;
nano::logger & logger;

View file

@ -19,6 +19,7 @@ class bootstrap_service;
class confirming_set;
class election;
class election_status;
class ledger_notifications;
class local_block_broadcaster;
class local_vote_history;
class logger;

View file

@ -0,0 +1,138 @@
#include <nano/lib/thread_roles.hpp>
#include <nano/node/ledger_notifications.hpp>
#include <nano/node/nodeconfig.hpp>
#include <nano/secure/transaction.hpp>
nano::ledger_notifications::ledger_notifications (nano::node_config const & config, nano::stats & stats, nano::logger & logger) :
config{ config },
stats{ stats },
logger{ logger }
{
}
nano::ledger_notifications::~ledger_notifications ()
{
debug_assert (!thread.joinable ());
}
void nano::ledger_notifications::start ()
{
debug_assert (!thread.joinable ());
thread = std::thread{ [this] () {
nano::thread_role::set (nano::thread_role::name::ledger_notifications);
run ();
} };
}
void nano::ledger_notifications::stop ()
{
{
nano::lock_guard<nano::mutex> guard{ mutex };
stopped = true;
}
condition.notify_all ();
if (thread.joinable ())
{
thread.join ();
}
}
void nano::ledger_notifications::wait (std::function<void ()> cooldown_action)
{
nano::unique_lock<nano::mutex> lock{ mutex };
condition.wait (lock, [this, &cooldown_action] {
bool predicate = stopped || notifications.size () < config.max_ledger_notifications;
if (!predicate && cooldown_action)
{
cooldown_action ();
}
return predicate;
});
}
void nano::ledger_notifications::notify_processed (nano::secure::write_transaction & transaction, processed_batch_t processed, std::function<void ()> callback)
{
{
nano::lock_guard<nano::mutex> guard{ mutex };
notifications.emplace_back (transaction.get_future (), nano::wrap_move_only ([this, processed = std::move (processed), callback = std::move (callback)] () mutable {
stats.inc (nano::stat::type::ledger_notifications, nano::stat::detail::notify_processed);
// Set results for futures when not holding the lock
for (auto & [result, context] : processed)
{
if (context.callback)
{
context.callback (result);
}
context.set_result (result);
}
blocks_processed.notify (processed);
if (callback)
{
callback ();
}
}));
}
condition.notify_all ();
}
void nano::ledger_notifications::notify_rolled_back (nano::secure::write_transaction & transaction, rolled_back_batch_t batch, nano::qualified_root rollback_root, std::function<void ()> callback)
{
{
nano::lock_guard<nano::mutex> guard{ mutex };
notifications.emplace_back (transaction.get_future (), nano::wrap_move_only ([this, batch = std::move (batch), rollback_root, callback = std::move (callback)] () {
stats.inc (nano::stat::type::ledger_notifications, nano::stat::detail::notify_rolled_back);
blocks_rolled_back.notify (batch, rollback_root);
if (callback)
{
callback ();
}
}));
}
condition.notify_all ();
}
void nano::ledger_notifications::run ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
condition.wait (lock, [this] {
return stopped || !notifications.empty ();
});
if (stopped)
{
return;
}
while (!notifications.empty ())
{
auto notification = std::move (notifications.front ());
notifications.pop_front ();
lock.unlock ();
auto & [future, callback] = notification;
future.wait (); // Wait for the associated transaction to be committed
callback (); // Notify observers
condition.notify_all (); // Notify waiting threads about possible vacancy
lock.lock ();
}
}
}
nano::container_info nano::ledger_notifications::container_info () const
{
nano::lock_guard<nano::mutex> guard{ mutex };
nano::container_info info;
info.put ("notifications", notifications.size ());
return info;
}

View file

@ -0,0 +1,66 @@
#pragma once
#include <nano/lib/function.hpp>
#include <nano/lib/observer_set.hpp>
#include <nano/node/block_context.hpp>
#include <nano/node/fwd.hpp>
#include <nano/secure/common.hpp>
#include <deque>
#include <functional>
#include <future>
#include <thread>
namespace nano
{
class ledger_notifications
{
public: // Events
// All processed blocks including forks, rejected etc
using processed_batch_t = std::deque<std::pair<nano::block_status, nano::block_context>>;
using processed_batch_event_t = nano::observer_set<processed_batch_t>;
processed_batch_event_t blocks_processed;
// Rolled back blocks <rolled back blocks, root of rollback>
using rolled_back_batch_t = std::deque<std::shared_ptr<nano::block>>;
using rolled_back_event_t = nano::observer_set<std::deque<std::shared_ptr<nano::block>>, nano::qualified_root>;
rolled_back_event_t blocks_rolled_back;
public:
ledger_notifications (nano::node_config const &, nano::stats &, nano::logger &);
~ledger_notifications ();
void start ();
void stop ();
/* Components should cooperate to ensure that the notification queue does not grow indefinitely */
void wait (std::function<void ()> cooldown_action = nullptr);
/*
* Write transactions are passed to ensure that notifications are queued in the correct order, which is the same as the order of write transactions
* However, we cannot dispatch notifications before the write transaction is committed otherwise the notified components may not see the changes
* It's an important subtlety and the reason for additional complexity in this and transaction classes
*/
void notify_processed (nano::secure::write_transaction &, processed_batch_t batch, std::function<void ()> callback = nullptr);
void notify_rolled_back (nano::secure::write_transaction &, rolled_back_batch_t batch, nano::qualified_root rollback_root, std::function<void ()> callback = nullptr);
nano::container_info container_info () const;
private: // Dependencies
nano::node_config const & config;
nano::stats & stats;
nano::logger & logger;
private:
void run ();
private:
using entry = std::pair<std::shared_future<void>, std::function<void ()>>; // <transaction commited future, notification callback>
std::deque<entry> notifications;
std::thread thread;
nano::condition_variable condition;
mutable nano::mutex mutex;
bool stopped{ false };
};
}

View file

@ -3,6 +3,7 @@
#include <nano/lib/utility.hpp>
#include <nano/node/block_processor.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/node/ledger_notifications.hpp>
#include <nano/node/local_block_broadcaster.hpp>
#include <nano/node/network.hpp>
#include <nano/node/node.hpp>
@ -10,10 +11,10 @@
#include <boost/range/iterator_range.hpp>
nano::local_block_broadcaster::local_block_broadcaster (local_block_broadcaster_config const & config_a, nano::node & node_a, nano::block_processor & block_processor_a, nano::network & network_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a, bool enabled_a) :
nano::local_block_broadcaster::local_block_broadcaster (local_block_broadcaster_config const & config_a, nano::node & node_a, nano::ledger_notifications & ledger_notifications_a, nano::network & network_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a, bool enabled_a) :
config{ config_a },
node{ node_a },
block_processor{ block_processor_a },
ledger_notifications{ ledger_notifications_a },
network{ network_a },
confirming_set{ confirming_set_a },
stats{ stats_a },
@ -26,7 +27,7 @@ nano::local_block_broadcaster::local_block_broadcaster (local_block_broadcaster_
return;
}
block_processor.batch_processed.add ([this] (auto const & batch) {
ledger_notifications.blocks_processed.add ([this] (auto const & batch) {
bool should_notify = false;
for (auto const & [result, context] : batch)
{
@ -56,7 +57,7 @@ nano::local_block_broadcaster::local_block_broadcaster (local_block_broadcaster_
}
});
block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
ledger_notifications.blocks_rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
nano::lock_guard<nano::mutex> guard{ mutex };
for (auto const & block : blocks)
{

View file

@ -53,7 +53,7 @@ public:
class local_block_broadcaster final
{
public:
local_block_broadcaster (local_block_broadcaster_config const &, nano::node &, nano::block_processor &, nano::network &, nano::confirming_set &, nano::stats &, nano::logger &, bool enabled = false);
local_block_broadcaster (local_block_broadcaster_config const &, nano::node &, nano::ledger_notifications &, nano::network &, nano::confirming_set &, nano::stats &, nano::logger &, bool enabled = false);
~local_block_broadcaster ();
void start ();
@ -73,7 +73,7 @@ private:
private: // Dependencies
local_block_broadcaster_config const & config;
nano::node & node;
nano::block_processor & block_processor;
nano::ledger_notifications & ledger_notifications;
nano::network & network;
nano::confirming_set & confirming_set;
nano::stats & stats;

View file

@ -20,6 +20,7 @@
#include <nano/node/daemonconfig.hpp>
#include <nano/node/election_status.hpp>
#include <nano/node/endpoint.hpp>
#include <nano/node/ledger_notifications.hpp>
#include <nano/node/local_block_broadcaster.hpp>
#include <nano/node/local_vote_history.hpp>
#include <nano/node/make_store.hpp>
@ -116,6 +117,8 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
wallets{ *wallets_impl },
ledger_impl{ std::make_unique<nano::ledger> (store, stats, network_params.ledger, flags_a.generate_cache, config_a.representative_vote_weight_minimum.number ()) },
ledger{ *ledger_impl },
ledger_notifications_impl{ std::make_unique<nano::ledger_notifications> (config, stats, logger) },
ledger_notifications{ *ledger_notifications_impl },
outbound_limiter_impl{ std::make_unique<nano::bandwidth_limiter> (config) },
outbound_limiter{ *outbound_limiter_impl },
message_processor_impl{ std::make_unique<nano::message_processor> (config.message_processor, *this) },
@ -138,13 +141,13 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
tcp_listener{ *tcp_listener_impl },
port_mapping_impl{ std::make_unique<nano::port_mapping> (*this) },
port_mapping{ *port_mapping_impl },
block_processor_impl{ std::make_unique<nano::block_processor> (config, ledger, unchecked, stats, logger) },
block_processor_impl{ std::make_unique<nano::block_processor> (config, ledger, ledger_notifications, unchecked, stats, logger) },
block_processor{ *block_processor_impl },
confirming_set_impl{ std::make_unique<nano::confirming_set> (config.confirming_set, ledger, block_processor, stats, logger) },
confirming_set_impl{ std::make_unique<nano::confirming_set> (config.confirming_set, ledger, ledger_notifications, stats, logger) },
confirming_set{ *confirming_set_impl },
bucketing_impl{ std::make_unique<nano::bucketing> () },
bucketing{ *bucketing_impl },
active_impl{ std::make_unique<nano::active_elections> (*this, confirming_set, block_processor) },
active_impl{ std::make_unique<nano::active_elections> (*this, ledger_notifications, confirming_set) },
active{ *active_impl },
online_reps_impl{ std::make_unique<nano::online_reps> (config, ledger, stats, logger) },
online_reps{ *online_reps_impl },
@ -170,23 +173,23 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
generator{ *generator_impl },
final_generator_impl{ std::make_unique<nano::vote_generator> (config, *this, ledger, wallets, vote_processor, history, network, stats, logger, /* final */ true) },
final_generator{ *final_generator_impl },
scheduler_impl{ std::make_unique<nano::scheduler::component> (config, *this, ledger, bucketing, block_processor, active, online_reps, vote_cache, confirming_set, stats, logger) },
scheduler_impl{ std::make_unique<nano::scheduler::component> (config, *this, ledger, ledger_notifications, bucketing, active, online_reps, vote_cache, confirming_set, stats, logger) },
scheduler{ *scheduler_impl },
aggregator_impl{ std::make_unique<nano::request_aggregator> (config.request_aggregator, *this, generator, final_generator, history, ledger, wallets, vote_router) },
aggregator{ *aggregator_impl },
backlog_scan_impl{ std::make_unique<nano::backlog_scan> (config.backlog_scan, ledger, stats) },
backlog_scan{ *backlog_scan_impl },
backlog_impl{ std::make_unique<nano::bounded_backlog> (config, *this, ledger, bucketing, backlog_scan, block_processor, confirming_set, stats, logger) },
backlog_impl{ std::make_unique<nano::bounded_backlog> (config, *this, ledger, ledger_notifications, bucketing, backlog_scan, block_processor, confirming_set, stats, logger) },
backlog{ *backlog_impl },
bootstrap_server_impl{ std::make_unique<nano::bootstrap_server> (config.bootstrap_server, store, ledger, network_params.network, stats) },
bootstrap_server{ *bootstrap_server_impl },
bootstrap_impl{ std::make_unique<nano::bootstrap_service> (config, block_processor, ledger, network, stats, logger) },
bootstrap_impl{ std::make_unique<nano::bootstrap_service> (config, ledger, ledger_notifications, block_processor, network, stats, logger) },
bootstrap{ *bootstrap_impl },
websocket_impl{ std::make_unique<nano::websocket_server> (config.websocket_config, observers, wallets, ledger, io_ctx, logger) },
websocket{ *websocket_impl },
epoch_upgrader_impl{ std::make_unique<nano::epoch_upgrader> (*this, ledger, store, network_params, logger) },
epoch_upgrader{ *epoch_upgrader_impl },
local_block_broadcaster_impl{ std::make_unique<nano::local_block_broadcaster> (config.local_block_broadcaster, *this, block_processor, network, confirming_set, stats, logger, !flags.disable_block_processor_republishing) },
local_block_broadcaster_impl{ std::make_unique<nano::local_block_broadcaster> (config.local_block_broadcaster, *this, ledger_notifications, network, confirming_set, stats, logger, !flags.disable_block_processor_republishing) },
local_block_broadcaster{ *local_block_broadcaster_impl },
process_live_dispatcher_impl{ std::make_unique<nano::process_live_dispatcher> (ledger, scheduler.priority, vote_cache, websocket) },
process_live_dispatcher{ *process_live_dispatcher_impl },
@ -237,8 +240,23 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
active.recently_confirmed.erase (hash);
});
// Announce new blocks via websocket
ledger_notifications.blocks_processed.add ([this] (auto const & batch) {
auto const transaction = ledger.tx_begin_read ();
for (auto const & [result, context] : batch)
{
if (result == nano::block_status::progress)
{
if (websocket.server && websocket.server->any_subscriber (nano::websocket::topic::new_unconfirmed_block))
{
websocket.server->broadcast (nano::websocket::message_builder ().new_block_arrived (*context.block));
}
}
}
});
// Do some cleanup of rolled back blocks
block_processor.rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
ledger_notifications.blocks_rolled_back.add ([this] (auto const & blocks, auto const & rollback_root) {
for (auto const & block : blocks)
{
history.erase (block->root ());
@ -539,6 +557,7 @@ void nano::node::start ()
rep_tiers.start ();
vote_processor.start ();
vote_cache_processor.start ();
ledger_notifications.start ();
block_processor.start ();
active.start ();
generator.start ();
@ -595,6 +614,7 @@ void nano::node::stop ()
generator.stop ();
final_generator.stop ();
confirming_set.stop ();
ledger_notifications.stop ();
telemetry.stop ();
websocket.stop ();
bootstrap_server.stop ();

View file

@ -132,6 +132,8 @@ public:
nano::wallets & wallets;
std::unique_ptr<nano::ledger> ledger_impl;
nano::ledger & ledger;
std::unique_ptr<nano::ledger_notifications> ledger_notifications_impl;
nano::ledger_notifications & ledger_notifications;
std::unique_ptr<nano::bandwidth_limiter> outbound_limiter_impl;
nano::bandwidth_limiter & outbound_limiter;
std::unique_ptr<nano::message_processor> message_processor_impl;

View file

@ -134,6 +134,7 @@ public:
nano::rocksdb_config rocksdb_config;
nano::lmdb_config lmdb_config;
bool enable_upnp{ true };
std::size_t max_ledger_notifications{ 8 };
public:
nano::vote_cache_config vote_cache;

View file

@ -19,32 +19,12 @@ nano::process_live_dispatcher::process_live_dispatcher (nano::ledger & ledger, n
void nano::process_live_dispatcher::connect (nano::block_processor & block_processor)
{
block_processor.batch_processed.add ([this] (auto const & batch) {
auto const transaction = ledger.tx_begin_read ();
for (auto const & [result, context] : batch)
{
debug_assert (context.block != nullptr);
inspect (result, *context.block, transaction);
}
});
}
void nano::process_live_dispatcher::inspect (nano::block_status const & result, nano::block const & block, secure::transaction const & transaction)
{
switch (result)
{
case nano::block_status::progress:
process_live (block, transaction);
break;
default:
break;
}
}
void nano::process_live_dispatcher::process_live (nano::block const & block, secure::transaction const & transaction)
{
if (websocket.server && websocket.server->any_subscriber (nano::websocket::topic::new_unconfirmed_block))
{
websocket.server->broadcast (nano::websocket::message_builder ().new_block_arrived (block));
}
}

View file

@ -5,11 +5,11 @@
#include <nano/node/scheduler/optimistic.hpp>
#include <nano/node/scheduler/priority.hpp>
nano::scheduler::component::component (nano::node_config & node_config, nano::node & node, nano::ledger & ledger, nano::bucketing & bucketing, nano::block_processor & block_processor, nano::active_elections & active, nano::online_reps & online_reps, nano::vote_cache & vote_cache, nano::confirming_set & confirming_set, nano::stats & stats, nano::logger & logger) :
nano::scheduler::component::component (nano::node_config & node_config, nano::node & node, nano::ledger & ledger, nano::ledger_notifications & ledger_notifications, nano::bucketing & bucketing, nano::active_elections & active, nano::online_reps & online_reps, nano::vote_cache & vote_cache, nano::confirming_set & confirming_set, nano::stats & stats, nano::logger & logger) :
hinted_impl{ std::make_unique<nano::scheduler::hinted> (node_config.hinted_scheduler, node, vote_cache, active, online_reps, stats) },
manual_impl{ std::make_unique<nano::scheduler::manual> (node) },
optimistic_impl{ std::make_unique<nano::scheduler::optimistic> (node_config.optimistic_scheduler, node, ledger, active, node_config.network_params.network, stats) },
priority_impl{ std::make_unique<nano::scheduler::priority> (node_config, node, ledger, bucketing, block_processor, active, confirming_set, stats, logger) },
priority_impl{ std::make_unique<nano::scheduler::priority> (node_config, node, ledger, ledger_notifications, bucketing, active, confirming_set, stats, logger) },
hinted{ *hinted_impl },
manual{ *manual_impl },
optimistic{ *optimistic_impl },

View file

@ -10,7 +10,7 @@ namespace nano::scheduler
class component final
{
public:
component (nano::node_config &, nano::node &, nano::ledger &, nano::bucketing &, nano::block_processor &, nano::active_elections &, nano::online_reps &, nano::vote_cache &, nano::confirming_set &, nano::stats &, nano::logger &);
component (nano::node_config &, nano::node &, nano::ledger &, nano::ledger_notifications &, nano::bucketing &, nano::active_elections &, nano::online_reps &, nano::vote_cache &, nano::confirming_set &, nano::stats &, nano::logger &);
~component ();
void start ();

View file

@ -2,18 +2,19 @@
#include <nano/node/active_elections.hpp>
#include <nano/node/bucketing.hpp>
#include <nano/node/election.hpp>
#include <nano/node/ledger_notifications.hpp>
#include <nano/node/node.hpp>
#include <nano/node/scheduler/priority.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/secure/ledger_set_any.hpp>
#include <nano/secure/ledger_set_confirmed.hpp>
nano::scheduler::priority::priority (nano::node_config & node_config, nano::node & node_a, nano::ledger & ledger_a, nano::bucketing & bucketing_a, nano::block_processor & block_processor_a, nano::active_elections & active_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a) :
nano::scheduler::priority::priority (nano::node_config & node_config, nano::node & node_a, nano::ledger & ledger_a, nano::ledger_notifications & ledger_notifications_a, nano::bucketing & bucketing_a, nano::active_elections & active_a, nano::confirming_set & confirming_set_a, nano::stats & stats_a, nano::logger & logger_a) :
config{ node_config.priority_scheduler },
node{ node_a },
ledger{ ledger_a },
ledger_notifications{ ledger_notifications_a },
bucketing{ bucketing_a },
block_processor{ block_processor_a },
active{ active_a },
confirming_set{ confirming_set_a },
stats{ stats_a },
@ -25,7 +26,7 @@ nano::scheduler::priority::priority (nano::node_config & node_config, nano::node
}
// Activate accounts with fresh blocks
block_processor.batch_processed.add ([this] (auto const & batch) {
ledger_notifications.blocks_processed.add ([this] (auto const & batch) {
auto transaction = ledger.tx_begin_read ();
for (auto const & [result, context] : batch)
{

View file

@ -27,7 +27,7 @@ public:
class priority final
{
public:
priority (nano::node_config &, nano::node &, nano::ledger &, nano::bucketing &, nano::block_processor &, nano::active_elections &, nano::confirming_set &, nano::stats &, nano::logger &);
priority (nano::node_config &, nano::node &, nano::ledger &, nano::ledger_notifications &, nano::bucketing &, nano::active_elections &, nano::confirming_set &, nano::stats &, nano::logger &);
~priority ();
void start ();
@ -52,8 +52,8 @@ private: // Dependencies
priority_config const & config;
nano::node & node;
nano::ledger & ledger;
nano::ledger_notifications & ledger_notifications;
nano::bucketing & bucketing;
nano::block_processor & block_processor;
nano::active_elections & active;
nano::confirming_set & confirming_set;
nano::stats & stats;

View file

@ -3,6 +3,7 @@
#include <nano/store/transaction.hpp>
#include <nano/store/write_queue.hpp>
#include <future>
#include <utility>
namespace nano::secure
@ -38,15 +39,29 @@ class write_transaction final : public transaction
nano::store::write_transaction txn;
std::chrono::steady_clock::time_point start;
// Future to signal transaction got committed
std::promise<void> promise;
std::shared_future<void> future{ promise.get_future () };
public:
explicit write_transaction (nano::store::write_transaction && txn_a, nano::store::write_guard && guard_a) noexcept :
write_transaction (nano::store::write_transaction && txn_a, nano::store::write_guard && guard_a) noexcept :
guard{ std::move (guard_a) },
txn{ std::move (txn_a) }
txn{ std::move (txn_a) },
start{ std::chrono::steady_clock::now () }
{
debug_assert (guard.is_owned ());
start = std::chrono::steady_clock::now ();
debug_assert (active ());
}
~write_transaction () override
{
if (active ())
{
commit ();
}
}
write_transaction (write_transaction && other) = default;
// Override to return a reference to the encapsulated write_transaction
const nano::store::transaction & base_txn () const override
{
@ -57,6 +72,7 @@ public:
{
txn.commit ();
guard.release ();
promise.set_value ();
}
void renew ()
@ -64,6 +80,8 @@ public:
guard.renew ();
txn.renew ();
start = std::chrono::steady_clock::now ();
promise = {};
future = { promise.get_future () };
}
void refresh ()
@ -88,6 +106,16 @@ public:
return txn.timestamp ();
}
bool active () const
{
return guard.is_owned ();
}
std::shared_future<void> get_future ()
{
return future; // Give a copy of the shared future
}
// Conversion operator to const nano::store::transaction&
operator const nano::store::transaction & () const override
{

View file

@ -6,6 +6,7 @@
#include <nano/node/active_elections.hpp>
#include <nano/node/confirming_set.hpp>
#include <nano/node/election.hpp>
#include <nano/node/ledger_notifications.hpp>
#include <nano/node/make_store.hpp>
#include <nano/node/online_reps.hpp>
#include <nano/node/scheduler/component.hpp>
@ -1144,9 +1145,9 @@ TEST (confirmation_height, many_accounts_send_receive_self_no_elections)
nano::node_config node_config;
nano::unchecked_map unchecked{ 0, stats, false };
nano::block_processor block_processor{ node_config, ledger, unchecked, stats, logger };
nano::ledger_notifications ledger_notifications{ node_config, stats, logger };
nano::confirming_set_config confirming_set_config{};
nano::confirming_set confirming_set{ confirming_set_config, ledger, block_processor, stats, logger };
nano::confirming_set confirming_set{ confirming_set_config, ledger, ledger_notifications, stats, logger };
auto const num_accounts = 100000;

View file

@ -485,26 +485,32 @@ void nano::test::system::generate_activity (nano::node & node_a, std::vector<nan
auto what (random_pool::generate_byte ());
if (what < 0x1)
{
logger.debug (nano::log::type::test, "Random activity: rollback");
generate_rollback (node_a, accounts_a);
}
else if (what < 0x10)
{
logger.debug (nano::log::type::test, "Random activity: change known");
generate_change_known (node_a, accounts_a);
}
else if (what < 0x20)
{
logger.debug (nano::log::type::test, "Random activity: change unknown");
generate_change_unknown (node_a, accounts_a);
}
else if (what < 0x70)
{
logger.debug (nano::log::type::test, "Random activity: receive");
generate_receive (node_a);
}
else if (what < 0xc0)
{
logger.debug (nano::log::type::test, "Random activity: send existing");
generate_send_existing (node_a, accounts_a);
}
else
{
logger.debug (nano::log::type::test, "Random activity: send new");
generate_send_new (node_a, accounts_a);
}
}