diff --git a/nano/lib/threading.cpp b/nano/lib/threading.cpp index 84ce4284..7a5482ab 100644 --- a/nano/lib/threading.cpp +++ b/nano/lib/threading.cpp @@ -75,6 +75,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::request_aggregator: thread_role_name_string = "Req aggregator"; break; + case nano::thread_role::name::state_block_signature_verification: + thread_role_name_string = "State block sig"; + break; } /* diff --git a/nano/lib/threading.hpp b/nano/lib/threading.hpp index 7c4f0116..2ea51380 100644 --- a/nano/lib/threading.hpp +++ b/nano/lib/threading.hpp @@ -33,7 +33,8 @@ namespace thread_role work_watcher, confirmation_height_processing, worker, - request_aggregator + request_aggregator, + state_block_signature_verification }; /* * Get/Set the identifier for the current thread diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index e8b15a6e..b5c03a41 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -121,6 +121,12 @@ add_library (node repcrawler.cpp request_aggregator.hpp request_aggregator.cpp + signatures.hpp + signatures.cpp + socket.hpp + socket.cpp + state_block_signature_verification.hpp + state_block_signature_verification.cpp testing.hpp testing.cpp transport/tcp.hpp @@ -129,10 +135,6 @@ add_library (node transport/transport.cpp transport/udp.hpp transport/udp.cpp - signatures.hpp - signatures.cpp - socket.hpp - socket.cpp vote_processor.hpp vote_processor.cpp voting.hpp diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 1345acd9..af31c8cb 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -14,8 +15,12 @@ stopped (false), active (false), next_log (std::chrono::steady_clock::now ()), node (node_a), -write_database_queue (write_database_queue_a) +write_database_queue (write_database_queue_a), +state_block_signature_verification (node.checker, node.ledger.network_params.ledger.epochs, node.config, node.logger, node.flags.block_processor_verification_size) { + state_block_signature_verification.blocks_verified_callback = [this](std::deque & items, std::vector const & verifications, std::vector const & hashes, std::vector const & blocks_signatures) { + this->process_verified_state_blocks (items, verifications, hashes, blocks_signatures); + }; } nano::block_processor::~block_processor () @@ -31,6 +36,7 @@ void nano::block_processor::stop () stopped = true; } condition.notify_all (); + state_block_signature_verification.stop (); } void nano::block_processor::flush () @@ -38,7 +44,7 @@ void nano::block_processor::flush () node.checker.flush (); flushing = true; nano::unique_lock lock (mutex); - while (!stopped && (have_blocks () || active)) + while (!stopped && (have_blocks () || active || state_block_signature_verification.is_active ())) { condition.wait (lock); } @@ -48,7 +54,7 @@ void nano::block_processor::flush () size_t nano::block_processor::size () { nano::unique_lock lock (mutex); - return (blocks.size () + state_blocks.size () + forced.size ()); + return (blocks.size () + state_block_signature_verification.size () + forced.size ()); } bool nano::block_processor::full () @@ -69,19 +75,23 @@ void nano::block_processor::add (std::shared_ptr block_a, uint64_t void nano::block_processor::add (nano::unchecked_info const & info_a) { - debug_assert (!nano::work_validate (*info_a.block)); + bool should_notify{ false }; { nano::lock_guard lock (mutex); if (info_a.verified == nano::signature_verification::unknown && (info_a.block->type () == nano::block_type::state || info_a.block->type () == nano::block_type::open || !info_a.account.is_zero ())) { - state_blocks.push_back (info_a); + state_block_signature_verification.add (info_a); } else { + should_notify = true; blocks.push_back (info_a); } } - condition.notify_all (); + if (should_notify) + { + condition.notify_all (); + } } void nano::block_processor::force (std::shared_ptr block_a) @@ -104,7 +114,7 @@ void nano::block_processor::process_blocks () nano::unique_lock lock (mutex); while (!stopped) { - if (have_blocks ()) + if (!blocks.empty () || !forced.empty ()) { active = true; lock.unlock (); @@ -114,7 +124,7 @@ void nano::block_processor::process_blocks () } else { - condition.notify_all (); + condition.notify_one (); condition.wait (lock); } } @@ -135,71 +145,14 @@ bool nano::block_processor::should_log () bool nano::block_processor::have_blocks () { debug_assert (!mutex.try_lock ()); - return !blocks.empty () || !forced.empty () || !state_blocks.empty (); + return !blocks.empty () || !forced.empty () || state_block_signature_verification.size () != 0; } -void nano::block_processor::verify_state_blocks (nano::unique_lock & lock_a, size_t max_count) +void nano::block_processor::process_verified_state_blocks (std::deque & items, std::vector const & verifications, std::vector const & hashes, std::vector const & blocks_signatures) { - debug_assert (!mutex.try_lock ()); - nano::timer timer_l (nano::timer_state::started); - std::deque items; - if (state_blocks.size () <= max_count) { - items.swap (state_blocks); - } - else - { - for (auto i (0); i < max_count; ++i) - { - items.push_back (state_blocks.front ()); - state_blocks.pop_front (); - } - debug_assert (!state_blocks.empty ()); - } - lock_a.unlock (); - if (!items.empty ()) - { - auto size (items.size ()); - std::vector hashes; - hashes.reserve (size); - std::vector messages; - messages.reserve (size); - std::vector lengths; - lengths.reserve (size); - std::vector accounts; - accounts.reserve (size); - std::vector pub_keys; - pub_keys.reserve (size); - std::vector blocks_signatures; - blocks_signatures.reserve (size); - std::vector signatures; - signatures.reserve (size); - std::vector verifications; - verifications.resize (size, 0); - for (auto i (0); i < size; ++i) - { - auto & item (items[i]); - hashes.push_back (item.block->hash ()); - messages.push_back (hashes.back ().bytes.data ()); - lengths.push_back (sizeof (decltype (hashes)::value_type)); - nano::account account (item.block->account ()); - if (!item.block->link ().is_zero () && node.ledger.is_epoch_link (item.block->link ())) - { - account = node.ledger.epoch_signer (item.block->link ()); - } - else if (!item.account.is_zero ()) - { - account = item.account; - } - accounts.push_back (account); - pub_keys.push_back (accounts.back ().bytes.data ()); - blocks_signatures.push_back (item.block->block_signature ()); - signatures.push_back (blocks_signatures.back ().bytes.data ()); - } - nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () }; - node.checker.verify (check); - lock_a.lock (); - for (auto i (0); i < size; ++i) + nano::unique_lock lk (mutex); + for (auto i (0); i < verifications.size (); ++i) { debug_assert (verifications[i] == 1 || verifications[i] == 0); auto & item (items.front ()); @@ -230,47 +183,24 @@ void nano::block_processor::verify_state_blocks (nano::unique_lock & } items.pop_front (); } - if (node.config.logging.timing_logging () && timer_l.stop () > std::chrono::milliseconds (10)) - { - node.logger.try_log (boost::str (boost::format ("Batch verified %1% state blocks in %2% %3%") % size % timer_l.value ().count () % timer_l.unit ())); - } - } - else - { - lock_a.lock (); } + condition.notify_all (); } void nano::block_processor::process_batch (nano::unique_lock & lock_a) { + auto scoped_write_guard = write_database_queue.wait (nano::writer::process_batch); + auto transaction (node.store.tx_begin_write ({ tables::accounts, nano::tables::cached_counts, nano::tables::change_blocks, tables::frontiers, tables::open_blocks, tables::pending, tables::receive_blocks, tables::representation, tables::send_blocks, tables::state_blocks, tables::unchecked }, { tables::confirmation_height })); nano::timer timer_l; lock_a.lock (); timer_l.start (); - // Limit state blocks verification time - - { - if (!state_blocks.empty ()) - { - size_t max_verification_batch (node.flags.block_processor_verification_size != 0 ? node.flags.block_processor_verification_size : 2048 * (node.config.signature_checker_threads + 1)); - while (!state_blocks.empty () && timer_l.before_deadline (std::chrono::seconds (2))) - { - verify_state_blocks (lock_a, max_verification_batch); - } - } - } - lock_a.unlock (); - auto scoped_write_guard = write_database_queue.wait (nano::writer::process_batch); - auto transaction (node.store.tx_begin_write ({ tables::accounts, nano::tables::cached_counts, nano::tables::change_blocks, tables::frontiers, tables::open_blocks, tables::pending, tables::receive_blocks, tables::representation, tables::send_blocks, tables::state_blocks, tables::unchecked }, { tables::confirmation_height })); - timer_l.restart (); - lock_a.lock (); // Processing blocks unsigned number_of_blocks_processed (0), number_of_forced_processed (0); while ((!blocks.empty () || !forced.empty ()) && (timer_l.before_deadline (node.config.block_processor_batch_max_time) || (number_of_blocks_processed < node.flags.block_processor_batch_size)) && !awaiting_write) { - bool log_this_record = (blocks.size () + state_blocks.size () + forced.size () > 64) && should_log (); - if (log_this_record) + if ((blocks.size () + state_block_signature_verification.size () + forced.size () > 64) && should_log ()) { - node.logger.always_log (boost::str (boost::format ("%1% blocks (+ %2% state blocks) (+ %3% forced) in processing queue") % blocks.size () % state_blocks.size () % forced.size ())); + node.logger.always_log (boost::str (boost::format ("%1% blocks (+ %2% state blocks) (+ %3% forced) in processing queue") % blocks.size () % state_block_signature_verification.size () % forced.size ())); } nano::unchecked_info info; nano::block_hash hash (0); @@ -322,12 +252,6 @@ void nano::block_processor::process_batch (nano::unique_lock & lock_ number_of_blocks_processed++; process_one (transaction, info); lock_a.lock (); - /* Verify more state blocks if blocks deque is empty - Because verification is long process, avoid large deque verification inside of write transaction */ - if (blocks.empty () && !state_blocks.empty ()) - { - verify_state_blocks (lock_a, 256 * (node.config.signature_checker_threads + 1)); - } } awaiting_write = false; lock_a.unlock (); @@ -547,3 +471,22 @@ void nano::block_processor::requeue_invalid (nano::block_hash const & hash_a, na debug_assert (hash_a == info_a.block->hash ()); node.bootstrap_initiator.lazy_requeue (hash_a, info_a.block->previous (), info_a.confirmed); } + +std::unique_ptr nano::collect_container_info (block_processor & block_processor, const std::string & name) +{ + size_t blocks_count; + size_t forced_count; + + { + nano::lock_guard guard (block_processor.mutex); + blocks_count = block_processor.blocks.size (); + forced_count = block_processor.forced.size (); + } + + auto composite = std::make_unique (name); + composite->add_component (collect_container_info (block_processor.state_block_signature_verification, "state_block_signature_verification")); + composite->add_component (std::make_unique (container_info{ "blocks", blocks_count, sizeof (decltype (block_processor.blocks)::value_type) })); + composite->add_component (std::make_unique (container_info{ "forced", forced_count, sizeof (decltype (block_processor.forced)::value_type) })); + composite->add_component (collect_container_info (block_processor.generator, "generator")); + return composite; +} diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 34d87e1b..e87882c4 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -51,22 +52,23 @@ public: private: void queue_unchecked (nano::write_transaction const &, nano::block_hash const &); - void verify_state_blocks (nano::unique_lock &, size_t = std::numeric_limits::max ()); void process_batch (nano::unique_lock &); void process_live (nano::block_hash const &, std::shared_ptr, const bool = false, const bool = false); void requeue_invalid (nano::block_hash const &, nano::unchecked_info const &); + void process_verified_state_blocks (std::deque &, std::vector const &, std::vector const &, std::vector const &); bool stopped; bool active; bool awaiting_write{ false }; std::chrono::steady_clock::time_point next_log; - std::deque state_blocks; std::deque blocks; std::deque> forced; nano::condition_variable condition; nano::node & node; nano::write_database_queue & write_database_queue; std::mutex mutex; + nano::state_block_signature_verification state_block_signature_verification; friend std::unique_ptr collect_container_info (block_processor & block_processor, const std::string & name); }; +std::unique_ptr collect_container_info (block_processor & block_processor, const std::string & name); } diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 48c0d39c..65cce5ad 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -82,27 +82,6 @@ std::unique_ptr nano::collect_container_info (re return composite; } -std::unique_ptr nano::collect_container_info (block_processor & block_processor, const std::string & name) -{ - size_t state_blocks_count; - size_t blocks_count; - size_t forced_count; - - { - nano::lock_guard guard (block_processor.mutex); - state_blocks_count = block_processor.state_blocks.size (); - blocks_count = block_processor.blocks.size (); - forced_count = block_processor.forced.size (); - } - - auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "state_blocks", state_blocks_count, sizeof (decltype (block_processor.state_blocks)::value_type) })); - composite->add_component (std::make_unique (container_info{ "blocks", blocks_count, sizeof (decltype (block_processor.blocks)::value_type) })); - composite->add_component (std::make_unique (container_info{ "forced", forced_count, sizeof (decltype (block_processor.forced)::value_type) })); - composite->add_component (collect_container_info (block_processor.generator, "generator")); - return composite; -} - nano::node::node (boost::asio::io_context & io_ctx_a, uint16_t peering_port_a, boost::filesystem::path const & application_path_a, nano::alarm & alarm_a, nano::logging const & logging_a, nano::work_pool & work_a, nano::node_flags flags_a) : node (io_ctx_a, application_path_a, alarm_a, nano::node_config (peering_port_a, logging_a), work_a, flags_a) { diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 36a86b56..a24bc728 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -83,7 +83,6 @@ public: std::unique_ptr collect_container_info (block_arrival & block_arrival, const std::string & name); std::unique_ptr collect_container_info (rep_crawler & rep_crawler, const std::string & name); -std::unique_ptr collect_container_info (block_processor & block_processor, const std::string & name); class node final : public std::enable_shared_from_this { diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index 74ff6542..a00fea85 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -75,7 +75,7 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const toml.put ("io_threads", io_threads, "Number of threads dedicated to I/O opeations. Defaults to the number of CPU threads, and at least 4.\ntype:uint64"); toml.put ("network_threads", network_threads, "Number of threads dedicated to processing network messages. Defaults to the number of CPU threads, and at least 4.\ntype:uint64"); toml.put ("work_threads", work_threads, "Number of threads dedicated to CPU generated work. Defaults to all available CPU threads.\ntype:uint64"); - toml.put ("signature_checker_threads", signature_checker_threads, "Number of additional threads dedicated to signature verification. Defaults to the number of CPU threads minus 1.\ntype:uint64"); + toml.put ("signature_checker_threads", signature_checker_threads, "Number of additional threads dedicated to signature verification. Defaults to number of CPU threads / 2.\ntype:uint64"); toml.put ("enable_voting", enable_voting, "Enable or disable voting. Enabling this option requires additional system resources, namely increased CPU, bandwidth and disk usage.\ntype:bool"); toml.put ("bootstrap_connections", bootstrap_connections, "Number of outbound bootstrap connections. Must be a power of 2. Defaults to 4.\nWarning: a larger amount of connections may use substantially more system memory.\ntype:uint64"); toml.put ("bootstrap_connections_max", bootstrap_connections_max, "Maximum number of inbound bootstrap connections. Defaults to 64.\nWarning: a larger amount of connections may use additional system memory.\ntype:uint64"); diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index b6117905..86d9e8de 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -60,7 +60,8 @@ public: unsigned io_threads{ std::max (4, std::thread::hardware_concurrency ()) }; unsigned network_threads{ std::max (4, std::thread::hardware_concurrency ()) }; unsigned work_threads{ std::max (4, std::thread::hardware_concurrency ()) }; - unsigned signature_checker_threads{ (std::thread::hardware_concurrency () != 0) ? std::thread::hardware_concurrency () - 1 : 0 }; /* The calling thread does checks as well so remove it from the number of threads used */ + /* Use half available threads on the system for signature checking. The calling thread does checks as well, so these are extra worker threads */ + unsigned signature_checker_threads{ std::thread::hardware_concurrency () / 2 }; bool enable_voting{ false }; unsigned bootstrap_connections{ 4 }; unsigned bootstrap_connections_max{ 64 }; diff --git a/nano/node/signatures.cpp b/nano/node/signatures.cpp index 30f94138..ff4d2dea 100644 --- a/nano/node/signatures.cpp +++ b/nano/node/signatures.cpp @@ -22,16 +22,13 @@ nano::signature_checker::~signature_checker () void nano::signature_checker::verify (nano::signature_check_set & check_a) { + // Don't process anything else if we have stopped + if (stopped) { - // Don't process anything else if we have stopped - nano::lock_guard guard (mutex); - if (stopped) - { - return; - } + return; } - if (check_a.size < multithreaded_cutoff || single_threaded) + if (check_a.size <= batch_size || single_threaded) { // Not dealing with many so just use the calling thread for checking signatures auto result = verify_batch (check_a, 0, check_a.size); @@ -76,32 +73,26 @@ void nano::signature_checker::verify (nano::signature_check_set & check_a) void nano::signature_checker::stop () { - nano::lock_guard guard (mutex); - if (!stopped) + if (!stopped.exchange (true)) { - stopped = true; thread_pool.join (); } } void nano::signature_checker::flush () { - nano::lock_guard guard (mutex); while (!stopped && tasks_remaining != 0) ; } bool nano::signature_checker::verify_batch (const nano::signature_check_set & check_a, size_t start_index, size_t size) { - /* Returns false if there are at least 1 invalid signature */ - auto code (nano::validate_message_batch (check_a.messages + start_index, check_a.message_lengths + start_index, check_a.pub_keys + start_index, check_a.signatures + start_index, size, check_a.verifications + start_index)); - (void)code; - + nano::validate_message_batch (check_a.messages + start_index, check_a.message_lengths + start_index, check_a.pub_keys + start_index, check_a.signatures + start_index, size, check_a.verifications + start_index); return std::all_of (check_a.verifications + start_index, check_a.verifications + start_index + size, [](int verification) { return verification == 0 || verification == 1; }); } /* This operates on a number of signatures of size (num_batches * batch_size) from the beginning of the check_a pointers. - * Caller should check the value of the promise which indicateswhen the work has been completed. + * Caller should check the value of the promise which indicates when the work has been completed. */ void nano::signature_checker::verify_async (nano::signature_check_set & check_a, size_t num_batches, std::promise & promise) { @@ -129,10 +120,6 @@ void nano::signature_checker::verify_async (nano::signature_check_set & check_a, // Set the names of all the threads in the thread pool for easier identification void nano::signature_checker::set_thread_names (unsigned num_threads) { - auto ready = false; - auto pending = num_threads; - nano::condition_variable cv; - std::vector> promises (num_threads); std::vector> futures; futures.reserve (num_threads); @@ -142,21 +129,8 @@ void nano::signature_checker::set_thread_names (unsigned num_threads) for (auto i = 0u; i < num_threads; ++i) { - boost::asio::post (thread_pool, [&cv, &ready, &pending, &mutex = mutex, &promise = promises[i]]() { - nano::unique_lock lk (mutex); + boost::asio::post (thread_pool, [& promise = promises[i]]() { nano::thread_role::set (nano::thread_role::name::signature_checking); - if (--pending == 0) - { - // All threads have been reached - ready = true; - lk.unlock (); - cv.notify_all (); - } - else - { - // We need to wait until the other threads are finished - cv.wait (lk, [&ready]() { return ready; }); - } promise.set_value (); }); } @@ -166,5 +140,4 @@ void nano::signature_checker::set_thread_names (unsigned num_threads) { future.wait (); } - debug_assert (pending == 0); } diff --git a/nano/node/signatures.hpp b/nano/node/signatures.hpp index 61a6bb7a..72f26b9f 100644 --- a/nano/node/signatures.hpp +++ b/nano/node/signatures.hpp @@ -55,12 +55,9 @@ private: void set_thread_names (unsigned num_threads); boost::asio::thread_pool thread_pool; std::atomic tasks_remaining{ 0 }; - /** minimum signature_check_set size eligible to be multithreaded */ - static constexpr size_t multithreaded_cutoff = 513; static constexpr size_t batch_size = 256; const bool single_threaded; unsigned num_threads; - std::mutex mutex; - bool stopped{ false }; + std::atomic stopped{ false }; }; } diff --git a/nano/node/state_block_signature_verification.cpp b/nano/node/state_block_signature_verification.cpp new file mode 100644 index 00000000..4402ef22 --- /dev/null +++ b/nano/node/state_block_signature_verification.cpp @@ -0,0 +1,164 @@ +#include +#include +#include +#include +#include +#include +#include + +#include + +nano::state_block_signature_verification::state_block_signature_verification (nano::signature_checker & signature_checker, nano::epochs & epochs, nano::node_config & node_config, nano::logger_mt & logger, uint64_t state_block_signature_verification_size) : +signature_checker (signature_checker), +epochs (epochs), +node_config (node_config), +logger (logger), +thread ([this, state_block_signature_verification_size]() { + nano::thread_role::set (nano::thread_role::name::state_block_signature_verification); + this->run (state_block_signature_verification_size); +}) +{ +} + +nano::state_block_signature_verification::~state_block_signature_verification () +{ + stop (); +} + +void nano::state_block_signature_verification::stop () +{ + { + nano::lock_guard guard (mutex); + stopped = true; + } + + if (thread.joinable ()) + { + condition.notify_one (); + thread.join (); + } +} + +void nano::state_block_signature_verification::run (uint64_t state_block_signature_verification_size) +{ + nano::unique_lock lk (mutex); + while (!stopped) + { + if (!state_blocks.empty ()) + { + size_t const max_verification_batch (state_block_signature_verification_size != 0 ? state_block_signature_verification_size : 256 * (node_config.signature_checker_threads + 1)); + active = true; + while (!state_blocks.empty () && !stopped) + { + auto items = setup_items (max_verification_batch); + lk.unlock (); + verify_state_blocks (items); + lk.lock (); + } + active = false; + } + else + { + condition.wait (lk); + } + } +} + +bool nano::state_block_signature_verification::is_active () +{ + nano::lock_guard guard (mutex); + return active; +} + +void nano::state_block_signature_verification::add (nano::unchecked_info const & info_a) +{ + { + nano::lock_guard guard (mutex); + state_blocks.push_back (info_a); + } + condition.notify_one (); +} + +size_t nano::state_block_signature_verification::size () +{ + nano::lock_guard guard (mutex); + return state_blocks.size (); +} + +std::deque nano::state_block_signature_verification::setup_items (size_t max_count) +{ + std::deque items; + if (state_blocks.size () <= max_count) + { + items.swap (state_blocks); + } + else + { + for (auto i (0); i < max_count; ++i) + { + items.push_back (state_blocks.front ()); + state_blocks.pop_front (); + } + debug_assert (!state_blocks.empty ()); + } + return items; +} + +void nano::state_block_signature_verification::verify_state_blocks (std::deque & items) +{ + if (!items.empty ()) + { + nano::timer<> timer_l; + timer_l.start (); + auto size (items.size ()); + std::vector hashes; + hashes.reserve (size); + std::vector messages; + messages.reserve (size); + std::vector lengths; + lengths.reserve (size); + std::vector accounts; + accounts.reserve (size); + std::vector pub_keys; + pub_keys.reserve (size); + std::vector blocks_signatures; + blocks_signatures.reserve (size); + std::vector signatures; + signatures.reserve (size); + std::vector verifications; + verifications.resize (size, 0); + for (auto & item : items) + { + hashes.push_back (item.block->hash ()); + messages.push_back (hashes.back ().bytes.data ()); + lengths.push_back (sizeof (decltype (hashes)::value_type)); + nano::account account (item.block->account ()); + if (!item.block->link ().is_zero () && epochs.is_epoch_link (item.block->link ())) + { + account = epochs.signer (epochs.epoch (item.block->link ())); + } + else if (!item.account.is_zero ()) + { + account = item.account; + } + accounts.push_back (account); + pub_keys.push_back (accounts.back ().bytes.data ()); + blocks_signatures.push_back (item.block->block_signature ()); + signatures.push_back (blocks_signatures.back ().bytes.data ()); + } + nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () }; + signature_checker.verify (check); + if (node_config.logging.timing_logging () && timer_l.stop () > std::chrono::milliseconds (10)) + { + logger.try_log (boost::str (boost::format ("Batch verified %1% state blocks in %2% %3%") % size % timer_l.value ().count () % timer_l.unit ())); + } + blocks_verified_callback (items, verifications, hashes, blocks_signatures); + } +} + +std::unique_ptr nano::collect_container_info (state_block_signature_verification & state_block_signature_verification, const std::string & name) +{ + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "state_blocks", state_block_signature_verification.size (), sizeof (nano::unchecked_info) })); + return composite; +} diff --git a/nano/node/state_block_signature_verification.hpp b/nano/node/state_block_signature_verification.hpp new file mode 100644 index 00000000..7af89bfd --- /dev/null +++ b/nano/node/state_block_signature_verification.hpp @@ -0,0 +1,48 @@ +#pragma once + +#include +#include + +#include +#include +#include + +namespace nano +{ +class epochs; +class logger_mt; +class node_config; +class signature_checker; + +class state_block_signature_verification +{ +public: + state_block_signature_verification (nano::signature_checker &, nano::epochs &, nano::node_config &, nano::logger_mt &, uint64_t); + ~state_block_signature_verification (); + void add (nano::unchecked_info const & info_a); + size_t size (); + void stop (); + bool is_active (); + + std::function &, std::vector const &, std::vector const &, std::vector const &)> blocks_verified_callback; + +private: + nano::signature_checker & signature_checker; + nano::epochs & epochs; + nano::node_config & node_config; + nano::logger_mt & logger; + + std::mutex mutex; + bool stopped{ false }; + bool active{ false }; + std::deque state_blocks; + nano::condition_variable condition; + std::thread thread; + + void run (uint64_t block_processor_verification_size); + std::deque setup_items (size_t); + void verify_state_blocks (std::deque &); +}; + +std::unique_ptr collect_container_info (state_block_signature_verification & state_block_signature_verification, const std::string & name); +}