Merge pull request #4167 from clemahieu/block_origin_remove

Add batch block processing result observer to block_processor
This commit is contained in:
clemahieu 2023-03-05 01:11:26 +00:00 committed by GitHub
commit b7816bada0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 364 additions and 144 deletions

View file

@ -1064,14 +1064,14 @@ TEST (active_transactions, conflicting_block_vote_existing_election)
.build_shared ();
auto vote_fork (std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, std::vector<nano::block_hash>{ fork->hash () }));
ASSERT_EQ (nano::process_result::progress, node.process_local (send).code);
ASSERT_EQ (nano::process_result::progress, node.process_local (send).value ().code);
ASSERT_TIMELY_EQ (5s, 1, node.active.size ());
// Vote for conflicting block, but the block does not yet exist in the ledger
node.active.vote (vote_fork);
// Block now gets processed
ASSERT_EQ (nano::process_result::fork, node.process_local (fork).code);
ASSERT_EQ (nano::process_result::fork, node.process_local (fork).value ().code);
// Election must be confirmed
auto election (node.active.election (fork->qualified_root ()));

View file

@ -656,7 +656,7 @@ TEST (node, fork_publish_inactive)
std::shared_ptr<nano::election> election;
ASSERT_TIMELY (5s, election = node.active.election (send1->qualified_root ()));
ASSERT_EQ (nano::process_result::fork, node.process_local (send2).code);
ASSERT_EQ (nano::process_result::fork, node.process_local (send2).value ().code);
auto blocks = election->blocks ();
ASSERT_TIMELY_EQ (5s, blocks.size (), 2);
@ -3436,7 +3436,6 @@ TEST (node, aggressive_flooding)
nano::test::system system;
nano::node_flags node_flags;
node_flags.disable_request_loop = true;
node_flags.disable_block_processor_republishing = true;
node_flags.disable_bootstrap_bulk_push_client = true;
node_flags.disable_bootstrap_bulk_pull_server = true;
node_flags.disable_bootstrap_listener = true;
@ -3521,7 +3520,7 @@ TEST (node, aggressive_flooding)
.build ();
}
// Processing locally goes through the aggressive block flooding path
ASSERT_EQ (nano::process_result::progress, node1.process_local (block).code);
ASSERT_EQ (nano::process_result::progress, node1.process_local (block).value ().code);
auto all_have_block = [&nodes_wallets] (nano::block_hash const & hash_a) {
return std::all_of (nodes_wallets.begin (), nodes_wallets.end (), [hash = hash_a] (auto const & node_wallet) {

View file

@ -198,7 +198,7 @@ TEST (vote_processor, no_broadcast_local)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build (ec);
ASSERT_FALSE (ec);
ASSERT_EQ (nano::process_result::progress, node.process_local (send).code);
ASSERT_EQ (nano::process_result::progress, node.process_local (send).value ().code);
ASSERT_TIMELY (10s, !node.active.empty ());
ASSERT_EQ (2 * node.config.vote_minimum.number (), node.weight (nano::dev::genesis_key.pub));
// Insert account in wallet. Votes on node are not enabled.
@ -251,7 +251,7 @@ TEST (vote_processor, local_broadcast_without_a_representative)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build (ec);
ASSERT_FALSE (ec);
ASSERT_EQ (nano::process_result::progress, node.process_local (send).code);
ASSERT_EQ (nano::process_result::progress, node.process_local (send).value ().code);
ASSERT_TIMELY (10s, !node.active.empty ());
ASSERT_EQ (node.config.vote_minimum, node.weight (nano::dev::genesis_key.pub));
node.block_confirm (send);
@ -299,7 +299,7 @@ TEST (vote_processor, no_broadcast_local_with_a_principal_representative)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build (ec);
ASSERT_FALSE (ec);
ASSERT_EQ (nano::process_result::progress, node.process_local (send).code);
ASSERT_EQ (nano::process_result::progress, node.process_local (send).value ().code);
ASSERT_TIMELY (10s, !node.active.empty ());
ASSERT_EQ (nano::dev::constants.genesis_amount - 2 * node.config.vote_minimum.number (), node.weight (nano::dev::genesis_key.pub));
// Insert account in wallet. Votes on node are not enabled.

View file

@ -1062,7 +1062,7 @@ TEST (websocket, new_unconfirmed_block)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build_shared ();
ASSERT_EQ (nano::process_result::progress, node1->process_local (send1).code);
ASSERT_EQ (nano::process_result::progress, node1->process_local (send1).value ().code);
ASSERT_TIMELY (5s, future.wait_for (0s) == std::future_status::ready);

View file

@ -223,6 +223,8 @@ std::string nano::error_rpc_messages::message (int ev) const
return "Signing by block hash is disabled";
case nano::error_rpc::source_not_found:
return "Source not found";
case nano::error_rpc::stopped:
return "Stopped";
}
return "Invalid error code";
@ -503,4 +505,4 @@ std::error_code make_error_code (boost::system::errc::errc_t const & e)
return std::error_code (static_cast<int> (e), ::nano::error_conversion::generic_category ());
}
}
#endif
#endif

View file

@ -123,7 +123,8 @@ enum class error_rpc
requires_port_and_address,
rpc_control_disabled,
sign_hash_disabled,
source_not_found
source_not_found,
stopped
};
/** process_result related errors */

View file

@ -22,6 +22,10 @@ add_library(
bandwidth_limiter.cpp
block_arrival.hpp
block_arrival.cpp
block_broadcast.cpp
block_broadcast.hpp
blocking_observer.cpp
blocking_observer.hpp
blockprocessor.hpp
blockprocessor.cpp
bootstrap/block_deserializer.hpp

View file

@ -0,0 +1,68 @@
#include <nano/node/block_arrival.hpp>
#include <nano/node/block_broadcast.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/network.hpp>
nano::block_broadcast::block_broadcast (nano::network & network, nano::block_arrival & block_arrival) :
network{ network },
block_arrival{ block_arrival }
{
}
void nano::block_broadcast::connect (nano::block_processor & block_processor, bool enabled)
{
if (!enabled)
{
return;
}
block_processor.processed.add ([this] (auto const & result, auto const & block) {
switch (result.code)
{
case nano::process_result::progress:
observe (block);
break;
default:
break;
}
erase (block);
});
}
void nano::block_broadcast::observe (std::shared_ptr<nano::block> block)
{
nano::unique_lock<nano::mutex> lock{ mutex };
auto existing = local.find (block);
auto local_l = existing != local.end ();
lock.unlock ();
if (local_l)
{
// Block created on this node
// Perform more agressive initial flooding
network.flood_block_initial (block);
}
else
{
if (block_arrival.recent (block->hash ()))
{
// Block arrived from realtime traffic, do normal gossip.
network.flood_block (block, nano::transport::buffer_drop_policy::limiter);
}
else
{
// Block arrived from bootstrap
// Don't broadcast blocks we're bootstrapping
}
}
}
void nano::block_broadcast::set_local (std::shared_ptr<nano::block> block)
{
nano::lock_guard<nano::mutex> lock{ mutex };
local.insert (block);
}
void nano::block_broadcast::erase (std::shared_ptr<nano::block> block)
{
nano::lock_guard<nano::mutex> lock{ mutex };
local.erase (block);
}

View file

@ -0,0 +1,32 @@
#pragma once
#include <nano/lib/blocks.hpp>
#include <memory>
#include <unordered_set>
namespace nano
{
class block_arrival;
class block_processor;
class network;
// This class tracks blocks that originated from this node.
class block_broadcast
{
public:
block_broadcast (nano::network & network, nano::block_arrival & block_arrival);
// Add batch_processed observer to block_processor if enabled
void connect (nano::block_processor & block_processor, bool enabled);
// Block_processor observer
void observe (std::shared_ptr<nano::block> block);
// Mark a block as originating locally
void set_local (std::shared_ptr<nano::block> block);
void erase (std::shared_ptr<nano::block> block);
private:
nano::network & network;
nano::block_arrival & block_arrival;
std::unordered_set<std::shared_ptr<nano::block>> local; // Blocks originated on this node
nano::mutex mutex;
};
}

View file

@ -0,0 +1,52 @@
#include <nano/node/blocking_observer.hpp>
#include <nano/node/blockprocessor.hpp>
void nano::blocking_observer::connect (nano::block_processor & block_processor)
{
block_processor.processed.add ([this] (auto const & result, auto const & block) {
observe (result, block);
});
}
void nano::blocking_observer::stop ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
stopped = true;
auto discard = std::move (blocking);
// Signal broken promises outside lock
lock.unlock ();
discard.clear (); // ~promise future_error
}
void nano::blocking_observer::observe (nano::process_return const & result, std::shared_ptr<nano::block> block)
{
nano::unique_lock<nano::mutex> lock{ mutex };
auto existing = blocking.find (block);
if (existing != blocking.end ())
{
auto promise = std::move (existing->second);
blocking.erase (existing);
// Signal promise outside of lock
lock.unlock ();
promise.set_value (result);
}
}
std::future<nano::process_return> nano::blocking_observer::insert (std::shared_ptr<nano::block> block)
{
nano::lock_guard<nano::mutex> lock{ mutex };
if (stopped)
{
std::promise<nano::process_return> promise;
return promise.get_future (); // ~promise future_error
}
auto iterator = blocking.emplace (block, std::promise<nano::process_return>{});
return iterator->second.get_future ();
}
bool nano::blocking_observer::exists (std::shared_ptr<nano::block> block)
{
nano::lock_guard<nano::mutex> lock{ mutex };
auto existing = blocking.find (block);
return existing != blocking.end ();
}

View file

@ -0,0 +1,31 @@
#pragma once
#include <nano/lib/locks.hpp>
#include <nano/secure/common.hpp>
#include <future>
#include <memory>
#include <unordered_map>
namespace nano
{
class block;
class block_processor;
// Observer that facilitates a blocking call to block processing which is done asynchronosly by the block_processor
class blocking_observer
{
public:
void connect (nano::block_processor & block_processor);
// Stop the observer and trigger broken promise exceptions
void stop ();
// Block processor observer
void observe (nano::process_return const & result, std::shared_ptr<nano::block> block);
[[nodiscard]] std::future<nano::process_return> insert (std::shared_ptr<nano::block> block);
bool exists (std::shared_ptr<nano::block> block);
private:
std::unordered_multimap<std::shared_ptr<nano::block>, std::promise<nano::process_return>> blocking;
bool stopped{ false };
nano::mutex mutex;
};
}

View file

@ -31,6 +31,15 @@ nano::block_processor::block_processor (nano::node & node_a, nano::write_databas
write_database_queue (write_database_queue_a),
state_block_signature_verification (node.checker, node.ledger.constants.epochs, node.config, node.logger, node.flags.block_processor_verification_size)
{
batch_processed.add ([this] (auto const & items) {
// For every batch item: notify the 'processed' observer.
for (auto const & item : items)
{
auto const & [result, block] = item;
processed.notify (result, block);
}
});
blocking.connect (*this);
state_block_signature_verification.blocks_verified_callback = [this] (std::deque<nano::state_block_signature_verification::value_type> & items, std::vector<int> const & verifications, std::vector<nano::block_hash> const & hashes, std::vector<nano::signature> const & blocks_signatures) {
this->process_verified_state_blocks (items, verifications, hashes, blocks_signatures);
};
@ -57,6 +66,7 @@ void nano::block_processor::stop ()
stopped = true;
}
condition.notify_all ();
blocking.stop ();
state_block_signature_verification.stop ();
nano::join_or_pass (processing_thread);
}
@ -101,18 +111,24 @@ void nano::block_processor::add (std::shared_ptr<nano::block> const & block)
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::insufficient_work);
return;
}
if (block->type () == nano::block_type::state || block->type () == nano::block_type::open)
add_impl (block);
return;
}
std::optional<nano::process_return> nano::block_processor::add_blocking (std::shared_ptr<nano::block> const & block)
{
auto future = blocking.insert (block);
add_impl (block);
condition.notify_all ();
std::optional<nano::process_return> result;
try
{
state_block_signature_verification.add ({ block });
result = future.get ();
}
else
catch (std::future_error const & e)
{
{
nano::lock_guard<nano::mutex> guard{ mutex };
blocks.emplace_back (block);
}
condition.notify_all ();
}
return result;
}
void nano::block_processor::force (std::shared_ptr<nano::block> const & block_a)
@ -124,12 +140,6 @@ void nano::block_processor::force (std::shared_ptr<nano::block> const & block_a)
condition.notify_all ();
}
void nano::block_processor::wait_write ()
{
nano::lock_guard<nano::mutex> lock{ mutex };
awaiting_write = true;
}
void nano::block_processor::process_blocks ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
@ -139,7 +149,8 @@ void nano::block_processor::process_blocks ()
{
active = true;
lock.unlock ();
process_batch (lock);
auto processed = process_batch (lock);
batch_processed.notify (processed);
lock.lock ();
active = false;
}
@ -208,8 +219,25 @@ void nano::block_processor::process_verified_state_blocks (std::deque<nano::stat
condition.notify_all ();
}
void nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock_a)
void nano::block_processor::add_impl (std::shared_ptr<nano::block> block)
{
if (block->type () == nano::block_type::state || block->type () == nano::block_type::open)
{
state_block_signature_verification.add ({ block });
}
else
{
{
nano::lock_guard<nano::mutex> guard{ mutex };
blocks.emplace_back (block);
}
condition.notify_all ();
}
}
auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock_a) -> std::deque<processed_t>
{
std::deque<processed_t> processed;
auto scoped_write_guard = write_database_queue.wait (nano::writer::process_batch);
block_post_events post_events ([&store = node.store] { return store.tx_begin_read (); });
auto transaction (node.store.tx_begin_write ({ tables::accounts, tables::blocks, tables::frontiers, tables::pending, tables::unchecked }));
@ -221,7 +249,7 @@ void nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
auto deadline_reached = [&timer_l, deadline = node.config.block_processor_batch_max_time] { return timer_l.after_deadline (deadline); };
auto processor_batch_reached = [&number_of_blocks_processed, max = node.flags.block_processor_batch_size] { return number_of_blocks_processed >= max; };
auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; };
while (have_blocks_ready () && (!deadline_reached () || !processor_batch_reached ()) && !awaiting_write && !store_batch_reached ())
while (have_blocks_ready () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ())
{
if ((blocks.size () + state_block_signature_verification.size () + forced.size () > 64) && should_log ())
{
@ -278,19 +306,20 @@ void nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
}
}
number_of_blocks_processed++;
process_one (transaction, post_events, block, force);
auto result = process_one (transaction, post_events, block, force);
processed.emplace_back (result, block);
lock_a.lock ();
}
awaiting_write = false;
lock_a.unlock ();
if (node.config.logging.timing_logging () && number_of_blocks_processed != 0 && timer_l.stop () > std::chrono::milliseconds (100))
{
node.logger.always_log (boost::str (boost::format ("Processed %1% blocks (%2% blocks were forced) in %3% %4%") % number_of_blocks_processed % number_of_forced_processed % timer_l.value ().count () % timer_l.unit ()));
}
return processed;
}
void nano::block_processor::process_live (nano::transaction const & transaction_a, nano::block_hash const & hash_a, std::shared_ptr<nano::block> const & block_a, nano::process_return const & process_return_a, nano::block_origin const origin_a)
void nano::block_processor::process_live (nano::transaction const & transaction_a, std::shared_ptr<nano::block> const & block_a)
{
// Start collecting quorum on block
if (node.ledger.dependents_confirmed (transaction_a, *block_a))
@ -302,30 +331,17 @@ void nano::block_processor::process_live (nano::transaction const & transaction_
// Notify inactive vote cache about a new live block
node.inactive_vote_cache.trigger (block_a->hash ());
// Announce block contents to the network
if (origin_a == nano::block_origin::local)
{
node.network.flood_block_initial (block_a);
}
else if (!node.flags.disable_block_processor_republishing && node.block_arrival.recent (hash_a))
{
node.network.flood_block (block_a, nano::transport::buffer_drop_policy::limiter);
}
if (node.websocket.server && node.websocket.server->any_subscriber (nano::websocket::topic::new_unconfirmed_block))
{
node.websocket.server->broadcast (nano::websocket::message_builder ().new_block_arrived (*block_a));
}
}
nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, block_post_events & events_a, std::shared_ptr<nano::block> block, bool const forced_a, nano::block_origin const origin_a)
nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, block_post_events & events_a, std::shared_ptr<nano::block> block, bool const forced_a)
{
nano::process_return result;
auto hash (block->hash ());
result = node.ledger.process (transaction_a, *block);
events_a.events.emplace_back ([this, result, block] (nano::transaction const & tx) {
processed.notify (tx, result, *block);
});
switch (result.code)
{
case nano::process_result::progress:
@ -336,8 +352,8 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction
block->serialize_json (block_string, node.config.logging.single_line_record ());
node.logger.try_log (boost::str (boost::format ("Processing block %1%: %2%") % hash.to_string () % block_string));
}
events_a.events.emplace_back ([this, hash, block, result, origin_a] (nano::transaction const & post_event_transaction_a) {
process_live (post_event_transaction_a, hash, block, result, origin_a);
events_a.events.emplace_back ([this, block] (nano::transaction const & post_event_transaction_a) {
process_live (post_event_transaction_a, block);
});
queue_unchecked (transaction_a, hash);
/* For send blocks check epoch open unchecked (gap pending).

View file

@ -1,10 +1,12 @@
#pragma once
#include <nano/lib/blocks.hpp>
#include <nano/node/blocking_observer.hpp>
#include <nano/node/state_block_signature_verification.hpp>
#include <nano/secure/common.hpp>
#include <chrono>
#include <future>
#include <memory>
#include <thread>
@ -16,12 +18,6 @@ class transaction;
class write_transaction;
class write_database_queue;
enum class block_origin
{
local,
remote
};
class block_post_events final
{
public:
@ -47,27 +43,36 @@ public:
bool full ();
bool half_full ();
void add (std::shared_ptr<nano::block> const &);
std::optional<nano::process_return> add_blocking (std::shared_ptr<nano::block> const & block);
void force (std::shared_ptr<nano::block> const &);
void wait_write ();
bool should_log ();
bool have_blocks_ready ();
bool have_blocks ();
void process_blocks ();
nano::process_return process_one (nano::write_transaction const &, block_post_events &, std::shared_ptr<nano::block> block, bool const = false, nano::block_origin const = nano::block_origin::remote);
std::atomic<bool> flushing{ false };
// Delay required for average network propagartion before requesting confirmation
static std::chrono::milliseconds constexpr confirmation_request_delay{ 1500 };
nano::observer_set<nano::transaction const &, nano::process_return const &, nano::block const &> processed;
public: // Events
using processed_t = std::pair<nano::process_return, std::shared_ptr<nano::block>>;
nano::observer_set<nano::process_return const &, std::shared_ptr<nano::block>> processed;
// The batch observer feeds the processed obsever
nano::observer_set<std::deque<processed_t> const &> batch_processed;
private:
blocking_observer blocking;
private:
nano::process_return process_one (nano::write_transaction const &, block_post_events &, std::shared_ptr<nano::block> block, bool const = false);
void queue_unchecked (nano::write_transaction const &, nano::hash_or_account const &);
void process_batch (nano::unique_lock<nano::mutex> &);
void process_live (nano::transaction const &, nano::block_hash const &, std::shared_ptr<nano::block> const &, nano::process_return const &, nano::block_origin const = nano::block_origin::remote);
std::deque<processed_t> process_batch (nano::unique_lock<nano::mutex> &);
void process_live (nano::transaction const &, std::shared_ptr<nano::block> const &);
void process_verified_state_blocks (std::deque<nano::state_block_signature_verification::value_type> &, std::vector<int> const &, std::vector<nano::block_hash> const &, std::vector<nano::signature> const &);
void add_impl (std::shared_ptr<nano::block> block);
bool stopped{ false };
bool active{ false };
bool awaiting_write{ false };
std::chrono::steady_clock::time_point next_log;
std::deque<std::shared_ptr<nano::block>> blocks;
std::deque<std::shared_ptr<nano::block>> forced;

View file

@ -50,7 +50,7 @@ void nano::epoch_upgrader::upgrade_impl (nano::raw_key const & prv_a, nano::epoc
nano::process_result result (nano::process_result::old);
if (valid_signature && valid_work)
{
result = node.process_local (epoch).code;
result = node.process_local (epoch).value ().code;
}
if (result == nano::process_result::progress)
{

View file

@ -3237,87 +3237,95 @@ void nano::json_handler::process ()
{
if (!is_async)
{
auto result (rpc_l->node.process_local (block));
switch (result.code)
auto result_maybe = rpc_l->node.process_local (block);
if (!result_maybe)
{
case nano::process_result::progress:
rpc_l->ec = nano::error_rpc::stopped;
}
else
{
auto const & result = result_maybe.value ();
switch (result.code)
{
rpc_l->response_l.put ("hash", block->hash ().to_string ());
break;
}
case nano::process_result::gap_previous:
{
rpc_l->ec = nano::error_process::gap_previous;
break;
}
case nano::process_result::gap_source:
{
rpc_l->ec = nano::error_process::gap_source;
break;
}
case nano::process_result::old:
{
rpc_l->ec = nano::error_process::old;
break;
}
case nano::process_result::bad_signature:
{
rpc_l->ec = nano::error_process::bad_signature;
break;
}
case nano::process_result::negative_spend:
{
// TODO once we get RPC versioning, this should be changed to "negative spend"
rpc_l->ec = nano::error_process::negative_spend;
break;
}
case nano::process_result::balance_mismatch:
{
rpc_l->ec = nano::error_process::balance_mismatch;
break;
}
case nano::process_result::unreceivable:
{
rpc_l->ec = nano::error_process::unreceivable;
break;
}
case nano::process_result::block_position:
{
rpc_l->ec = nano::error_process::block_position;
break;
}
case nano::process_result::gap_epoch_open_pending:
{
rpc_l->ec = nano::error_process::gap_epoch_open_pending;
break;
}
case nano::process_result::fork:
{
bool const force = rpc_l->request.get<bool> ("force", false);
if (force)
case nano::process_result::progress:
{
rpc_l->node.active.erase (*block);
rpc_l->node.block_processor.force (block);
rpc_l->response_l.put ("hash", block->hash ().to_string ());
break;
}
else
case nano::process_result::gap_previous:
{
rpc_l->ec = nano::error_process::fork;
rpc_l->ec = nano::error_process::gap_previous;
break;
}
case nano::process_result::gap_source:
{
rpc_l->ec = nano::error_process::gap_source;
break;
}
case nano::process_result::old:
{
rpc_l->ec = nano::error_process::old;
break;
}
case nano::process_result::bad_signature:
{
rpc_l->ec = nano::error_process::bad_signature;
break;
}
case nano::process_result::negative_spend:
{
// TODO once we get RPC versioning, this should be changed to "negative spend"
rpc_l->ec = nano::error_process::negative_spend;
break;
}
case nano::process_result::balance_mismatch:
{
rpc_l->ec = nano::error_process::balance_mismatch;
break;
}
case nano::process_result::unreceivable:
{
rpc_l->ec = nano::error_process::unreceivable;
break;
}
case nano::process_result::block_position:
{
rpc_l->ec = nano::error_process::block_position;
break;
}
case nano::process_result::gap_epoch_open_pending:
{
rpc_l->ec = nano::error_process::gap_epoch_open_pending;
break;
}
case nano::process_result::fork:
{
bool const force = rpc_l->request.get<bool> ("force", false);
if (force)
{
rpc_l->node.active.erase (*block);
rpc_l->node.block_processor.force (block);
rpc_l->response_l.put ("hash", block->hash ().to_string ());
}
else
{
rpc_l->ec = nano::error_process::fork;
}
break;
}
case nano::process_result::insufficient_work:
{
rpc_l->ec = nano::error_process::insufficient_work;
break;
}
case nano::process_result::opened_burn_account:
rpc_l->ec = nano::error_process::opened_burn_account;
break;
default:
{
rpc_l->ec = nano::error_process::other;
break;
}
break;
}
case nano::process_result::insufficient_work:
{
rpc_l->ec = nano::error_process::insufficient_work;
break;
}
case nano::process_result::opened_burn_account:
rpc_l->ec = nano::error_process::opened_burn_account;
break;
default:
{
rpc_l->ec = nano::error_process::other;
break;
}
}
}

View file

@ -204,8 +204,10 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
websocket{ config.websocket_config, observers, wallets, ledger, io_ctx, logger },
epoch_upgrader{ *this, ledger, store, network_params, logger },
startup_time (std::chrono::steady_clock::now ()),
node_seq (seq)
node_seq (seq),
block_broadcast{ network, block_arrival }
{
block_broadcast.connect (block_processor, !flags.disable_block_processor_republishing);
unchecked.use_memory = [this] () { return ledger.bootstrap_weight_reached (); };
unchecked.satisfied = [this] (nano::unchecked_info const & info) {
this->block_processor.add (info.block);
@ -597,16 +599,12 @@ nano::process_return nano::node::process (nano::block & block)
return process (transaction, block);
}
nano::process_return nano::node::process_local (std::shared_ptr<nano::block> const & block_a)
std::optional<nano::process_return> nano::node::process_local (std::shared_ptr<nano::block> const & block_a)
{
// Add block hash as recently arrived to trigger automatic rebroadcast and election
block_arrival.add (block_a->hash ());
// Notify block processor to release write lock
block_processor.wait_write ();
// Process block
block_post_events post_events ([&store = store] { return store.tx_begin_read (); });
auto const transaction (store.tx_begin_write ({ tables::accounts, tables::blocks, tables::frontiers, tables::pending }));
return block_processor.process_one (transaction, post_events, block_a, false, nano::block_origin::local);
block_broadcast.set_local (block_a);
return block_processor.add_blocking (block_a);
}
void nano::node::process_local_async (std::shared_ptr<nano::block> const & block_a)

View file

@ -7,6 +7,7 @@
#include <nano/node/backlog_population.hpp>
#include <nano/node/bandwidth_limiter.hpp>
#include <nano/node/block_arrival.hpp>
#include <nano/node/block_broadcast.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/bootstrap/bootstrap.hpp>
#include <nano/node/bootstrap/bootstrap_attempt.hpp>
@ -84,7 +85,7 @@ public:
void process_confirmed_data (nano::transaction const &, std::shared_ptr<nano::block> const &, nano::block_hash const &, nano::account &, nano::uint128_t &, bool &, bool &, nano::account &);
void process_confirmed (nano::election_status const &, uint64_t = 0);
void process_active (std::shared_ptr<nano::block> const &);
nano::process_return process_local (std::shared_ptr<nano::block> const &);
std::optional<nano::process_return> process_local (std::shared_ptr<nano::block> const &);
void process_local_async (std::shared_ptr<nano::block> const &);
void keepalive_preconfigured (std::vector<std::string> const &);
std::shared_ptr<nano::block> block (nano::block_hash const &);
@ -190,6 +191,7 @@ public:
nano::backlog_population backlog;
nano::websocket_server websocket;
nano::epoch_upgrader epoch_upgrader;
nano::block_broadcast block_broadcast;
std::chrono::steady_clock::time_point const startup_time;
std::chrono::seconds unchecked_cutoff = std::chrono::seconds (7 * 24 * 60 * 60); // Week

View file

@ -1047,11 +1047,13 @@ bool nano::wallet::action_complete (std::shared_ptr<nano::block> const & block_a
}
if (!error)
{
error = wallets.node.process_local (block_a).code != nano::process_result::progress;
auto result = wallets.node.process_local (block_a);
error = !result || result.value ().code != nano::process_result::progress;
debug_assert (error || block_a->sideband ().details == details_a);
}
if (!error && generate_work_a)
{
// Pregenerate work for next block based on the block just created
work_ensure (account_a, block_a->hash ());
}
}