diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 3590485b..b4fe93bf 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -634,9 +634,14 @@ TEST (node_config, v15_v16_upgrade) auto upgraded (false); nano::node_config config; config.logging.init (path); - ASSERT_FALSE (tree.get_optional_child ("allow_local_peers")); // allow_local_peers should not be present now + // These config options should not be present at version 15 + ASSERT_FALSE (tree.get_optional_child ("allow_local_peers")); + ASSERT_FALSE (tree.get_optional_child ("signature_checker_threads")); config.deserialize_json (upgraded, tree); - ASSERT_TRUE (!!tree.get_optional_child ("allow_local_peers")); // allow_local_peers should be added after the update + // The config options should be added after the upgrade + ASSERT_TRUE (!!tree.get_optional_child ("allow_local_peers")); + ASSERT_TRUE (!!tree.get_optional_child ("signature_checker_threads")); + ASSERT_TRUE (upgraded); auto version (tree.get ("version")); @@ -670,18 +675,22 @@ TEST (node_config, allow_local_peers) nano::node_config config; config.logging.init (path); - // Check config is correct when allow_local_peers is false + // Check config is correct tree.put ("allow_local_peers", false); + tree.put ("signature_checker_threads", 1); config.deserialize_json (upgraded, tree); ASSERT_FALSE (upgraded); ASSERT_FALSE (config.allow_local_peers); + ASSERT_EQ (config.signature_checker_threads, 1); - // Check config is correct when allow_local_peers is true + // Check config is correct with other values tree.put ("allow_local_peers", true); + tree.put ("signature_checker_threads", 4); upgraded = false; config.deserialize_json (upgraded, tree); ASSERT_FALSE (upgraded); ASSERT_TRUE (config.allow_local_peers); + ASSERT_EQ (config.signature_checker_threads, 4); } // Regression test to ensure that deserializing includes changes node via get_required_child diff --git a/nano/core_test/signing.cpp b/nano/core_test/signing.cpp index c86c4e4b..0d9a9e49 100644 --- a/nano/core_test/signing.cpp +++ b/nano/core_test/signing.cpp @@ -1,23 +1,19 @@ #include -#include #include TEST (signature_checker, empty) { - nano::signature_checker checker; - std::promise promise; - nano::signature_check_set check = { 0, nullptr, nullptr, nullptr, nullptr, nullptr, &promise }; - checker.add (check); - promise.get_future ().wait (); + nano::signature_checker checker (0); + nano::signature_check_set check = { 0, nullptr, nullptr, nullptr, nullptr, nullptr }; + checker.verify (check); } -TEST (signature_checker, many) +TEST (signature_checker, bulk_single_thread) { nano::keypair key; nano::state_block block (key.pub, 0, key.pub, 0, 0, key.prv, key.pub, 0); - nano::signature_checker checker; - std::promise promise; + nano::signature_checker checker (0); std::vector hashes; size_t size (1000); hashes.reserve (size); @@ -39,34 +35,114 @@ TEST (signature_checker, many) pub_keys.push_back (block.hashables.account.bytes.data ()); signatures.push_back (block.signature.bytes.data ()); } - nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data (), &promise }; - checker.add (check); - promise.get_future ().wait (); + nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () }; + checker.verify (check); + bool all_valid = std::all_of (verifications.cbegin (), verifications.cend (), [](auto verification) { return verification == 1; }); + ASSERT_TRUE (all_valid); +} + +TEST (signature_checker, many_multi_threaded) +{ + nano::signature_checker checker (4); + + auto signature_checker_work_func = [&checker]() { + nano::keypair key; + nano::state_block block (key.pub, 0, key.pub, 0, 0, key.prv, key.pub, 0); + auto block_hash = block.hash (); + + nano::state_block invalid_block (key.pub, 0, key.pub, 0, 0, key.prv, key.pub, 0); + invalid_block.signature.bytes[31] ^= 0x1; + auto invalid_block_hash = block.hash (); + + constexpr auto num_check_sizes = 18; + constexpr std::array check_sizes{ 2048, 256, 1024, 1, + 4096, 512, 2050, 1024, 8092, 513, 17, 1024, 2047, 255, 513, 2049, 1025, 1023 }; + + std::vector signature_checker_sets; + signature_checker_sets.reserve (num_check_sizes); + + // Create containers so everything is kept in scope while the threads work on the signature checks + std::array, num_check_sizes> messages; + std::array, num_check_sizes> lengths; + std::array, num_check_sizes> pub_keys; + std::array, num_check_sizes> signatures; + std::array, num_check_sizes> verifications; + + // Populate all the signature check sets. The last one in each set is given an incorrect block signature. + for (int i = 0; i < num_check_sizes; ++i) + { + auto check_size = check_sizes[i]; + assert (check_size > 0); + auto last_signature_index = check_size - 1; + + messages[i].resize (check_size); + std::fill (messages[i].begin (), messages[i].end (), block_hash.bytes.data ()); + messages[i][last_signature_index] = invalid_block_hash.bytes.data (); + + lengths[i].resize (check_size); + std::fill (lengths[i].begin (), lengths[i].end (), sizeof (decltype (block_hash))); + + pub_keys[i].resize (check_size); + std::fill (pub_keys[i].begin (), pub_keys[i].end (), block.hashables.account.bytes.data ()); + pub_keys[i][last_signature_index] = invalid_block.hashables.account.bytes.data (); + + signatures[i].resize (check_size); + std::fill (signatures[i].begin (), signatures[i].end (), block.signature.bytes.data ()); + signatures[i][last_signature_index] = invalid_block.signature.bytes.data (); + + verifications[i].resize (check_size); + + signature_checker_sets.emplace_back (check_size, messages[i].data (), lengths[i].data (), pub_keys[i].data (), signatures[i].data (), verifications[i].data ()); + checker.verify (signature_checker_sets[i]); + + // Confirm all but last are valid + auto all_valid = std::all_of (verifications[i].cbegin (), verifications[i].cend () - 1, [](auto verification) { return verification == 1; }); + ASSERT_TRUE (all_valid); + ASSERT_EQ (verifications[i][last_signature_index], 0); + } + }; + + std::thread signature_checker_thread1 (signature_checker_work_func); + std::thread signature_checker_thread2 (signature_checker_work_func); + + signature_checker_thread1.join (); + signature_checker_thread2.join (); } TEST (signature_checker, one) { + nano::signature_checker checker (0); + + auto verify_block = [&checker](auto & block, auto result) { + std::vector hashes; + std::vector messages; + std::vector lengths; + std::vector pub_keys; + std::vector signatures; + std::vector verifications; + size_t size (1); + verifications.resize (size); + for (auto i (0); i < size; ++i) + { + hashes.push_back (block.hash ()); + messages.push_back (hashes.back ().bytes.data ()); + lengths.push_back (sizeof (decltype (hashes)::value_type)); + pub_keys.push_back (block.hashables.account.bytes.data ()); + signatures.push_back (block.signature.bytes.data ()); + } + nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () }; + checker.verify (check); + ASSERT_EQ (verifications.front (), result); + }; + nano::keypair key; nano::state_block block (key.pub, 0, key.pub, 0, 0, key.prv, key.pub, 0); - nano::signature_checker checker; - std::promise promise; - std::vector hashes; - std::vector messages; - std::vector lengths; - std::vector pub_keys; - std::vector signatures; - std::vector verifications; - size_t size (1); - verifications.resize (size); - for (auto i (0); i < size; ++i) - { - hashes.push_back (block.hash ()); - messages.push_back (hashes.back ().bytes.data ()); - lengths.push_back (sizeof (decltype (hashes)::value_type)); - pub_keys.push_back (block.hashables.account.bytes.data ()); - signatures.push_back (block.signature.bytes.data ()); - } - nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data (), &promise }; - checker.add (check); - promise.get_future ().wait (); + + // Make signaure invalid and check result is incorrect + block.signature.bytes[31] ^= 0x1; + verify_block (block, 0); + + // Make it valid and check for succcess + block.signature.bytes[31] ^= 0x1; + verify_block (block, 1); } diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 278d6b6a..c0d2d1ed 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -1100,10 +1100,8 @@ void nano::vote_processor::verify_votes (std::dequeaccount.bytes.data ()); signatures.push_back (vote.first->signature.bytes.data ()); } - std::promise promise; - nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data (), &promise }; - node.checker.add (check); - promise.get_future ().wait (); + nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () }; + node.checker.verify (check); std::remove_reference_t result; auto i (0); for (auto & vote : votes_a) @@ -1244,15 +1242,14 @@ bool nano::rep_crawler::exists (nano::block_hash const & hash_a) return active.count (hash_a) != 0; } -nano::signature_checker::signature_checker () : -started (false), -stopped (false), -thread ([this]() { run (); }) +nano::signature_checker::signature_checker (unsigned num_threads) : +thread_pool (num_threads), +single_threaded (num_threads == 0), +num_threads (num_threads) { - std::unique_lock lock (mutex); - while (!started) + if (!single_threaded) { - condition.wait (lock); + set_thread_names (num_threads); } } @@ -1261,74 +1258,155 @@ nano::signature_checker::~signature_checker () stop (); } -void nano::signature_checker::add (nano::signature_check_set & check_a) +void nano::signature_checker::verify (nano::signature_check_set & check_a) { { - std::lock_guard lock (mutex); - checks.push_back (check_a); + // Don't process anything else if we have stopped + std::lock_guard guard (stopped_mutex); + if (stopped) + { + return; + } } - condition.notify_all (); + + if (check_a.size < multithreaded_cutoff || single_threaded) + { + // Not dealing with many so just use the calling thread for checking signatures + auto result = verify_batch (check_a, 0, check_a.size); + release_assert (result); + return; + } + + // Split up the tasks equally over the calling thread and the thread pool. + // Any overflow on the modulus of the batch_size is given to the calling thread, so the thread pool + // only ever operates on batch_size sizes. + size_t overflow_size = check_a.size % batch_size; + size_t num_full_batches = check_a.size / batch_size; + + auto total_threads_to_split_over = num_threads + 1; + auto num_base_batches_each = num_full_batches / total_threads_to_split_over; + auto num_full_overflow_batches = num_full_batches % total_threads_to_split_over; + + auto size_calling_thread = (num_base_batches_each * batch_size) + overflow_size; + auto num_full_batches_thread = (num_base_batches_each * num_threads); + if (num_full_overflow_batches > 0) + { + size_calling_thread += batch_size; + auto remaining = num_full_overflow_batches - 1; + num_full_batches_thread += remaining; + } + + release_assert (check_a.size == (num_full_batches_thread * batch_size + size_calling_thread)); + + std::promise promise; + std::future future = promise.get_future (); + + // Verify a number of signature batches over the thread pool (does not block) + verify_async (check_a, num_full_batches_thread, promise); + + // Verify the rest on the calling thread, this operates on the signatures at the end of the check set + auto result = verify_batch (check_a, check_a.size - size_calling_thread, size_calling_thread); + release_assert (result); + + // Blocks until all the work is done + future.wait (); } void nano::signature_checker::stop () { - std::unique_lock lock (mutex); - stopped = true; - lock.unlock (); - condition.notify_all (); - if (thread.joinable ()) + std::lock_guard guard (stopped_mutex); + if (!stopped) { - thread.join (); + stopped = true; + thread_pool.join (); } } void nano::signature_checker::flush () { - std::unique_lock lock (mutex); - while (!stopped && !checks.empty ()) - { - condition.wait (lock); - } + std::lock_guard guard (stopped_mutex); + while (!stopped && tasks_remaining != 0) + ; } -void nano::signature_checker::verify (nano::signature_check_set & check_a) +bool nano::signature_checker::verify_batch (const nano::signature_check_set & check_a, size_t start_index, size_t size) { - /* Verifications is vector if signatures check results - validate_message_batch returing "true" if there are at least 1 invalid signature */ - auto code (nano::validate_message_batch (check_a.messages, check_a.message_lengths, check_a.pub_keys, check_a.signatures, check_a.size, check_a.verifications)); + /* Returns false if there are at least 1 invalid signature */ + auto code (nano::validate_message_batch (check_a.messages + start_index, check_a.message_lengths + start_index, check_a.pub_keys + start_index, check_a.signatures + start_index, size, check_a.verifications + start_index)); (void)code; - release_assert (std::all_of (check_a.verifications, check_a.verifications + check_a.size, [](int verification) { return verification == 0 || verification == 1; })); - check_a.promise->set_value (); + + return std::all_of (check_a.verifications + start_index, check_a.verifications + start_index + size, [](int verification) { return verification == 0 || verification == 1; }); } -void nano::signature_checker::run () +/* This operates on a number of signatures of size (num_batches * batch_size) from the beginning of the check_a pointers. + * Caller should check the value of the promise which indicateswhen the work has been completed. + */ +void nano::signature_checker::verify_async (nano::signature_check_set & check_a, size_t num_batches, std::promise & promise) { - nano::thread_role::set (nano::thread_role::name::signature_checking); - std::unique_lock lock (mutex); - started = true; + auto task = std::make_shared (check_a, num_batches); + ++tasks_remaining; - lock.unlock (); - condition.notify_all (); - lock.lock (); - - while (!stopped) + for (size_t batch = 0; batch < num_batches; ++batch) { - if (!checks.empty ()) - { - auto check (checks.front ()); - checks.pop_front (); - lock.unlock (); - verify (check); - condition.notify_all (); - lock.lock (); - } - else - { - condition.wait (lock); - } + auto size = batch_size; + auto start_index = batch * batch_size; + + boost::asio::post (thread_pool, [this, task, size, start_index, &promise] { + auto result = this->verify_batch (task->check, start_index, size); + release_assert (result); + + if (--task->pending == 0) + { + --tasks_remaining; + promise.set_value (); + } + }); } } +// Set the names of all the threads in the thread pool for easier identification +void nano::signature_checker::set_thread_names (unsigned num_threads) +{ + auto ready = false; + auto pending = num_threads; + std::condition_variable cv; + std::mutex mutex_l; + std::vector> promises (num_threads); + std::vector> futures; + futures.reserve (num_threads); + std::transform (promises.begin (), promises.end (), std::back_inserter (futures), [](auto & promise) { + return promise.get_future (); + }); + + for (auto i = 0u; i < num_threads; ++i) + { + boost::asio::post (thread_pool, [&cv, &ready, &pending, &mutex_l, &promise = promises[i] ]() { + std::unique_lock lk (mutex_l); + nano::thread_role::set (nano::thread_role::name::signature_checking); + if (--pending == 0) + { + // All threads have been reached + ready = true; + lk.unlock (); + cv.notify_all (); + } + else + { + // We need to wait until the other threads are finished + cv.wait (lk, [&ready]() { return ready; }); + } + promise.set_value (); + }); + } + + // Wait until all threads have finished + for (auto & future : futures) + { + future.wait (); + } + assert (pending == 0); +} + nano::block_processor::block_processor (nano::node & node_a) : stopped (false), active (false), @@ -1488,10 +1566,8 @@ void nano::block_processor::verify_state_blocks (nano::transaction const & trans pub_keys.push_back (block.hashables.account.bytes.data ()); signatures.push_back (block.signature.bytes.data ()); } - std::promise promise; - nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data (), &promise }; - node.checker.add (check); - promise.get_future ().wait (); + nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () }; + node.checker.verify (check); lock_a.lock (); for (auto i (0); i < size; ++i) { @@ -1524,7 +1600,7 @@ void nano::block_processor::process_batch (std::unique_lock & lock_a auto transaction (node.store.tx_begin_read ()); while (!state_blocks.empty () && timer_l.before_deadline (std::chrono::seconds (2))) { - verify_state_blocks (transaction, lock_a, 2048); + verify_state_blocks (transaction, lock_a, 2048 * (node.config.signature_checker_threads + 1)); } } lock_a.unlock (); @@ -1609,7 +1685,7 @@ void nano::block_processor::process_batch (std::unique_lock & lock_a Because verification is long process, avoid large deque verification inside of write transaction */ if (blocks.empty () && !state_blocks.empty ()) { - verify_state_blocks (transaction, lock_a, 256); + verify_state_blocks (transaction, lock_a, 256 * (node.config.signature_checker_threads + 1)); } } lock_a.unlock (); @@ -1783,6 +1859,7 @@ peers (network.endpoint ()), application_path (application_path_a), wallets (init_a.wallet_init, *this), port_mapping (*this), +checker (config.signature_checker_threads), vote_processor (*this), warmed_up (0), block_processor (*this), @@ -3672,7 +3749,7 @@ void nano::active_transactions::request_loop () while (!stopped) { request_confirm (lock); - unsigned extra_delay (std::min (roots.size (), max_broadcast_queue) * node.network.broadcast_interval_ms * 2); + const auto extra_delay (std::min (roots.size (), max_broadcast_queue) * node.network.broadcast_interval_ms * 2); condition.wait_for (lock, std::chrono::milliseconds (request_interval_ms + extra_delay)); } } @@ -3860,7 +3937,7 @@ nano::thread_runner::thread_runner (boost::asio::io_context & io_ctx_a, unsigned { boost::thread::attributes attrs; nano::thread_attributes::set (attrs); - for (auto i (0); i < service_threads_a; ++i) + for (auto i (0u); i < service_threads_a; ++i) { threads.push_back (boost::thread (attrs, [&io_ctx_a]() { nano::thread_role::set (nano::thread_role::name::io); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 57167872..dab13a93 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -387,36 +388,58 @@ public: std::unordered_set active; }; class block_processor; -class signature_check_set +class signature_check_set final { public: + signature_check_set (size_t size, unsigned char const ** messages, size_t * message_lengths, unsigned char const ** pub_keys, unsigned char const ** signatures, int * verifications) : + size (size), messages (messages), message_lengths (message_lengths), pub_keys (pub_keys), signatures (signatures), verifications (verifications) + { + } + size_t size; unsigned char const ** messages; size_t * message_lengths; unsigned char const ** pub_keys; unsigned char const ** signatures; int * verifications; - std::promise * promise; }; -class signature_checker +class signature_checker final { public: - signature_checker (); + signature_checker (unsigned num_threads); ~signature_checker (); - void add (signature_check_set &); + void verify (signature_check_set &); void stop (); void flush (); private: - void run (); - void verify (nano::signature_check_set & check_a); - std::deque checks; - bool started; - bool stopped; - std::mutex mutex; - std::condition_variable condition; - std::thread thread; + struct Task final + { + Task (nano::signature_check_set & check, int pending) : + check (check), pending (pending) + { + } + ~Task () + { + release_assert (pending == 0); + } + nano::signature_check_set & check; + std::atomic pending; + }; + + bool verify_batch (const nano::signature_check_set & check_a, size_t index, size_t size); + void verify_async (nano::signature_check_set & check_a, size_t num_batches, std::promise & promise); + void set_thread_names (unsigned num_threads); + boost::asio::thread_pool thread_pool; + std::atomic tasks_remaining{ 0 }; + static constexpr size_t multithreaded_cutoff = 513; // minimum signature_check_set size eligible to be multithreaded + static constexpr size_t batch_size = 256; + const bool single_threaded; + unsigned num_threads; + std::mutex stopped_mutex; + bool stopped{ false }; }; + class rolled_hash { public: diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index 719a1561..5aaa6fb3 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -7,6 +7,7 @@ namespace { const char * preconfigured_peers_key = "preconfigured_peers"; +const char * signature_checker_threads_key = "signature_checker_threads"; const char * default_beta_peer_network = "peering-beta.nano.org"; const char * default_live_peer_network = "peering.nano.org"; } @@ -27,6 +28,7 @@ password_fanout (1024), io_threads (std::max (4, boost::thread::hardware_concurrency ())), network_threads (std::max (4, boost::thread::hardware_concurrency ())), work_threads (std::max (4, boost::thread::hardware_concurrency ())), +signature_checker_threads ((boost::thread::hardware_concurrency () != 0) ? boost::thread::hardware_concurrency () - 1 : 0), /* The calling thread does checks as well so remove it from the number of threads used */ enable_voting (false), bootstrap_connections (4), bootstrap_connections_max (64), @@ -106,6 +108,7 @@ nano::error nano::node_config::serialize_json (nano::jsonconfig & json) const json.put ("io_threads", io_threads); json.put ("network_threads", network_threads); json.put ("work_threads", work_threads); + json.put (signature_checker_threads_key, signature_checker_threads); json.put ("enable_voting", enable_voting); json.put ("bootstrap_connections", bootstrap_connections); json.put ("bootstrap_connections_max", bootstrap_connections_max); @@ -231,6 +234,9 @@ bool nano::node_config::upgrade_json (unsigned version_a, nano::jsonconfig & jso nano::jsonconfig ipc_l; ipc_config.serialize_json (ipc_l); json.put_child ("ipc", ipc_l); + + json.put (signature_checker_threads_key, signature_checker_threads); + upgraded = true; } case 16: @@ -345,6 +351,7 @@ nano::error nano::node_config::deserialize_json (bool & upgraded_a, nano::jsonco json.get ("lmdb_max_dbs", lmdb_max_dbs); json.get ("enable_voting", enable_voting); json.get ("allow_local_peers", allow_local_peers); + json.get (signature_checker_threads_key, signature_checker_threads); // Validate ranges diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index aabae5b3..86df90f9 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -36,6 +36,7 @@ public: unsigned io_threads; unsigned network_threads; unsigned work_threads; + unsigned signature_checker_threads; bool enable_voting; unsigned bootstrap_connections; unsigned bootstrap_connections_max;