Add the process_live_dipatcher class (#4181)
* Add the process_live_dispatcher class * Move from block_processor::process_live to process_live_dispatcher class * Remove block_post_events class
This commit is contained in:
parent
4b4c271d3c
commit
30046491e2
7 changed files with 100 additions and 54 deletions
|
@ -160,6 +160,8 @@ add_library(
|
|||
portmapping.cpp
|
||||
prioritization.cpp
|
||||
prioritization.hpp
|
||||
process_live_dispatcher.cpp
|
||||
process_live_dispatcher.hpp
|
||||
repcrawler.hpp
|
||||
repcrawler.cpp
|
||||
request_aggregator.hpp
|
||||
|
|
|
@ -2,28 +2,12 @@
|
|||
#include <nano/lib/timer.hpp>
|
||||
#include <nano/node/blockprocessor.hpp>
|
||||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/websocket.hpp>
|
||||
#include <nano/secure/store.hpp>
|
||||
|
||||
#include <boost/format.hpp>
|
||||
|
||||
std::chrono::milliseconds constexpr nano::block_processor::confirmation_request_delay;
|
||||
|
||||
nano::block_post_events::block_post_events (std::function<nano::read_transaction ()> && get_transaction_a) :
|
||||
get_transaction (std::move (get_transaction_a))
|
||||
{
|
||||
}
|
||||
|
||||
nano::block_post_events::~block_post_events ()
|
||||
{
|
||||
debug_assert (get_transaction != nullptr);
|
||||
auto transaction (get_transaction ());
|
||||
for (auto const & i : events)
|
||||
{
|
||||
i (transaction);
|
||||
}
|
||||
}
|
||||
|
||||
nano::block_processor::block_processor (nano::node & node_a, nano::write_database_queue & write_database_queue_a) :
|
||||
next_log (std::chrono::steady_clock::now ()),
|
||||
node (node_a),
|
||||
|
@ -238,7 +222,6 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
|
|||
{
|
||||
std::deque<processed_t> processed;
|
||||
auto scoped_write_guard = write_database_queue.wait (nano::writer::process_batch);
|
||||
block_post_events post_events ([&store = node.store] { return store.tx_begin_read (); });
|
||||
auto transaction (node.store.tx_begin_write ({ tables::accounts, tables::blocks, tables::frontiers, tables::pending, tables::unchecked }));
|
||||
nano::timer<std::chrono::milliseconds> timer_l;
|
||||
lock_a.lock ();
|
||||
|
@ -305,7 +288,7 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
|
|||
}
|
||||
}
|
||||
number_of_blocks_processed++;
|
||||
auto result = process_one (transaction, post_events, block, force);
|
||||
auto result = process_one (transaction, block, force);
|
||||
processed.emplace_back (result, block);
|
||||
lock_a.lock ();
|
||||
}
|
||||
|
@ -318,25 +301,7 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
|
|||
return processed;
|
||||
}
|
||||
|
||||
void nano::block_processor::process_live (nano::transaction const & transaction_a, std::shared_ptr<nano::block> const & block_a)
|
||||
{
|
||||
// Start collecting quorum on block
|
||||
if (node.ledger.dependents_confirmed (transaction_a, *block_a))
|
||||
{
|
||||
auto account = block_a->account ().is_zero () ? block_a->sideband ().account : block_a->account ();
|
||||
node.scheduler.activate (account, transaction_a);
|
||||
}
|
||||
|
||||
// Notify inactive vote cache about a new live block
|
||||
node.inactive_vote_cache.trigger (block_a->hash ());
|
||||
|
||||
if (node.websocket.server && node.websocket.server->any_subscriber (nano::websocket::topic::new_unconfirmed_block))
|
||||
{
|
||||
node.websocket.server->broadcast (nano::websocket::message_builder ().new_block_arrived (*block_a));
|
||||
}
|
||||
}
|
||||
|
||||
nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, block_post_events & events_a, std::shared_ptr<nano::block> block, bool const forced_a)
|
||||
nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, std::shared_ptr<nano::block> block, bool const forced_a)
|
||||
{
|
||||
nano::process_return result;
|
||||
auto hash (block->hash ());
|
||||
|
@ -351,9 +316,6 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction
|
|||
block->serialize_json (block_string, node.config.logging.single_line_record ());
|
||||
node.logger.try_log (boost::str (boost::format ("Processing block %1%: %2%") % hash.to_string () % block_string));
|
||||
}
|
||||
events_a.events.emplace_back ([this, block] (nano::transaction const & post_event_transaction_a) {
|
||||
process_live (post_event_transaction_a, block);
|
||||
});
|
||||
queue_unchecked (transaction_a, hash);
|
||||
/* For send blocks check epoch open unchecked (gap pending).
|
||||
For state blocks check only send subtype and only if block epoch is not last epoch.
|
||||
|
|
|
@ -18,17 +18,6 @@ class transaction;
|
|||
class write_transaction;
|
||||
class write_database_queue;
|
||||
|
||||
class block_post_events final
|
||||
{
|
||||
public:
|
||||
explicit block_post_events (std::function<nano::read_transaction ()> &&);
|
||||
~block_post_events ();
|
||||
std::deque<std::function<void (nano::read_transaction const &)>> events;
|
||||
|
||||
private:
|
||||
std::function<nano::read_transaction ()> get_transaction;
|
||||
};
|
||||
|
||||
/**
|
||||
* Processing blocks is a potentially long IO operation.
|
||||
* This class isolates block insertion from other operations like servicing network operations
|
||||
|
@ -65,10 +54,9 @@ private:
|
|||
blocking_observer blocking;
|
||||
|
||||
private:
|
||||
nano::process_return process_one (nano::write_transaction const &, block_post_events &, std::shared_ptr<nano::block> block, bool const = false);
|
||||
nano::process_return process_one (nano::write_transaction const &, std::shared_ptr<nano::block> block, bool const = false);
|
||||
void queue_unchecked (nano::write_transaction const &, nano::hash_or_account const &);
|
||||
std::deque<processed_t> process_batch (nano::unique_lock<nano::mutex> &);
|
||||
void process_live (nano::transaction const &, std::shared_ptr<nano::block> const &);
|
||||
void process_verified_state_blocks (std::deque<nano::state_block_signature_verification::value_type> &, std::vector<int> const &, std::vector<nano::block_hash> const &, std::vector<nano::signature> const &);
|
||||
void add_impl (std::shared_ptr<nano::block> block);
|
||||
bool stopped{ false };
|
||||
|
|
|
@ -207,11 +207,13 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
|
|||
node_seq (seq),
|
||||
block_broadcast{ network, block_arrival, !flags.disable_block_processor_republishing },
|
||||
block_publisher{ active },
|
||||
gap_tracker{ gap_cache }
|
||||
gap_tracker{ gap_cache },
|
||||
process_live_dispatcher{ ledger, scheduler, inactive_vote_cache, websocket }
|
||||
{
|
||||
block_broadcast.connect (block_processor);
|
||||
block_publisher.connect (block_processor);
|
||||
gap_tracker.connect (block_processor);
|
||||
process_live_dispatcher.connect (block_processor);
|
||||
unchecked.use_memory = [this] () { return ledger.bootstrap_weight_reached (); };
|
||||
unchecked.satisfied = [this] (nano::unchecked_info const & info) {
|
||||
this->block_processor.add (info.block);
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
#include <nano/node/online_reps.hpp>
|
||||
#include <nano/node/optimistic_scheduler.hpp>
|
||||
#include <nano/node/portmapping.hpp>
|
||||
#include <nano/node/process_live_dispatcher.hpp>
|
||||
#include <nano/node/repcrawler.hpp>
|
||||
#include <nano/node/request_aggregator.hpp>
|
||||
#include <nano/node/signatures.hpp>
|
||||
|
@ -196,6 +197,7 @@ public:
|
|||
nano::block_broadcast block_broadcast;
|
||||
nano::block_publisher block_publisher;
|
||||
nano::gap_tracker gap_tracker;
|
||||
nano::process_live_dispatcher process_live_dispatcher;
|
||||
|
||||
std::chrono::steady_clock::time_point const startup_time;
|
||||
std::chrono::seconds unchecked_cutoff = std::chrono::seconds (7 * 24 * 60 * 60); // Week
|
||||
|
|
59
nano/node/process_live_dispatcher.cpp
Normal file
59
nano/node/process_live_dispatcher.cpp
Normal file
|
@ -0,0 +1,59 @@
|
|||
#include <nano/lib/blocks.hpp>
|
||||
#include <nano/node/blockprocessor.hpp>
|
||||
#include <nano/node/election_scheduler.hpp>
|
||||
#include <nano/node/process_live_dispatcher.hpp>
|
||||
#include <nano/node/vote_cache.hpp>
|
||||
#include <nano/node/websocket.hpp>
|
||||
#include <nano/secure/common.hpp>
|
||||
#include <nano/secure/ledger.hpp>
|
||||
#include <nano/secure/store.hpp>
|
||||
|
||||
nano::process_live_dispatcher::process_live_dispatcher (nano::ledger & ledger, nano::election_scheduler & scheduler, nano::vote_cache & inactive_vote_cache, nano::websocket_server & websocket) :
|
||||
ledger{ ledger },
|
||||
scheduler{ scheduler },
|
||||
inactive_vote_cache{ inactive_vote_cache },
|
||||
websocket{ websocket }
|
||||
{
|
||||
}
|
||||
|
||||
void nano::process_live_dispatcher::connect (nano::block_processor & block_processor)
|
||||
{
|
||||
block_processor.batch_processed.add ([this] (auto const & batch) {
|
||||
auto const transaction = ledger.store.tx_begin_read ();
|
||||
for (auto const & [result, block] : batch)
|
||||
{
|
||||
debug_assert (block != nullptr);
|
||||
inspect (result, *block, transaction);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void nano::process_live_dispatcher::inspect (nano::process_return const & result, nano::block const & block, nano::transaction const & transaction)
|
||||
{
|
||||
switch (result.code)
|
||||
{
|
||||
case nano::process_result::progress:
|
||||
process_live (block, transaction);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void nano::process_live_dispatcher::process_live (nano::block const & block, nano::transaction const & transaction)
|
||||
{
|
||||
// Start collecting quorum on block
|
||||
if (ledger.dependents_confirmed (transaction, block))
|
||||
{
|
||||
auto account = block.account ().is_zero () ? block.sideband ().account : block.account ();
|
||||
scheduler.activate (account, transaction);
|
||||
}
|
||||
|
||||
// Notify inactive vote cache about a new live block
|
||||
inactive_vote_cache.trigger (block.hash ());
|
||||
|
||||
if (websocket.server && websocket.server->any_subscriber (nano::websocket::topic::new_unconfirmed_block))
|
||||
{
|
||||
websocket.server->broadcast (nano::websocket::message_builder ().new_block_arrived (block));
|
||||
}
|
||||
}
|
31
nano/node/process_live_dispatcher.hpp
Normal file
31
nano/node/process_live_dispatcher.hpp
Normal file
|
@ -0,0 +1,31 @@
|
|||
#pragma once
|
||||
|
||||
namespace nano
|
||||
{
|
||||
class ledger;
|
||||
class election_scheduler;
|
||||
class vote_cache;
|
||||
class websocket_server;
|
||||
class block_processor;
|
||||
class process_return;
|
||||
class block;
|
||||
class transaction;
|
||||
|
||||
// Observes confirmed blocks and dispatches the process_live function.
|
||||
class process_live_dispatcher
|
||||
{
|
||||
public:
|
||||
process_live_dispatcher (nano::ledger & ledger, nano::election_scheduler & scheduler, nano::vote_cache & inactive_vote_cache, nano::websocket_server & websocket);
|
||||
void connect (nano::block_processor & block_processor);
|
||||
|
||||
private:
|
||||
// Block_processor observer
|
||||
void inspect (nano::process_return const & result, nano::block const & block, nano::transaction const & transaction);
|
||||
void process_live (nano::block const & block, nano::transaction const & transaction);
|
||||
|
||||
nano::ledger & ledger;
|
||||
nano::election_scheduler & scheduler;
|
||||
nano::vote_cache & inactive_vote_cache;
|
||||
nano::websocket_server & websocket;
|
||||
};
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue