diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index f31a0cdd..9289b863 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -160,6 +160,8 @@ add_library( portmapping.cpp prioritization.cpp prioritization.hpp + process_live_dispatcher.cpp + process_live_dispatcher.hpp repcrawler.hpp repcrawler.cpp request_aggregator.hpp diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 1852278e..1d9c7e6d 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -2,28 +2,12 @@ #include #include #include -#include #include #include std::chrono::milliseconds constexpr nano::block_processor::confirmation_request_delay; -nano::block_post_events::block_post_events (std::function && get_transaction_a) : - get_transaction (std::move (get_transaction_a)) -{ -} - -nano::block_post_events::~block_post_events () -{ - debug_assert (get_transaction != nullptr); - auto transaction (get_transaction ()); - for (auto const & i : events) - { - i (transaction); - } -} - nano::block_processor::block_processor (nano::node & node_a, nano::write_database_queue & write_database_queue_a) : next_log (std::chrono::steady_clock::now ()), node (node_a), @@ -238,7 +222,6 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock { std::deque processed; auto scoped_write_guard = write_database_queue.wait (nano::writer::process_batch); - block_post_events post_events ([&store = node.store] { return store.tx_begin_read (); }); auto transaction (node.store.tx_begin_write ({ tables::accounts, tables::blocks, tables::frontiers, tables::pending, tables::unchecked })); nano::timer timer_l; lock_a.lock (); @@ -305,7 +288,7 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock } } number_of_blocks_processed++; - auto result = process_one (transaction, post_events, block, force); + auto result = process_one (transaction, block, force); processed.emplace_back (result, block); lock_a.lock (); } @@ -318,25 +301,7 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock return processed; } -void nano::block_processor::process_live (nano::transaction const & transaction_a, std::shared_ptr const & block_a) -{ - // Start collecting quorum on block - if (node.ledger.dependents_confirmed (transaction_a, *block_a)) - { - auto account = block_a->account ().is_zero () ? block_a->sideband ().account : block_a->account (); - node.scheduler.activate (account, transaction_a); - } - - // Notify inactive vote cache about a new live block - node.inactive_vote_cache.trigger (block_a->hash ()); - - if (node.websocket.server && node.websocket.server->any_subscriber (nano::websocket::topic::new_unconfirmed_block)) - { - node.websocket.server->broadcast (nano::websocket::message_builder ().new_block_arrived (*block_a)); - } -} - -nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, block_post_events & events_a, std::shared_ptr block, bool const forced_a) +nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, std::shared_ptr block, bool const forced_a) { nano::process_return result; auto hash (block->hash ()); @@ -351,9 +316,6 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction block->serialize_json (block_string, node.config.logging.single_line_record ()); node.logger.try_log (boost::str (boost::format ("Processing block %1%: %2%") % hash.to_string () % block_string)); } - events_a.events.emplace_back ([this, block] (nano::transaction const & post_event_transaction_a) { - process_live (post_event_transaction_a, block); - }); queue_unchecked (transaction_a, hash); /* For send blocks check epoch open unchecked (gap pending). For state blocks check only send subtype and only if block epoch is not last epoch. diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 233af597..91861051 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -18,17 +18,6 @@ class transaction; class write_transaction; class write_database_queue; -class block_post_events final -{ -public: - explicit block_post_events (std::function &&); - ~block_post_events (); - std::deque> events; - -private: - std::function get_transaction; -}; - /** * Processing blocks is a potentially long IO operation. * This class isolates block insertion from other operations like servicing network operations @@ -65,10 +54,9 @@ private: blocking_observer blocking; private: - nano::process_return process_one (nano::write_transaction const &, block_post_events &, std::shared_ptr block, bool const = false); + nano::process_return process_one (nano::write_transaction const &, std::shared_ptr block, bool const = false); void queue_unchecked (nano::write_transaction const &, nano::hash_or_account const &); std::deque process_batch (nano::unique_lock &); - void process_live (nano::transaction const &, std::shared_ptr const &); void process_verified_state_blocks (std::deque &, std::vector const &, std::vector const &, std::vector const &); void add_impl (std::shared_ptr block); bool stopped{ false }; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index d8ef9a2b..ea551db9 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -207,11 +207,13 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co node_seq (seq), block_broadcast{ network, block_arrival, !flags.disable_block_processor_republishing }, block_publisher{ active }, - gap_tracker{ gap_cache } + gap_tracker{ gap_cache }, + process_live_dispatcher{ ledger, scheduler, inactive_vote_cache, websocket } { block_broadcast.connect (block_processor); block_publisher.connect (block_processor); gap_tracker.connect (block_processor); + process_live_dispatcher.connect (block_processor); unchecked.use_memory = [this] () { return ledger.bootstrap_weight_reached (); }; unchecked.satisfied = [this] (nano::unchecked_info const & info) { this->block_processor.add (info.block); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 8d9a0b7b..987dce2e 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -196,6 +197,7 @@ public: nano::block_broadcast block_broadcast; nano::block_publisher block_publisher; nano::gap_tracker gap_tracker; + nano::process_live_dispatcher process_live_dispatcher; std::chrono::steady_clock::time_point const startup_time; std::chrono::seconds unchecked_cutoff = std::chrono::seconds (7 * 24 * 60 * 60); // Week diff --git a/nano/node/process_live_dispatcher.cpp b/nano/node/process_live_dispatcher.cpp new file mode 100644 index 00000000..d13720e0 --- /dev/null +++ b/nano/node/process_live_dispatcher.cpp @@ -0,0 +1,59 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +nano::process_live_dispatcher::process_live_dispatcher (nano::ledger & ledger, nano::election_scheduler & scheduler, nano::vote_cache & inactive_vote_cache, nano::websocket_server & websocket) : + ledger{ ledger }, + scheduler{ scheduler }, + inactive_vote_cache{ inactive_vote_cache }, + websocket{ websocket } +{ +} + +void nano::process_live_dispatcher::connect (nano::block_processor & block_processor) +{ + block_processor.batch_processed.add ([this] (auto const & batch) { + auto const transaction = ledger.store.tx_begin_read (); + for (auto const & [result, block] : batch) + { + debug_assert (block != nullptr); + inspect (result, *block, transaction); + } + }); +} + +void nano::process_live_dispatcher::inspect (nano::process_return const & result, nano::block const & block, nano::transaction const & transaction) +{ + switch (result.code) + { + case nano::process_result::progress: + process_live (block, transaction); + break; + default: + break; + } +} + +void nano::process_live_dispatcher::process_live (nano::block const & block, nano::transaction const & transaction) +{ + // Start collecting quorum on block + if (ledger.dependents_confirmed (transaction, block)) + { + auto account = block.account ().is_zero () ? block.sideband ().account : block.account (); + scheduler.activate (account, transaction); + } + + // Notify inactive vote cache about a new live block + inactive_vote_cache.trigger (block.hash ()); + + if (websocket.server && websocket.server->any_subscriber (nano::websocket::topic::new_unconfirmed_block)) + { + websocket.server->broadcast (nano::websocket::message_builder ().new_block_arrived (block)); + } +} diff --git a/nano/node/process_live_dispatcher.hpp b/nano/node/process_live_dispatcher.hpp new file mode 100644 index 00000000..f8674b9d --- /dev/null +++ b/nano/node/process_live_dispatcher.hpp @@ -0,0 +1,31 @@ +#pragma once + +namespace nano +{ +class ledger; +class election_scheduler; +class vote_cache; +class websocket_server; +class block_processor; +class process_return; +class block; +class transaction; + +// Observes confirmed blocks and dispatches the process_live function. +class process_live_dispatcher +{ +public: + process_live_dispatcher (nano::ledger & ledger, nano::election_scheduler & scheduler, nano::vote_cache & inactive_vote_cache, nano::websocket_server & websocket); + void connect (nano::block_processor & block_processor); + +private: + // Block_processor observer + void inspect (nano::process_return const & result, nano::block const & block, nano::transaction const & transaction); + void process_live (nano::block const & block, nano::transaction const & transaction); + + nano::ledger & ledger; + nano::election_scheduler & scheduler; + nano::vote_cache & inactive_vote_cache; + nano::websocket_server & websocket; +}; +}