diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 299d9a48..977ff0b4 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -17,6 +17,8 @@ endif () add_library (node ${platform_sources} ${secure_rpc_sources} + blockprocessor.cpp + blockprocessor.hpp bootstrap.cpp bootstrap.hpp cli.hpp diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp new file mode 100644 index 00000000..4bd5379f --- /dev/null +++ b/nano/node/blockprocessor.cpp @@ -0,0 +1,511 @@ +#include +#include +#include +#include +#include + +nano::block_processor::block_processor (nano::node & node_a) : +generator (node_a, nano::is_test_network ? std::chrono::milliseconds (10) : std::chrono::milliseconds (500)), +stopped (false), +active (false), +next_log (std::chrono::steady_clock::now ()), +node (node_a) +{ +} + +nano::block_processor::~block_processor () +{ + stop (); +} + +void nano::block_processor::stop () +{ + generator.stop (); + { + std::lock_guard lock (mutex); + stopped = true; + } + condition.notify_all (); +} + +void nano::block_processor::flush () +{ + node.checker.flush (); + std::unique_lock lock (mutex); + while (!stopped && (have_blocks () || active)) + { + condition.wait (lock); + } +} + +bool nano::block_processor::full () +{ + size_t full_size (node.flags.fast_bootstrap ? 1024 * 1024 : 65536); + std::unique_lock lock (mutex); + return (blocks.size () + state_blocks.size ()) > full_size; +} + +void nano::block_processor::add (std::shared_ptr block_a, uint64_t origination) +{ + nano::unchecked_info info (block_a, 0, origination, nano::signature_verification::unknown); + add (info); +} + +void nano::block_processor::add (nano::unchecked_info const & info_a) +{ + if (!nano::work_validate (info_a.block->root (), info_a.block->block_work ())) + { + { + auto hash (info_a.block->hash ()); + std::lock_guard lock (mutex); + if (blocks_hashes.find (hash) == blocks_hashes.end () && rolled_back.get<1> ().find (hash) == rolled_back.get<1> ().end ()) + { + if (info_a.verified == nano::signature_verification::unknown && (info_a.block->type () == nano::block_type::state || info_a.block->type () == nano::block_type::open || !info_a.account.is_zero ())) + { + state_blocks.push_back (info_a); + } + else + { + blocks.push_back (info_a); + } + blocks_hashes.insert (hash); + } + condition.notify_all (); + } + } + else + { + BOOST_LOG (node.log) << "nano::block_processor::add called for hash " << info_a.block->hash ().to_string () << " with invalid work " << nano::to_string_hex (info_a.block->block_work ()); + assert (false && "nano::block_processor::add called with invalid work"); + } +} + +void nano::block_processor::force (std::shared_ptr block_a) +{ + { + std::lock_guard lock (mutex); + forced.push_back (block_a); + } + condition.notify_all (); +} + +void nano::block_processor::process_blocks () +{ + std::unique_lock lock (mutex); + while (!stopped) + { + if (have_blocks ()) + { + active = true; + lock.unlock (); + process_batch (lock); + lock.lock (); + active = false; + } + else + { + lock.unlock (); + condition.notify_all (); + lock.lock (); + + condition.wait (lock); + } + } +} + +bool nano::block_processor::should_log (bool first_time) +{ + auto result (false); + auto now (std::chrono::steady_clock::now ()); + if (first_time || next_log < now) + { + next_log = now + std::chrono::seconds (15); + result = true; + } + return result; +} + +bool nano::block_processor::have_blocks () +{ + assert (!mutex.try_lock ()); + return !blocks.empty () || !forced.empty () || !state_blocks.empty (); +} + +void nano::block_processor::verify_state_blocks (nano::transaction const & transaction_a, std::unique_lock & lock_a, size_t max_count) +{ + assert (!mutex.try_lock ()); + nano::timer timer_l (nano::timer_state::started); + std::deque items; + for (auto i (0); i < max_count && !state_blocks.empty (); i++) + { + auto item (state_blocks.front ()); + state_blocks.pop_front (); + if (!node.ledger.store.block_exists (transaction_a, item.block->type (), item.block->hash ())) + { + items.push_back (item); + } + } + lock_a.unlock (); + if (!items.empty ()) + { + auto size (items.size ()); + std::vector hashes; + hashes.reserve (size); + std::vector messages; + messages.reserve (size); + std::vector lengths; + lengths.reserve (size); + std::vector accounts; + accounts.reserve (size); + std::vector pub_keys; + pub_keys.reserve (size); + std::vector blocks_signatures; + blocks_signatures.reserve (size); + std::vector signatures; + signatures.reserve (size); + std::vector verifications; + verifications.resize (size, 0); + for (auto i (0); i < size; ++i) + { + auto item (items[i]); + hashes.push_back (item.block->hash ()); + messages.push_back (hashes.back ().bytes.data ()); + lengths.push_back (sizeof (decltype (hashes)::value_type)); + nano::account account (item.block->account ()); + if (!item.block->link ().is_zero () && node.ledger.is_epoch_link (item.block->link ())) + { + account = node.ledger.epoch_signer; + } + else if (!item.account.is_zero ()) + { + account = item.account; + } + accounts.push_back (account); + pub_keys.push_back (accounts.back ().bytes.data ()); + blocks_signatures.push_back (item.block->block_signature ()); + signatures.push_back (blocks_signatures.back ().bytes.data ()); + } + nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () }; + node.checker.verify (check); + lock_a.lock (); + for (auto i (0); i < size; ++i) + { + assert (verifications[i] == 1 || verifications[i] == 0); + auto item (items.front ()); + if (!item.block->link ().is_zero () && node.ledger.is_epoch_link (item.block->link ())) + { + // Epoch blocks + if (verifications[i] == 1) + { + item.verified = nano::signature_verification::valid_epoch; + blocks.push_back (item); + } + else + { + // Possible regular state blocks with epoch link (send subtype) + item.verified = nano::signature_verification::unknown; + blocks.push_back (item); + } + } + else if (verifications[i] == 1) + { + // Non epoch blocks + item.verified = nano::signature_verification::valid; + blocks.push_back (item); + } + items.pop_front (); + } + if (node.config.logging.timing_logging ()) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Batch verified %1% state blocks in %2% %3%") % size % timer_l.stop ().count () % timer_l.unit ()); + } + } + else + { + lock_a.lock (); + } +} + +void nano::block_processor::process_batch (std::unique_lock & lock_a) +{ + nano::timer timer_l; + lock_a.lock (); + timer_l.start (); + // Limit state blocks verification time + size_t max_verification_batch (node.flags.fast_bootstrap ? std::numeric_limits::max () : 2048 * (node.config.signature_checker_threads + 1)); + if (!state_blocks.empty ()) + { + auto transaction (node.store.tx_begin_read ()); + while (!state_blocks.empty () && timer_l.before_deadline (std::chrono::seconds (2))) + { + verify_state_blocks (transaction, lock_a, max_verification_batch); + } + } + lock_a.unlock (); + auto transaction (node.store.tx_begin_write ()); + timer_l.restart (); + lock_a.lock (); + // Processing blocks + auto first_time (true); + unsigned number_of_blocks_processed (0), number_of_forced_processed (0); + while ((!blocks.empty () || !forced.empty ()) && (timer_l.before_deadline (node.config.block_processor_batch_max_time) || (node.flags.fast_bootstrap && number_of_blocks_processed < 256 * 1024))) + { + auto log_this_record (false); + if (node.config.logging.timing_logging ()) + { + if (should_log (first_time)) + { + log_this_record = true; + } + } + else + { + if (((blocks.size () + state_blocks.size () + forced.size ()) > 64 && should_log (false))) + { + log_this_record = true; + } + } + + if (log_this_record) + { + first_time = false; + BOOST_LOG (node.log) << boost::str (boost::format ("%1% blocks (+ %2% state blocks) (+ %3% forced) in processing queue") % blocks.size () % state_blocks.size () % forced.size ()); + } + nano::unchecked_info info; + bool force (false); + if (forced.empty ()) + { + info = blocks.front (); + blocks.pop_front (); + blocks_hashes.erase (info.block->hash ()); + } + else + { + info = nano::unchecked_info (forced.front (), 0, nano::seconds_since_epoch (), nano::signature_verification::unknown); + forced.pop_front (); + force = true; + number_of_forced_processed++; + } + lock_a.unlock (); + auto hash (info.block->hash ()); + if (force) + { + auto successor (node.ledger.successor (transaction, nano::uint512_union (info.block->previous (), info.block->root ()))); + if (successor != nullptr && successor->hash () != hash) + { + // Replace our block with the winner and roll back any dependent blocks + BOOST_LOG (node.log) << boost::str (boost::format ("Rolling back %1% and replacing with %2%") % successor->hash ().to_string () % hash.to_string ()); + std::vector rollback_list; + node.ledger.rollback (transaction, successor->hash (), rollback_list); + BOOST_LOG (node.log) << boost::str (boost::format ("%1% blocks rolled back") % rollback_list.size ()); + lock_a.lock (); + // Prevent rolled back blocks second insertion + auto inserted (rolled_back.insert (nano::rolled_hash{ std::chrono::steady_clock::now (), successor->hash () })); + if (inserted.second) + { + // Possible election winner change + rolled_back.get<1> ().erase (hash); + // Prevent overflow + if (rolled_back.size () > rolled_back_max) + { + rolled_back.erase (rolled_back.begin ()); + } + } + lock_a.unlock (); + // Deleting from votes cache + for (auto & i : rollback_list) + { + node.votes_cache.remove (i); + } + } + } + number_of_blocks_processed++; + auto process_result (process_one (transaction, info)); + (void)process_result; + lock_a.lock (); + /* Verify more state blocks if blocks deque is empty + Because verification is long process, avoid large deque verification inside of write transaction */ + if (blocks.empty () && !state_blocks.empty ()) + { + verify_state_blocks (transaction, lock_a, 256 * (node.config.signature_checker_threads + 1)); + } + } + lock_a.unlock (); + + if (node.config.logging.timing_logging ()) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Processed %1% blocks (%2% blocks were forced) in %3% %4%") % number_of_blocks_processed % number_of_forced_processed % timer_l.stop ().count () % timer_l.unit ()); + } +} + +void nano::block_processor::process_live (nano::block_hash const & hash_a, std::shared_ptr block_a) +{ + // Start collecting quorum on block + node.active.start (block_a); + // Announce block contents to the network + node.network.republish_block (block_a); + if (node.config.enable_voting) + { + // Announce our weighted vote to the network + generator.add (hash_a); + } +} + +nano::process_return nano::block_processor::process_one (nano::transaction const & transaction_a, nano::unchecked_info info_a) +{ + nano::process_return result; + auto hash (info_a.block->hash ()); + result = node.ledger.process (transaction_a, *(info_a.block), info_a.verified); + switch (result.code) + { + case nano::process_result::progress: + { + release_assert (info_a.account.is_zero () || info_a.account == result.account); + if (node.config.logging.ledger_logging ()) + { + std::string block; + info_a.block->serialize_json (block); + BOOST_LOG (node.log) << boost::str (boost::format ("Processing block %1%: %2%") % hash.to_string () % block); + } + if (info_a.modified > nano::seconds_since_epoch () - 300 && node.block_arrival.recent (hash)) + { + process_live (hash, info_a.block); + } + queue_unchecked (transaction_a, hash); + break; + } + case nano::process_result::gap_previous: + { + if (node.config.logging.ledger_logging ()) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Gap previous for: %1%") % hash.to_string ()); + } + info_a.verified = result.verified; + if (info_a.modified == 0) + { + info_a.modified = nano::seconds_since_epoch (); + } + node.store.unchecked_put (transaction_a, nano::unchecked_key (info_a.block->previous (), hash), info_a); + node.gap_cache.add (transaction_a, hash); + break; + } + case nano::process_result::gap_source: + { + if (node.config.logging.ledger_logging ()) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Gap source for: %1%") % hash.to_string ()); + } + info_a.verified = result.verified; + if (info_a.modified == 0) + { + info_a.modified = nano::seconds_since_epoch (); + } + node.store.unchecked_put (transaction_a, nano::unchecked_key (node.ledger.block_source (transaction_a, *(info_a.block)), hash), info_a); + node.gap_cache.add (transaction_a, hash); + break; + } + case nano::process_result::old: + { + if (node.config.logging.ledger_duplicate_logging ()) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Old for: %1%") % hash.to_string ()); + } + if (!node.flags.fast_bootstrap) + { + queue_unchecked (transaction_a, hash); + } + node.active.update_difficulty (*(info_a.block)); + break; + } + case nano::process_result::bad_signature: + { + if (node.config.logging.ledger_logging ()) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Bad signature for: %1%") % hash.to_string ()); + } + break; + } + case nano::process_result::negative_spend: + { + if (node.config.logging.ledger_logging ()) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Negative spend for: %1%") % hash.to_string ()); + } + break; + } + case nano::process_result::unreceivable: + { + if (node.config.logging.ledger_logging ()) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Unreceivable for: %1%") % hash.to_string ()); + } + break; + } + case nano::process_result::fork: + { + if (info_a.modified < nano::seconds_since_epoch () - 15) + { + // Only let the bootstrap attempt know about forked blocks that not originate recently. + node.process_fork (transaction_a, info_a.block); + node.stats.inc (nano::stat::type::ledger, nano::stat::detail::fork, nano::stat::dir::in); + } + if (node.config.logging.ledger_logging ()) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Fork for: %1% root: %2%") % hash.to_string () % info_a.block->root ().to_string ()); + } + break; + } + case nano::process_result::opened_burn_account: + { + BOOST_LOG (node.log) << boost::str (boost::format ("*** Rejecting open block for burn account ***: %1%") % hash.to_string ()); + break; + } + case nano::process_result::balance_mismatch: + { + if (node.config.logging.ledger_logging ()) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Balance mismatch for: %1%") % hash.to_string ()); + } + break; + } + case nano::process_result::representative_mismatch: + { + if (node.config.logging.ledger_logging ()) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Representative mismatch for: %1%") % hash.to_string ()); + } + break; + } + case nano::process_result::block_position: + { + if (node.config.logging.ledger_logging ()) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Block %1% cannot follow predecessor %2%") % hash.to_string () % info_a.block->previous ().to_string ()); + } + break; + } + } + return result; +} + +nano::process_return nano::block_processor::process_one (nano::transaction const & transaction_a, std::shared_ptr block_a) +{ + nano::unchecked_info info (block_a, block_a->account (), 0, nano::signature_verification::unknown); + auto result (process_one (transaction_a, info)); + return result; +} + +void nano::block_processor::queue_unchecked (nano::transaction const & transaction_a, nano::block_hash const & hash_a) +{ + auto unchecked_blocks (node.store.unchecked_get (transaction_a, hash_a)); + for (auto & info : unchecked_blocks) + { + if (!node.flags.fast_bootstrap) + { + node.store.unchecked_del (transaction_a, nano::unchecked_key (hash_a, info.block->hash ())); + } + add (info); + } + std::lock_guard lock (node.gap_cache.mutex); + node.gap_cache.blocks.get<1> ().erase (hash_a); +} diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp new file mode 100644 index 00000000..c498b845 --- /dev/null +++ b/nano/node/blockprocessor.hpp @@ -0,0 +1,73 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace nano +{ +class node; +class transaction; + +class rolled_hash +{ +public: + std::chrono::steady_clock::time_point time; + nano::block_hash hash; +}; +/** + * Processing blocks is a potentially long IO operation. + * This class isolates block insertion from other operations like servicing network operations + */ +class block_processor +{ +public: + block_processor (nano::node &); + ~block_processor (); + void stop (); + void flush (); + bool full (); + void add (nano::unchecked_info const &); + void add (std::shared_ptr, uint64_t = 0); + void force (std::shared_ptr); + bool should_log (bool); + bool have_blocks (); + void process_blocks (); + nano::process_return process_one (nano::transaction const &, nano::unchecked_info); + nano::process_return process_one (nano::transaction const &, std::shared_ptr); + nano::vote_generator generator; + +private: + void queue_unchecked (nano::transaction const &, nano::block_hash const &); + void verify_state_blocks (nano::transaction const & transaction_a, std::unique_lock &, size_t = std::numeric_limits::max ()); + void process_batch (std::unique_lock &); + void process_live (nano::block_hash const &, std::shared_ptr); + bool stopped; + bool active; + std::chrono::steady_clock::time_point next_log; + std::deque state_blocks; + std::deque blocks; + std::unordered_set blocks_hashes; + std::deque> forced; + boost::multi_index_container< + nano::rolled_hash, + boost::multi_index::indexed_by< + boost::multi_index::ordered_non_unique>, + boost::multi_index::hashed_unique>>> + rolled_back; + static size_t const rolled_back_max = 1024; + std::condition_variable condition; + nano::node & node; + std::mutex mutex; + + friend std::unique_ptr collect_seq_con_info (block_processor & block_processor, const std::string & name); +}; +} diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 4c4d2a0d..69e94feb 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -1526,512 +1526,6 @@ void nano::signature_checker::set_thread_names (unsigned num_threads) assert (pending == 0); } -nano::block_processor::block_processor (nano::node & node_a) : -generator (node_a, nano::is_test_network ? std::chrono::milliseconds (10) : std::chrono::milliseconds (500)), -stopped (false), -active (false), -next_log (std::chrono::steady_clock::now ()), -node (node_a) -{ -} - -nano::block_processor::~block_processor () -{ - stop (); -} - -void nano::block_processor::stop () -{ - generator.stop (); - { - std::lock_guard lock (mutex); - stopped = true; - } - condition.notify_all (); -} - -void nano::block_processor::flush () -{ - node.checker.flush (); - std::unique_lock lock (mutex); - while (!stopped && (have_blocks () || active)) - { - condition.wait (lock); - } -} - -bool nano::block_processor::full () -{ - size_t full_size (node.flags.fast_bootstrap ? 1024 * 1024 : 65536); - std::unique_lock lock (mutex); - return (blocks.size () + state_blocks.size ()) > full_size; -} - -void nano::block_processor::add (std::shared_ptr block_a, uint64_t origination) -{ - nano::unchecked_info info (block_a, 0, origination, nano::signature_verification::unknown); - add (info); -} - -void nano::block_processor::add (nano::unchecked_info const & info_a) -{ - if (!nano::work_validate (info_a.block->root (), info_a.block->block_work ())) - { - { - auto hash (info_a.block->hash ()); - std::lock_guard lock (mutex); - if (blocks_hashes.find (hash) == blocks_hashes.end () && rolled_back.get<1> ().find (hash) == rolled_back.get<1> ().end ()) - { - if (info_a.verified == nano::signature_verification::unknown && (info_a.block->type () == nano::block_type::state || info_a.block->type () == nano::block_type::open || !info_a.account.is_zero ())) - { - state_blocks.push_back (info_a); - } - else - { - blocks.push_back (info_a); - } - blocks_hashes.insert (hash); - } - condition.notify_all (); - } - } - else - { - BOOST_LOG (node.log) << "nano::block_processor::add called for hash " << info_a.block->hash ().to_string () << " with invalid work " << nano::to_string_hex (info_a.block->block_work ()); - assert (false && "nano::block_processor::add called with invalid work"); - } -} - -void nano::block_processor::force (std::shared_ptr block_a) -{ - { - std::lock_guard lock (mutex); - forced.push_back (block_a); - } - condition.notify_all (); -} - -void nano::block_processor::process_blocks () -{ - std::unique_lock lock (mutex); - while (!stopped) - { - if (have_blocks ()) - { - active = true; - lock.unlock (); - process_batch (lock); - lock.lock (); - active = false; - } - else - { - lock.unlock (); - condition.notify_all (); - lock.lock (); - - condition.wait (lock); - } - } -} - -bool nano::block_processor::should_log (bool first_time) -{ - auto result (false); - auto now (std::chrono::steady_clock::now ()); - if (first_time || next_log < now) - { - next_log = now + std::chrono::seconds (15); - result = true; - } - return result; -} - -bool nano::block_processor::have_blocks () -{ - assert (!mutex.try_lock ()); - return !blocks.empty () || !forced.empty () || !state_blocks.empty (); -} - -void nano::block_processor::verify_state_blocks (nano::transaction const & transaction_a, std::unique_lock & lock_a, size_t max_count) -{ - assert (!mutex.try_lock ()); - nano::timer timer_l (nano::timer_state::started); - std::deque items; - for (auto i (0); i < max_count && !state_blocks.empty (); i++) - { - auto item (state_blocks.front ()); - state_blocks.pop_front (); - if (!node.ledger.store.block_exists (transaction_a, item.block->type (), item.block->hash ())) - { - items.push_back (item); - } - } - lock_a.unlock (); - if (!items.empty ()) - { - auto size (items.size ()); - std::vector hashes; - hashes.reserve (size); - std::vector messages; - messages.reserve (size); - std::vector lengths; - lengths.reserve (size); - std::vector accounts; - accounts.reserve (size); - std::vector pub_keys; - pub_keys.reserve (size); - std::vector blocks_signatures; - blocks_signatures.reserve (size); - std::vector signatures; - signatures.reserve (size); - std::vector verifications; - verifications.resize (size, 0); - for (auto i (0); i < size; ++i) - { - auto item (items[i]); - hashes.push_back (item.block->hash ()); - messages.push_back (hashes.back ().bytes.data ()); - lengths.push_back (sizeof (decltype (hashes)::value_type)); - nano::account account (item.block->account ()); - if (!item.block->link ().is_zero () && node.ledger.is_epoch_link (item.block->link ())) - { - account = node.ledger.epoch_signer; - } - else if (!item.account.is_zero ()) - { - account = item.account; - } - accounts.push_back (account); - pub_keys.push_back (accounts.back ().bytes.data ()); - blocks_signatures.push_back (item.block->block_signature ()); - signatures.push_back (blocks_signatures.back ().bytes.data ()); - } - nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () }; - node.checker.verify (check); - lock_a.lock (); - for (auto i (0); i < size; ++i) - { - assert (verifications[i] == 1 || verifications[i] == 0); - auto item (items.front ()); - if (!item.block->link ().is_zero () && node.ledger.is_epoch_link (item.block->link ())) - { - // Epoch blocks - if (verifications[i] == 1) - { - item.verified = nano::signature_verification::valid_epoch; - blocks.push_back (item); - } - else - { - // Possible regular state blocks with epoch link (send subtype) - item.verified = nano::signature_verification::unknown; - blocks.push_back (item); - } - } - else if (verifications[i] == 1) - { - // Non epoch blocks - item.verified = nano::signature_verification::valid; - blocks.push_back (item); - } - items.pop_front (); - } - if (node.config.logging.timing_logging ()) - { - BOOST_LOG (node.log) << boost::str (boost::format ("Batch verified %1% state blocks in %2% %3%") % size % timer_l.stop ().count () % timer_l.unit ()); - } - } - else - { - lock_a.lock (); - } -} - -void nano::block_processor::process_batch (std::unique_lock & lock_a) -{ - nano::timer timer_l; - lock_a.lock (); - timer_l.start (); - // Limit state blocks verification time - size_t max_verification_batch (node.flags.fast_bootstrap ? std::numeric_limits::max () : 2048 * (node.config.signature_checker_threads + 1)); - if (!state_blocks.empty ()) - { - auto transaction (node.store.tx_begin_read ()); - while (!state_blocks.empty () && timer_l.before_deadline (std::chrono::seconds (2))) - { - verify_state_blocks (transaction, lock_a, max_verification_batch); - } - } - lock_a.unlock (); - auto transaction (node.store.tx_begin_write ()); - timer_l.restart (); - lock_a.lock (); - // Processing blocks - auto first_time (true); - unsigned number_of_blocks_processed (0), number_of_forced_processed (0); - while ((!blocks.empty () || !forced.empty ()) && (timer_l.before_deadline (node.config.block_processor_batch_max_time) || (node.flags.fast_bootstrap && number_of_blocks_processed < 256 * 1024))) - { - auto log_this_record (false); - if (node.config.logging.timing_logging ()) - { - if (should_log (first_time)) - { - log_this_record = true; - } - } - else - { - if (((blocks.size () + state_blocks.size () + forced.size ()) > 64 && should_log (false))) - { - log_this_record = true; - } - } - - if (log_this_record) - { - first_time = false; - BOOST_LOG (node.log) << boost::str (boost::format ("%1% blocks (+ %2% state blocks) (+ %3% forced) in processing queue") % blocks.size () % state_blocks.size () % forced.size ()); - } - nano::unchecked_info info; - bool force (false); - if (forced.empty ()) - { - info = blocks.front (); - blocks.pop_front (); - blocks_hashes.erase (info.block->hash ()); - } - else - { - info = nano::unchecked_info (forced.front (), 0, nano::seconds_since_epoch (), nano::signature_verification::unknown); - forced.pop_front (); - force = true; - number_of_forced_processed++; - } - lock_a.unlock (); - auto hash (info.block->hash ()); - if (force) - { - auto successor (node.ledger.successor (transaction, nano::uint512_union (info.block->previous (), info.block->root ()))); - if (successor != nullptr && successor->hash () != hash) - { - // Replace our block with the winner and roll back any dependent blocks - BOOST_LOG (node.log) << boost::str (boost::format ("Rolling back %1% and replacing with %2%") % successor->hash ().to_string () % hash.to_string ()); - std::vector rollback_list; - node.ledger.rollback (transaction, successor->hash (), rollback_list); - BOOST_LOG (node.log) << boost::str (boost::format ("%1% blocks rolled back") % rollback_list.size ()); - lock_a.lock (); - // Prevent rolled back blocks second insertion - auto inserted (rolled_back.insert (nano::rolled_hash{ std::chrono::steady_clock::now (), successor->hash () })); - if (inserted.second) - { - // Possible election winner change - rolled_back.get<1> ().erase (hash); - // Prevent overflow - if (rolled_back.size () > rolled_back_max) - { - rolled_back.erase (rolled_back.begin ()); - } - } - lock_a.unlock (); - // Deleting from votes cache - for (auto & i : rollback_list) - { - node.votes_cache.remove (i); - } - } - } - number_of_blocks_processed++; - auto process_result (process_one (transaction, info)); - (void)process_result; - lock_a.lock (); - /* Verify more state blocks if blocks deque is empty - Because verification is long process, avoid large deque verification inside of write transaction */ - if (blocks.empty () && !state_blocks.empty ()) - { - verify_state_blocks (transaction, lock_a, 256 * (node.config.signature_checker_threads + 1)); - } - } - lock_a.unlock (); - - if (node.config.logging.timing_logging ()) - { - BOOST_LOG (node.log) << boost::str (boost::format ("Processed %1% blocks (%2% blocks were forced) in %3% %4%") % number_of_blocks_processed % number_of_forced_processed % timer_l.stop ().count () % timer_l.unit ()); - } -} - -void nano::block_processor::process_live (nano::block_hash const & hash_a, std::shared_ptr block_a) -{ - // Start collecting quorum on block - node.active.start (block_a); - // Announce block contents to the network - node.network.republish_block (block_a); - if (node.config.enable_voting) - { - // Announce our weighted vote to the network - generator.add (hash_a); - } -} - -nano::process_return nano::block_processor::process_one (nano::transaction const & transaction_a, nano::unchecked_info info_a) -{ - nano::process_return result; - auto hash (info_a.block->hash ()); - result = node.ledger.process (transaction_a, *(info_a.block), info_a.verified); - switch (result.code) - { - case nano::process_result::progress: - { - release_assert (info_a.account.is_zero () || info_a.account == result.account); - if (node.config.logging.ledger_logging ()) - { - std::string block; - info_a.block->serialize_json (block); - BOOST_LOG (node.log) << boost::str (boost::format ("Processing block %1%: %2%") % hash.to_string () % block); - } - if (info_a.modified > nano::seconds_since_epoch () - 300 && node.block_arrival.recent (hash)) - { - process_live (hash, info_a.block); - } - queue_unchecked (transaction_a, hash); - break; - } - case nano::process_result::gap_previous: - { - if (node.config.logging.ledger_logging ()) - { - BOOST_LOG (node.log) << boost::str (boost::format ("Gap previous for: %1%") % hash.to_string ()); - } - info_a.verified = result.verified; - if (info_a.modified == 0) - { - info_a.modified = nano::seconds_since_epoch (); - } - node.store.unchecked_put (transaction_a, nano::unchecked_key (info_a.block->previous (), hash), info_a); - node.gap_cache.add (transaction_a, hash); - break; - } - case nano::process_result::gap_source: - { - if (node.config.logging.ledger_logging ()) - { - BOOST_LOG (node.log) << boost::str (boost::format ("Gap source for: %1%") % hash.to_string ()); - } - info_a.verified = result.verified; - if (info_a.modified == 0) - { - info_a.modified = nano::seconds_since_epoch (); - } - node.store.unchecked_put (transaction_a, nano::unchecked_key (node.ledger.block_source (transaction_a, *(info_a.block)), hash), info_a); - node.gap_cache.add (transaction_a, hash); - break; - } - case nano::process_result::old: - { - if (node.config.logging.ledger_duplicate_logging ()) - { - BOOST_LOG (node.log) << boost::str (boost::format ("Old for: %1%") % hash.to_string ()); - } - if (!node.flags.fast_bootstrap) - { - queue_unchecked (transaction_a, hash); - } - node.active.update_difficulty (*(info_a.block)); - break; - } - case nano::process_result::bad_signature: - { - if (node.config.logging.ledger_logging ()) - { - BOOST_LOG (node.log) << boost::str (boost::format ("Bad signature for: %1%") % hash.to_string ()); - } - break; - } - case nano::process_result::negative_spend: - { - if (node.config.logging.ledger_logging ()) - { - BOOST_LOG (node.log) << boost::str (boost::format ("Negative spend for: %1%") % hash.to_string ()); - } - break; - } - case nano::process_result::unreceivable: - { - if (node.config.logging.ledger_logging ()) - { - BOOST_LOG (node.log) << boost::str (boost::format ("Unreceivable for: %1%") % hash.to_string ()); - } - break; - } - case nano::process_result::fork: - { - if (info_a.modified < nano::seconds_since_epoch () - 15) - { - // Only let the bootstrap attempt know about forked blocks that not originate recently. - node.process_fork (transaction_a, info_a.block); - node.stats.inc (nano::stat::type::ledger, nano::stat::detail::fork, nano::stat::dir::in); - } - if (node.config.logging.ledger_logging ()) - { - BOOST_LOG (node.log) << boost::str (boost::format ("Fork for: %1% root: %2%") % hash.to_string () % info_a.block->root ().to_string ()); - } - break; - } - case nano::process_result::opened_burn_account: - { - BOOST_LOG (node.log) << boost::str (boost::format ("*** Rejecting open block for burn account ***: %1%") % hash.to_string ()); - break; - } - case nano::process_result::balance_mismatch: - { - if (node.config.logging.ledger_logging ()) - { - BOOST_LOG (node.log) << boost::str (boost::format ("Balance mismatch for: %1%") % hash.to_string ()); - } - break; - } - case nano::process_result::representative_mismatch: - { - if (node.config.logging.ledger_logging ()) - { - BOOST_LOG (node.log) << boost::str (boost::format ("Representative mismatch for: %1%") % hash.to_string ()); - } - break; - } - case nano::process_result::block_position: - { - if (node.config.logging.ledger_logging ()) - { - BOOST_LOG (node.log) << boost::str (boost::format ("Block %1% cannot follow predecessor %2%") % hash.to_string () % info_a.block->previous ().to_string ()); - } - break; - } - } - return result; -} - -nano::process_return nano::block_processor::process_one (nano::transaction const & transaction_a, std::shared_ptr block_a) -{ - nano::unchecked_info info (block_a, block_a->account (), 0, nano::signature_verification::unknown); - auto result (process_one (transaction_a, info)); - return result; -} - -void nano::block_processor::queue_unchecked (nano::transaction const & transaction_a, nano::block_hash const & hash_a) -{ - auto unchecked_blocks (node.store.unchecked_get (transaction_a, hash_a)); - for (auto & info : unchecked_blocks) - { - if (!node.flags.fast_bootstrap) - { - node.store.unchecked_del (transaction_a, nano::unchecked_key (hash_a, info.block->hash ())); - } - add (info); - } - std::lock_guard lock (node.gap_cache.mutex); - node.gap_cache.blocks.get<1> ().erase (hash_a); -} - namespace nano { std::unique_ptr collect_seq_con_info (block_processor & block_processor, const std::string & name) diff --git a/nano/node/node.hpp b/nano/node/node.hpp index b2c446d6..2eeba529 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -1,18 +1,19 @@ #pragma once #include +#include #include #include #include #include #include #include -#include #include #include #include #include +#include #include #include @@ -423,7 +424,6 @@ public: std::unique_ptr collect_seq_con_info (rep_crawler & rep_crawler, const std::string & name); -class block_processor; class signature_check_set final { public: @@ -476,58 +476,6 @@ private: bool stopped{ false }; }; -class rolled_hash -{ -public: - std::chrono::steady_clock::time_point time; - nano::block_hash hash; -}; -// Processing blocks is a potentially long IO operation -// This class isolates block insertion from other operations like servicing network operations -class block_processor -{ -public: - block_processor (nano::node &); - ~block_processor (); - void stop (); - void flush (); - bool full (); - void add (nano::unchecked_info const &); - void add (std::shared_ptr, uint64_t = 0); - void force (std::shared_ptr); - bool should_log (bool); - bool have_blocks (); - void process_blocks (); - nano::process_return process_one (nano::transaction const &, nano::unchecked_info); - nano::process_return process_one (nano::transaction const &, std::shared_ptr); - nano::vote_generator generator; - -private: - void queue_unchecked (nano::transaction const &, nano::block_hash const &); - void verify_state_blocks (nano::transaction const & transaction_a, std::unique_lock &, size_t = std::numeric_limits::max ()); - void process_batch (std::unique_lock &); - void process_live (nano::block_hash const &, std::shared_ptr); - bool stopped; - bool active; - std::chrono::steady_clock::time_point next_log; - std::deque state_blocks; - std::deque blocks; - std::unordered_set blocks_hashes; - std::deque> forced; - boost::multi_index_container< - nano::rolled_hash, - boost::multi_index::indexed_by< - boost::multi_index::ordered_non_unique>, - boost::multi_index::hashed_unique>>> - rolled_back; - static size_t const rolled_back_max = 1024; - std::condition_variable condition; - nano::node & node; - std::mutex mutex; - - friend std::unique_ptr collect_seq_con_info (block_processor & block_processor, const std::string & name); -}; - std::unique_ptr collect_seq_con_info (block_processor & block_processor, const std::string & name); class node : public std::enable_shared_from_this