diff --git a/nano/core_test/active_transactions.cpp b/nano/core_test/active_transactions.cpp index 118dab10..67ed3355 100644 --- a/nano/core_test/active_transactions.cpp +++ b/nano/core_test/active_transactions.cpp @@ -1064,14 +1064,14 @@ TEST (active_transactions, conflicting_block_vote_existing_election) .build_shared (); auto vote_fork (std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, std::vector{ fork->hash () })); - ASSERT_EQ (nano::process_result::progress, node.process_local (send).code); + ASSERT_EQ (nano::process_result::progress, node.process_local (send).value ().code); ASSERT_TIMELY_EQ (5s, 1, node.active.size ()); // Vote for conflicting block, but the block does not yet exist in the ledger node.active.vote (vote_fork); // Block now gets processed - ASSERT_EQ (nano::process_result::fork, node.process_local (fork).code); + ASSERT_EQ (nano::process_result::fork, node.process_local (fork).value ().code); // Election must be confirmed auto election (node.active.election (fork->qualified_root ())); diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 4a20b81f..7b541beb 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -656,7 +656,7 @@ TEST (node, fork_publish_inactive) std::shared_ptr election; ASSERT_TIMELY (5s, election = node.active.election (send1->qualified_root ())); - ASSERT_EQ (nano::process_result::fork, node.process_local (send2).code); + ASSERT_EQ (nano::process_result::fork, node.process_local (send2).value ().code); auto blocks = election->blocks (); ASSERT_TIMELY_EQ (5s, blocks.size (), 2); @@ -3436,7 +3436,6 @@ TEST (node, aggressive_flooding) nano::test::system system; nano::node_flags node_flags; node_flags.disable_request_loop = true; - node_flags.disable_block_processor_republishing = true; node_flags.disable_bootstrap_bulk_push_client = true; node_flags.disable_bootstrap_bulk_pull_server = true; node_flags.disable_bootstrap_listener = true; @@ -3521,7 +3520,7 @@ TEST (node, aggressive_flooding) .build (); } // Processing locally goes through the aggressive block flooding path - ASSERT_EQ (nano::process_result::progress, node1.process_local (block).code); + ASSERT_EQ (nano::process_result::progress, node1.process_local (block).value ().code); auto all_have_block = [&nodes_wallets] (nano::block_hash const & hash_a) { return std::all_of (nodes_wallets.begin (), nodes_wallets.end (), [hash = hash_a] (auto const & node_wallet) { diff --git a/nano/core_test/vote_processor.cpp b/nano/core_test/vote_processor.cpp index 63951f76..2a2fdfaa 100644 --- a/nano/core_test/vote_processor.cpp +++ b/nano/core_test/vote_processor.cpp @@ -198,7 +198,7 @@ TEST (vote_processor, no_broadcast_local) .work (*system.work.generate (nano::dev::genesis->hash ())) .build (ec); ASSERT_FALSE (ec); - ASSERT_EQ (nano::process_result::progress, node.process_local (send).code); + ASSERT_EQ (nano::process_result::progress, node.process_local (send).value ().code); ASSERT_TIMELY (10s, !node.active.empty ()); ASSERT_EQ (2 * node.config.vote_minimum.number (), node.weight (nano::dev::genesis_key.pub)); // Insert account in wallet. Votes on node are not enabled. @@ -251,7 +251,7 @@ TEST (vote_processor, local_broadcast_without_a_representative) .work (*system.work.generate (nano::dev::genesis->hash ())) .build (ec); ASSERT_FALSE (ec); - ASSERT_EQ (nano::process_result::progress, node.process_local (send).code); + ASSERT_EQ (nano::process_result::progress, node.process_local (send).value ().code); ASSERT_TIMELY (10s, !node.active.empty ()); ASSERT_EQ (node.config.vote_minimum, node.weight (nano::dev::genesis_key.pub)); node.block_confirm (send); @@ -299,7 +299,7 @@ TEST (vote_processor, no_broadcast_local_with_a_principal_representative) .work (*system.work.generate (nano::dev::genesis->hash ())) .build (ec); ASSERT_FALSE (ec); - ASSERT_EQ (nano::process_result::progress, node.process_local (send).code); + ASSERT_EQ (nano::process_result::progress, node.process_local (send).value ().code); ASSERT_TIMELY (10s, !node.active.empty ()); ASSERT_EQ (nano::dev::constants.genesis_amount - 2 * node.config.vote_minimum.number (), node.weight (nano::dev::genesis_key.pub)); // Insert account in wallet. Votes on node are not enabled. diff --git a/nano/core_test/websocket.cpp b/nano/core_test/websocket.cpp index e91b7d53..4b57e116 100644 --- a/nano/core_test/websocket.cpp +++ b/nano/core_test/websocket.cpp @@ -1062,7 +1062,7 @@ TEST (websocket, new_unconfirmed_block) .work (*system.work.generate (nano::dev::genesis->hash ())) .build_shared (); - ASSERT_EQ (nano::process_result::progress, node1->process_local (send1).code); + ASSERT_EQ (nano::process_result::progress, node1->process_local (send1).value ().code); ASSERT_TIMELY (5s, future.wait_for (0s) == std::future_status::ready); diff --git a/nano/lib/errors.cpp b/nano/lib/errors.cpp index 69766d2b..f4eabbd6 100644 --- a/nano/lib/errors.cpp +++ b/nano/lib/errors.cpp @@ -223,6 +223,8 @@ std::string nano::error_rpc_messages::message (int ev) const return "Signing by block hash is disabled"; case nano::error_rpc::source_not_found: return "Source not found"; + case nano::error_rpc::stopped: + return "Stopped"; } return "Invalid error code"; @@ -503,4 +505,4 @@ std::error_code make_error_code (boost::system::errc::errc_t const & e) return std::error_code (static_cast (e), ::nano::error_conversion::generic_category ()); } } -#endif \ No newline at end of file +#endif diff --git a/nano/lib/errors.hpp b/nano/lib/errors.hpp index c6b281d3..27e8c6c5 100644 --- a/nano/lib/errors.hpp +++ b/nano/lib/errors.hpp @@ -123,7 +123,8 @@ enum class error_rpc requires_port_and_address, rpc_control_disabled, sign_hash_disabled, - source_not_found + source_not_found, + stopped }; /** process_result related errors */ diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 9f55b8dd..e07c62ba 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -22,6 +22,10 @@ add_library( bandwidth_limiter.cpp block_arrival.hpp block_arrival.cpp + block_broadcast.cpp + block_broadcast.hpp + blocking_observer.cpp + blocking_observer.hpp blockprocessor.hpp blockprocessor.cpp bootstrap/block_deserializer.hpp diff --git a/nano/node/block_broadcast.cpp b/nano/node/block_broadcast.cpp new file mode 100644 index 00000000..31dc2ef2 --- /dev/null +++ b/nano/node/block_broadcast.cpp @@ -0,0 +1,68 @@ +#include +#include +#include +#include + +nano::block_broadcast::block_broadcast (nano::network & network, nano::block_arrival & block_arrival) : + network{ network }, + block_arrival{ block_arrival } +{ +} + +void nano::block_broadcast::connect (nano::block_processor & block_processor, bool enabled) +{ + if (!enabled) + { + return; + } + block_processor.processed.add ([this] (auto const & result, auto const & block) { + switch (result.code) + { + case nano::process_result::progress: + observe (block); + break; + default: + break; + } + erase (block); + }); +} + +void nano::block_broadcast::observe (std::shared_ptr block) +{ + nano::unique_lock lock{ mutex }; + auto existing = local.find (block); + auto local_l = existing != local.end (); + lock.unlock (); + if (local_l) + { + // Block created on this node + // Perform more agressive initial flooding + network.flood_block_initial (block); + } + else + { + if (block_arrival.recent (block->hash ())) + { + // Block arrived from realtime traffic, do normal gossip. + network.flood_block (block, nano::transport::buffer_drop_policy::limiter); + } + else + { + // Block arrived from bootstrap + // Don't broadcast blocks we're bootstrapping + } + } +} + +void nano::block_broadcast::set_local (std::shared_ptr block) +{ + nano::lock_guard lock{ mutex }; + local.insert (block); +} + +void nano::block_broadcast::erase (std::shared_ptr block) +{ + nano::lock_guard lock{ mutex }; + local.erase (block); +} diff --git a/nano/node/block_broadcast.hpp b/nano/node/block_broadcast.hpp new file mode 100644 index 00000000..efdd8c8a --- /dev/null +++ b/nano/node/block_broadcast.hpp @@ -0,0 +1,32 @@ +#pragma once + +#include + +#include +#include + +namespace nano +{ +class block_arrival; +class block_processor; +class network; +// This class tracks blocks that originated from this node. +class block_broadcast +{ +public: + block_broadcast (nano::network & network, nano::block_arrival & block_arrival); + // Add batch_processed observer to block_processor if enabled + void connect (nano::block_processor & block_processor, bool enabled); + // Block_processor observer + void observe (std::shared_ptr block); + // Mark a block as originating locally + void set_local (std::shared_ptr block); + void erase (std::shared_ptr block); + +private: + nano::network & network; + nano::block_arrival & block_arrival; + std::unordered_set> local; // Blocks originated on this node + nano::mutex mutex; +}; +} diff --git a/nano/node/blocking_observer.cpp b/nano/node/blocking_observer.cpp new file mode 100644 index 00000000..809f0b11 --- /dev/null +++ b/nano/node/blocking_observer.cpp @@ -0,0 +1,52 @@ +#include +#include + +void nano::blocking_observer::connect (nano::block_processor & block_processor) +{ + block_processor.processed.add ([this] (auto const & result, auto const & block) { + observe (result, block); + }); +} + +void nano::blocking_observer::stop () +{ + nano::unique_lock lock{ mutex }; + stopped = true; + auto discard = std::move (blocking); + // Signal broken promises outside lock + lock.unlock (); + discard.clear (); // ~promise future_error +} + +void nano::blocking_observer::observe (nano::process_return const & result, std::shared_ptr block) +{ + nano::unique_lock lock{ mutex }; + auto existing = blocking.find (block); + if (existing != blocking.end ()) + { + auto promise = std::move (existing->second); + blocking.erase (existing); + // Signal promise outside of lock + lock.unlock (); + promise.set_value (result); + } +} + +std::future nano::blocking_observer::insert (std::shared_ptr block) +{ + nano::lock_guard lock{ mutex }; + if (stopped) + { + std::promise promise; + return promise.get_future (); // ~promise future_error + } + auto iterator = blocking.emplace (block, std::promise{}); + return iterator->second.get_future (); +} + +bool nano::blocking_observer::exists (std::shared_ptr block) +{ + nano::lock_guard lock{ mutex }; + auto existing = blocking.find (block); + return existing != blocking.end (); +} diff --git a/nano/node/blocking_observer.hpp b/nano/node/blocking_observer.hpp new file mode 100644 index 00000000..f4b01b22 --- /dev/null +++ b/nano/node/blocking_observer.hpp @@ -0,0 +1,31 @@ +#pragma once + +#include +#include + +#include +#include +#include + +namespace nano +{ +class block; +class block_processor; +// Observer that facilitates a blocking call to block processing which is done asynchronosly by the block_processor +class blocking_observer +{ +public: + void connect (nano::block_processor & block_processor); + // Stop the observer and trigger broken promise exceptions + void stop (); + // Block processor observer + void observe (nano::process_return const & result, std::shared_ptr block); + [[nodiscard]] std::future insert (std::shared_ptr block); + bool exists (std::shared_ptr block); + +private: + std::unordered_multimap, std::promise> blocking; + bool stopped{ false }; + nano::mutex mutex; +}; +} diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 071cf7b7..1fa45f72 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -31,6 +31,15 @@ nano::block_processor::block_processor (nano::node & node_a, nano::write_databas write_database_queue (write_database_queue_a), state_block_signature_verification (node.checker, node.ledger.constants.epochs, node.config, node.logger, node.flags.block_processor_verification_size) { + batch_processed.add ([this] (auto const & items) { + // For every batch item: notify the 'processed' observer. + for (auto const & item : items) + { + auto const & [result, block] = item; + processed.notify (result, block); + } + }); + blocking.connect (*this); state_block_signature_verification.blocks_verified_callback = [this] (std::deque & items, std::vector const & verifications, std::vector const & hashes, std::vector const & blocks_signatures) { this->process_verified_state_blocks (items, verifications, hashes, blocks_signatures); }; @@ -57,6 +66,7 @@ void nano::block_processor::stop () stopped = true; } condition.notify_all (); + blocking.stop (); state_block_signature_verification.stop (); nano::join_or_pass (processing_thread); } @@ -101,18 +111,24 @@ void nano::block_processor::add (std::shared_ptr const & block) node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::insufficient_work); return; } - if (block->type () == nano::block_type::state || block->type () == nano::block_type::open) + add_impl (block); + return; +} + +std::optional nano::block_processor::add_blocking (std::shared_ptr const & block) +{ + auto future = blocking.insert (block); + add_impl (block); + condition.notify_all (); + std::optional result; + try { - state_block_signature_verification.add ({ block }); + result = future.get (); } - else + catch (std::future_error const & e) { - { - nano::lock_guard guard{ mutex }; - blocks.emplace_back (block); - } - condition.notify_all (); } + return result; } void nano::block_processor::force (std::shared_ptr const & block_a) @@ -124,12 +140,6 @@ void nano::block_processor::force (std::shared_ptr const & block_a) condition.notify_all (); } -void nano::block_processor::wait_write () -{ - nano::lock_guard lock{ mutex }; - awaiting_write = true; -} - void nano::block_processor::process_blocks () { nano::unique_lock lock{ mutex }; @@ -139,7 +149,8 @@ void nano::block_processor::process_blocks () { active = true; lock.unlock (); - process_batch (lock); + auto processed = process_batch (lock); + batch_processed.notify (processed); lock.lock (); active = false; } @@ -208,8 +219,25 @@ void nano::block_processor::process_verified_state_blocks (std::deque & lock_a) +void nano::block_processor::add_impl (std::shared_ptr block) { + if (block->type () == nano::block_type::state || block->type () == nano::block_type::open) + { + state_block_signature_verification.add ({ block }); + } + else + { + { + nano::lock_guard guard{ mutex }; + blocks.emplace_back (block); + } + condition.notify_all (); + } +} + +auto nano::block_processor::process_batch (nano::unique_lock & lock_a) -> std::deque +{ + std::deque processed; auto scoped_write_guard = write_database_queue.wait (nano::writer::process_batch); block_post_events post_events ([&store = node.store] { return store.tx_begin_read (); }); auto transaction (node.store.tx_begin_write ({ tables::accounts, tables::blocks, tables::frontiers, tables::pending, tables::unchecked })); @@ -221,7 +249,7 @@ void nano::block_processor::process_batch (nano::unique_lock & lock auto deadline_reached = [&timer_l, deadline = node.config.block_processor_batch_max_time] { return timer_l.after_deadline (deadline); }; auto processor_batch_reached = [&number_of_blocks_processed, max = node.flags.block_processor_batch_size] { return number_of_blocks_processed >= max; }; auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; }; - while (have_blocks_ready () && (!deadline_reached () || !processor_batch_reached ()) && !awaiting_write && !store_batch_reached ()) + while (have_blocks_ready () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ()) { if ((blocks.size () + state_block_signature_verification.size () + forced.size () > 64) && should_log ()) { @@ -278,19 +306,20 @@ void nano::block_processor::process_batch (nano::unique_lock & lock } } number_of_blocks_processed++; - process_one (transaction, post_events, block, force); + auto result = process_one (transaction, post_events, block, force); + processed.emplace_back (result, block); lock_a.lock (); } - awaiting_write = false; lock_a.unlock (); if (node.config.logging.timing_logging () && number_of_blocks_processed != 0 && timer_l.stop () > std::chrono::milliseconds (100)) { node.logger.always_log (boost::str (boost::format ("Processed %1% blocks (%2% blocks were forced) in %3% %4%") % number_of_blocks_processed % number_of_forced_processed % timer_l.value ().count () % timer_l.unit ())); } + return processed; } -void nano::block_processor::process_live (nano::transaction const & transaction_a, nano::block_hash const & hash_a, std::shared_ptr const & block_a, nano::process_return const & process_return_a, nano::block_origin const origin_a) +void nano::block_processor::process_live (nano::transaction const & transaction_a, std::shared_ptr const & block_a) { // Start collecting quorum on block if (node.ledger.dependents_confirmed (transaction_a, *block_a)) @@ -302,30 +331,17 @@ void nano::block_processor::process_live (nano::transaction const & transaction_ // Notify inactive vote cache about a new live block node.inactive_vote_cache.trigger (block_a->hash ()); - // Announce block contents to the network - if (origin_a == nano::block_origin::local) - { - node.network.flood_block_initial (block_a); - } - else if (!node.flags.disable_block_processor_republishing && node.block_arrival.recent (hash_a)) - { - node.network.flood_block (block_a, nano::transport::buffer_drop_policy::limiter); - } - if (node.websocket.server && node.websocket.server->any_subscriber (nano::websocket::topic::new_unconfirmed_block)) { node.websocket.server->broadcast (nano::websocket::message_builder ().new_block_arrived (*block_a)); } } -nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, block_post_events & events_a, std::shared_ptr block, bool const forced_a, nano::block_origin const origin_a) +nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, block_post_events & events_a, std::shared_ptr block, bool const forced_a) { nano::process_return result; auto hash (block->hash ()); result = node.ledger.process (transaction_a, *block); - events_a.events.emplace_back ([this, result, block] (nano::transaction const & tx) { - processed.notify (tx, result, *block); - }); switch (result.code) { case nano::process_result::progress: @@ -336,8 +352,8 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction block->serialize_json (block_string, node.config.logging.single_line_record ()); node.logger.try_log (boost::str (boost::format ("Processing block %1%: %2%") % hash.to_string () % block_string)); } - events_a.events.emplace_back ([this, hash, block, result, origin_a] (nano::transaction const & post_event_transaction_a) { - process_live (post_event_transaction_a, hash, block, result, origin_a); + events_a.events.emplace_back ([this, block] (nano::transaction const & post_event_transaction_a) { + process_live (post_event_transaction_a, block); }); queue_unchecked (transaction_a, hash); /* For send blocks check epoch open unchecked (gap pending). diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 9f0e863c..233af597 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -1,10 +1,12 @@ #pragma once #include +#include #include #include #include +#include #include #include @@ -16,12 +18,6 @@ class transaction; class write_transaction; class write_database_queue; -enum class block_origin -{ - local, - remote -}; - class block_post_events final { public: @@ -47,27 +43,36 @@ public: bool full (); bool half_full (); void add (std::shared_ptr const &); + std::optional add_blocking (std::shared_ptr const & block); void force (std::shared_ptr const &); - void wait_write (); bool should_log (); bool have_blocks_ready (); bool have_blocks (); void process_blocks (); - nano::process_return process_one (nano::write_transaction const &, block_post_events &, std::shared_ptr block, bool const = false, nano::block_origin const = nano::block_origin::remote); std::atomic flushing{ false }; // Delay required for average network propagartion before requesting confirmation static std::chrono::milliseconds constexpr confirmation_request_delay{ 1500 }; - nano::observer_set processed; + +public: // Events + using processed_t = std::pair>; + nano::observer_set> processed; + + // The batch observer feeds the processed obsever + nano::observer_set const &> batch_processed; private: + blocking_observer blocking; + +private: + nano::process_return process_one (nano::write_transaction const &, block_post_events &, std::shared_ptr block, bool const = false); void queue_unchecked (nano::write_transaction const &, nano::hash_or_account const &); - void process_batch (nano::unique_lock &); - void process_live (nano::transaction const &, nano::block_hash const &, std::shared_ptr const &, nano::process_return const &, nano::block_origin const = nano::block_origin::remote); + std::deque process_batch (nano::unique_lock &); + void process_live (nano::transaction const &, std::shared_ptr const &); void process_verified_state_blocks (std::deque &, std::vector const &, std::vector const &, std::vector const &); + void add_impl (std::shared_ptr block); bool stopped{ false }; bool active{ false }; - bool awaiting_write{ false }; std::chrono::steady_clock::time_point next_log; std::deque> blocks; std::deque> forced; diff --git a/nano/node/epoch_upgrader.cpp b/nano/node/epoch_upgrader.cpp index ae83291d..2064cf1f 100644 --- a/nano/node/epoch_upgrader.cpp +++ b/nano/node/epoch_upgrader.cpp @@ -50,7 +50,7 @@ void nano::epoch_upgrader::upgrade_impl (nano::raw_key const & prv_a, nano::epoc nano::process_result result (nano::process_result::old); if (valid_signature && valid_work) { - result = node.process_local (epoch).code; + result = node.process_local (epoch).value ().code; } if (result == nano::process_result::progress) { diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index e09960de..20548def 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -3237,87 +3237,95 @@ void nano::json_handler::process () { if (!is_async) { - auto result (rpc_l->node.process_local (block)); - switch (result.code) + auto result_maybe = rpc_l->node.process_local (block); + if (!result_maybe) { - case nano::process_result::progress: + rpc_l->ec = nano::error_rpc::stopped; + } + else + { + auto const & result = result_maybe.value (); + switch (result.code) { - rpc_l->response_l.put ("hash", block->hash ().to_string ()); - break; - } - case nano::process_result::gap_previous: - { - rpc_l->ec = nano::error_process::gap_previous; - break; - } - case nano::process_result::gap_source: - { - rpc_l->ec = nano::error_process::gap_source; - break; - } - case nano::process_result::old: - { - rpc_l->ec = nano::error_process::old; - break; - } - case nano::process_result::bad_signature: - { - rpc_l->ec = nano::error_process::bad_signature; - break; - } - case nano::process_result::negative_spend: - { - // TODO once we get RPC versioning, this should be changed to "negative spend" - rpc_l->ec = nano::error_process::negative_spend; - break; - } - case nano::process_result::balance_mismatch: - { - rpc_l->ec = nano::error_process::balance_mismatch; - break; - } - case nano::process_result::unreceivable: - { - rpc_l->ec = nano::error_process::unreceivable; - break; - } - case nano::process_result::block_position: - { - rpc_l->ec = nano::error_process::block_position; - break; - } - case nano::process_result::gap_epoch_open_pending: - { - rpc_l->ec = nano::error_process::gap_epoch_open_pending; - break; - } - case nano::process_result::fork: - { - bool const force = rpc_l->request.get ("force", false); - if (force) + case nano::process_result::progress: { - rpc_l->node.active.erase (*block); - rpc_l->node.block_processor.force (block); rpc_l->response_l.put ("hash", block->hash ().to_string ()); + break; } - else + case nano::process_result::gap_previous: { - rpc_l->ec = nano::error_process::fork; + rpc_l->ec = nano::error_process::gap_previous; + break; + } + case nano::process_result::gap_source: + { + rpc_l->ec = nano::error_process::gap_source; + break; + } + case nano::process_result::old: + { + rpc_l->ec = nano::error_process::old; + break; + } + case nano::process_result::bad_signature: + { + rpc_l->ec = nano::error_process::bad_signature; + break; + } + case nano::process_result::negative_spend: + { + // TODO once we get RPC versioning, this should be changed to "negative spend" + rpc_l->ec = nano::error_process::negative_spend; + break; + } + case nano::process_result::balance_mismatch: + { + rpc_l->ec = nano::error_process::balance_mismatch; + break; + } + case nano::process_result::unreceivable: + { + rpc_l->ec = nano::error_process::unreceivable; + break; + } + case nano::process_result::block_position: + { + rpc_l->ec = nano::error_process::block_position; + break; + } + case nano::process_result::gap_epoch_open_pending: + { + rpc_l->ec = nano::error_process::gap_epoch_open_pending; + break; + } + case nano::process_result::fork: + { + bool const force = rpc_l->request.get ("force", false); + if (force) + { + rpc_l->node.active.erase (*block); + rpc_l->node.block_processor.force (block); + rpc_l->response_l.put ("hash", block->hash ().to_string ()); + } + else + { + rpc_l->ec = nano::error_process::fork; + } + break; + } + case nano::process_result::insufficient_work: + { + rpc_l->ec = nano::error_process::insufficient_work; + break; + } + case nano::process_result::opened_burn_account: + rpc_l->ec = nano::error_process::opened_burn_account; + break; + default: + { + rpc_l->ec = nano::error_process::other; + break; } - break; - } - case nano::process_result::insufficient_work: - { - rpc_l->ec = nano::error_process::insufficient_work; - break; - } - case nano::process_result::opened_burn_account: - rpc_l->ec = nano::error_process::opened_burn_account; - break; - default: - { - rpc_l->ec = nano::error_process::other; - break; } } } diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 43ad4a53..047f3b87 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -204,8 +204,10 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co websocket{ config.websocket_config, observers, wallets, ledger, io_ctx, logger }, epoch_upgrader{ *this, ledger, store, network_params, logger }, startup_time (std::chrono::steady_clock::now ()), - node_seq (seq) + node_seq (seq), + block_broadcast{ network, block_arrival } { + block_broadcast.connect (block_processor, !flags.disable_block_processor_republishing); unchecked.use_memory = [this] () { return ledger.bootstrap_weight_reached (); }; unchecked.satisfied = [this] (nano::unchecked_info const & info) { this->block_processor.add (info.block); @@ -597,16 +599,12 @@ nano::process_return nano::node::process (nano::block & block) return process (transaction, block); } -nano::process_return nano::node::process_local (std::shared_ptr const & block_a) +std::optional nano::node::process_local (std::shared_ptr const & block_a) { // Add block hash as recently arrived to trigger automatic rebroadcast and election block_arrival.add (block_a->hash ()); - // Notify block processor to release write lock - block_processor.wait_write (); - // Process block - block_post_events post_events ([&store = store] { return store.tx_begin_read (); }); - auto const transaction (store.tx_begin_write ({ tables::accounts, tables::blocks, tables::frontiers, tables::pending })); - return block_processor.process_one (transaction, post_events, block_a, false, nano::block_origin::local); + block_broadcast.set_local (block_a); + return block_processor.add_blocking (block_a); } void nano::node::process_local_async (std::shared_ptr const & block_a) diff --git a/nano/node/node.hpp b/nano/node/node.hpp index bc03fd3c..724b072d 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -84,7 +85,7 @@ public: void process_confirmed_data (nano::transaction const &, std::shared_ptr const &, nano::block_hash const &, nano::account &, nano::uint128_t &, bool &, bool &, nano::account &); void process_confirmed (nano::election_status const &, uint64_t = 0); void process_active (std::shared_ptr const &); - nano::process_return process_local (std::shared_ptr const &); + std::optional process_local (std::shared_ptr const &); void process_local_async (std::shared_ptr const &); void keepalive_preconfigured (std::vector const &); std::shared_ptr block (nano::block_hash const &); @@ -190,6 +191,7 @@ public: nano::backlog_population backlog; nano::websocket_server websocket; nano::epoch_upgrader epoch_upgrader; + nano::block_broadcast block_broadcast; std::chrono::steady_clock::time_point const startup_time; std::chrono::seconds unchecked_cutoff = std::chrono::seconds (7 * 24 * 60 * 60); // Week diff --git a/nano/node/wallet.cpp b/nano/node/wallet.cpp index 4abd8e09..f48c841d 100644 --- a/nano/node/wallet.cpp +++ b/nano/node/wallet.cpp @@ -1047,11 +1047,13 @@ bool nano::wallet::action_complete (std::shared_ptr const & block_a } if (!error) { - error = wallets.node.process_local (block_a).code != nano::process_result::progress; + auto result = wallets.node.process_local (block_a); + error = !result || result.value ().code != nano::process_result::progress; debug_assert (error || block_a->sideband ().details == details_a); } if (!error && generate_work_a) { + // Pregenerate work for next block based on the block just created work_ensure (account_a, block_a->hash ()); } }