From d2d62f8e114f4f35fc46cf15cd36ee8e144eb330 Mon Sep 17 00:00:00 2001 From: cryptocode Date: Tue, 5 Feb 2019 10:56:56 +0100 Subject: [PATCH] Factor out signature checker (#1700) --- nano/node/CMakeLists.txt | 2 + nano/node/node.cpp | 165 -------------------------------------- nano/node/node.hpp | 54 +------------ nano/node/signatures.cpp | 168 +++++++++++++++++++++++++++++++++++++++ nano/node/signatures.hpp | 66 +++++++++++++++ 5 files changed, 237 insertions(+), 218 deletions(-) create mode 100644 nano/node/signatures.cpp create mode 100644 nano/node/signatures.hpp diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 977ff0b44..c565e0602 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -45,6 +45,8 @@ add_library (node rpc.cpp testing.hpp testing.cpp + signatures.hpp + signatures.cpp wallet.hpp wallet.cpp stats.hpp diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 69e94febe..390b2fe5f 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -1360,171 +1360,6 @@ std::unique_ptr collect_seq_con_info (rep_crawler & rep_ return composite; } } -nano::signature_checker::signature_checker (unsigned num_threads) : -thread_pool (num_threads), -single_threaded (num_threads == 0), -num_threads (num_threads) -{ - if (!single_threaded) - { - set_thread_names (num_threads); - } -} - -nano::signature_checker::~signature_checker () -{ - stop (); -} - -void nano::signature_checker::verify (nano::signature_check_set & check_a) -{ - { - // Don't process anything else if we have stopped - std::lock_guard guard (mutex); - if (stopped) - { - return; - } - } - - if (check_a.size < multithreaded_cutoff || single_threaded) - { - // Not dealing with many so just use the calling thread for checking signatures - auto result = verify_batch (check_a, 0, check_a.size); - release_assert (result); - return; - } - - // Split up the tasks equally over the calling thread and the thread pool. - // Any overflow on the modulus of the batch_size is given to the calling thread, so the thread pool - // only ever operates on batch_size sizes. - size_t overflow_size = check_a.size % batch_size; - size_t num_full_batches = check_a.size / batch_size; - - auto total_threads_to_split_over = num_threads + 1; - auto num_base_batches_each = num_full_batches / total_threads_to_split_over; - auto num_full_overflow_batches = num_full_batches % total_threads_to_split_over; - - auto size_calling_thread = (num_base_batches_each * batch_size) + overflow_size; - auto num_full_batches_thread = (num_base_batches_each * num_threads); - if (num_full_overflow_batches > 0) - { - size_calling_thread += batch_size; - auto remaining = num_full_overflow_batches - 1; - num_full_batches_thread += remaining; - } - - release_assert (check_a.size == (num_full_batches_thread * batch_size + size_calling_thread)); - - std::promise promise; - std::future future = promise.get_future (); - - // Verify a number of signature batches over the thread pool (does not block) - verify_async (check_a, num_full_batches_thread, promise); - - // Verify the rest on the calling thread, this operates on the signatures at the end of the check set - auto result = verify_batch (check_a, check_a.size - size_calling_thread, size_calling_thread); - release_assert (result); - - // Blocks until all the work is done - future.wait (); -} - -void nano::signature_checker::stop () -{ - std::lock_guard guard (mutex); - if (!stopped) - { - stopped = true; - thread_pool.join (); - } -} - -void nano::signature_checker::flush () -{ - std::lock_guard guard (mutex); - while (!stopped && tasks_remaining != 0) - ; -} - -bool nano::signature_checker::verify_batch (const nano::signature_check_set & check_a, size_t start_index, size_t size) -{ - /* Returns false if there are at least 1 invalid signature */ - auto code (nano::validate_message_batch (check_a.messages + start_index, check_a.message_lengths + start_index, check_a.pub_keys + start_index, check_a.signatures + start_index, size, check_a.verifications + start_index)); - (void)code; - - return std::all_of (check_a.verifications + start_index, check_a.verifications + start_index + size, [](int verification) { return verification == 0 || verification == 1; }); -} - -/* This operates on a number of signatures of size (num_batches * batch_size) from the beginning of the check_a pointers. - * Caller should check the value of the promise which indicateswhen the work has been completed. - */ -void nano::signature_checker::verify_async (nano::signature_check_set & check_a, size_t num_batches, std::promise & promise) -{ - auto task = std::make_shared (check_a, num_batches); - ++tasks_remaining; - - for (size_t batch = 0; batch < num_batches; ++batch) - { - auto size = batch_size; - auto start_index = batch * batch_size; - - boost::asio::post (thread_pool, [this, task, size, start_index, &promise] { - auto result = this->verify_batch (task->check, start_index, size); - release_assert (result); - - if (--task->pending == 0) - { - --tasks_remaining; - promise.set_value (); - } - }); - } -} - -// Set the names of all the threads in the thread pool for easier identification -void nano::signature_checker::set_thread_names (unsigned num_threads) -{ - auto ready = false; - auto pending = num_threads; - std::condition_variable cv; - std::vector> promises (num_threads); - std::vector> futures; - futures.reserve (num_threads); - std::transform (promises.begin (), promises.end (), std::back_inserter (futures), [](auto & promise) { - return promise.get_future (); - }); - - for (auto i = 0u; i < num_threads; ++i) - { - // clang-format off - boost::asio::post (thread_pool, [&cv, &ready, &pending, &mutex = mutex, &promise = promises[i]]() { - std::unique_lock lk (mutex); - nano::thread_role::set (nano::thread_role::name::signature_checking); - if (--pending == 0) - { - // All threads have been reached - ready = true; - lk.unlock (); - cv.notify_all (); - } - else - { - // We need to wait until the other threads are finished - cv.wait (lk, [&ready]() { return ready; }); - } - promise.set_value (); - }); - // clang-format on - } - - // Wait until all threads have finished - for (auto & future : futures) - { - future.wait (); - } - assert (pending == 0); -} namespace nano { diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 2eeba5291..96dcf993f 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -423,59 +424,6 @@ public: }; std::unique_ptr collect_seq_con_info (rep_crawler & rep_crawler, const std::string & name); - -class signature_check_set final -{ -public: - signature_check_set (size_t size, unsigned char const ** messages, size_t * message_lengths, unsigned char const ** pub_keys, unsigned char const ** signatures, int * verifications) : - size (size), messages (messages), message_lengths (message_lengths), pub_keys (pub_keys), signatures (signatures), verifications (verifications) - { - } - - size_t size; - unsigned char const ** messages; - size_t * message_lengths; - unsigned char const ** pub_keys; - unsigned char const ** signatures; - int * verifications; -}; -class signature_checker final -{ -public: - signature_checker (unsigned num_threads); - ~signature_checker (); - void verify (signature_check_set &); - void stop (); - void flush (); - -private: - struct Task final - { - Task (nano::signature_check_set & check, int pending) : - check (check), pending (pending) - { - } - ~Task () - { - release_assert (pending == 0); - } - nano::signature_check_set & check; - std::atomic pending; - }; - - bool verify_batch (const nano::signature_check_set & check_a, size_t index, size_t size); - void verify_async (nano::signature_check_set & check_a, size_t num_batches, std::promise & promise); - void set_thread_names (unsigned num_threads); - boost::asio::thread_pool thread_pool; - std::atomic tasks_remaining{ 0 }; - static constexpr size_t multithreaded_cutoff = 513; // minimum signature_check_set size eligible to be multithreaded - static constexpr size_t batch_size = 256; - const bool single_threaded; - unsigned num_threads; - std::mutex mutex; - bool stopped{ false }; -}; - std::unique_ptr collect_seq_con_info (block_processor & block_processor, const std::string & name); class node : public std::enable_shared_from_this diff --git a/nano/node/signatures.cpp b/nano/node/signatures.cpp new file mode 100644 index 000000000..29336d7b1 --- /dev/null +++ b/nano/node/signatures.cpp @@ -0,0 +1,168 @@ +#include +#include + +nano::signature_checker::signature_checker (unsigned num_threads) : +thread_pool (num_threads), +single_threaded (num_threads == 0), +num_threads (num_threads) +{ + if (!single_threaded) + { + set_thread_names (num_threads); + } +} + +nano::signature_checker::~signature_checker () +{ + stop (); +} + +void nano::signature_checker::verify (nano::signature_check_set & check_a) +{ + { + // Don't process anything else if we have stopped + std::lock_guard guard (mutex); + if (stopped) + { + return; + } + } + + if (check_a.size < multithreaded_cutoff || single_threaded) + { + // Not dealing with many so just use the calling thread for checking signatures + auto result = verify_batch (check_a, 0, check_a.size); + release_assert (result); + return; + } + + // Split up the tasks equally over the calling thread and the thread pool. + // Any overflow on the modulus of the batch_size is given to the calling thread, so the thread pool + // only ever operates on batch_size sizes. + size_t overflow_size = check_a.size % batch_size; + size_t num_full_batches = check_a.size / batch_size; + + auto total_threads_to_split_over = num_threads + 1; + auto num_base_batches_each = num_full_batches / total_threads_to_split_over; + auto num_full_overflow_batches = num_full_batches % total_threads_to_split_over; + + auto size_calling_thread = (num_base_batches_each * batch_size) + overflow_size; + auto num_full_batches_thread = (num_base_batches_each * num_threads); + if (num_full_overflow_batches > 0) + { + size_calling_thread += batch_size; + auto remaining = num_full_overflow_batches - 1; + num_full_batches_thread += remaining; + } + + release_assert (check_a.size == (num_full_batches_thread * batch_size + size_calling_thread)); + + std::promise promise; + std::future future = promise.get_future (); + + // Verify a number of signature batches over the thread pool (does not block) + verify_async (check_a, num_full_batches_thread, promise); + + // Verify the rest on the calling thread, this operates on the signatures at the end of the check set + auto result = verify_batch (check_a, check_a.size - size_calling_thread, size_calling_thread); + release_assert (result); + + // Blocks until all the work is done + future.wait (); +} + +void nano::signature_checker::stop () +{ + std::lock_guard guard (mutex); + if (!stopped) + { + stopped = true; + thread_pool.join (); + } +} + +void nano::signature_checker::flush () +{ + std::lock_guard guard (mutex); + while (!stopped && tasks_remaining != 0) + ; +} + +bool nano::signature_checker::verify_batch (const nano::signature_check_set & check_a, size_t start_index, size_t size) +{ + /* Returns false if there are at least 1 invalid signature */ + auto code (nano::validate_message_batch (check_a.messages + start_index, check_a.message_lengths + start_index, check_a.pub_keys + start_index, check_a.signatures + start_index, size, check_a.verifications + start_index)); + (void)code; + + return std::all_of (check_a.verifications + start_index, check_a.verifications + start_index + size, [](int verification) { return verification == 0 || verification == 1; }); +} + +/* This operates on a number of signatures of size (num_batches * batch_size) from the beginning of the check_a pointers. + * Caller should check the value of the promise which indicateswhen the work has been completed. + */ +void nano::signature_checker::verify_async (nano::signature_check_set & check_a, size_t num_batches, std::promise & promise) +{ + auto task = std::make_shared (check_a, num_batches); + ++tasks_remaining; + + for (size_t batch = 0; batch < num_batches; ++batch) + { + auto size = batch_size; + auto start_index = batch * batch_size; + + boost::asio::post (thread_pool, [this, task, size, start_index, &promise] { + auto result = this->verify_batch (task->check, start_index, size); + release_assert (result); + + if (--task->pending == 0) + { + --tasks_remaining; + promise.set_value (); + } + }); + } +} + +// Set the names of all the threads in the thread pool for easier identification +void nano::signature_checker::set_thread_names (unsigned num_threads) +{ + auto ready = false; + auto pending = num_threads; + std::condition_variable cv; + std::vector> promises (num_threads); + std::vector> futures; + futures.reserve (num_threads); + std::transform (promises.begin (), promises.end (), std::back_inserter (futures), [](auto & promise) { + return promise.get_future (); + }); + + for (auto i = 0u; i < num_threads; ++i) + { + // clang-format off + boost::asio::post (thread_pool, [&cv, &ready, &pending, &mutex = mutex, &promise = promises[i]]() { + std::unique_lock lk (mutex); + nano::thread_role::set (nano::thread_role::name::signature_checking); + if (--pending == 0) + { + // All threads have been reached + ready = true; + lk.unlock (); + cv.notify_all (); + } + else + { + // We need to wait until the other threads are finished + cv.wait (lk, [&ready]() { return ready; }); + } + promise.set_value (); + }); + // clang-format on + } + + // Wait until all threads have finished + for (auto & future : futures) + { + future.wait (); + } + assert (pending == 0); +} diff --git a/nano/node/signatures.hpp b/nano/node/signatures.hpp new file mode 100644 index 000000000..172b7a203 --- /dev/null +++ b/nano/node/signatures.hpp @@ -0,0 +1,66 @@ +#pragma once + +#include +#include +#include +#include + +#include + +namespace nano +{ +class signature_check_set final +{ +public: + signature_check_set (size_t size, unsigned char const ** messages, size_t * message_lengths, unsigned char const ** pub_keys, unsigned char const ** signatures, int * verifications) : + size (size), messages (messages), message_lengths (message_lengths), pub_keys (pub_keys), signatures (signatures), verifications (verifications) + { + } + + size_t size; + unsigned char const ** messages; + size_t * message_lengths; + unsigned char const ** pub_keys; + unsigned char const ** signatures; + int * verifications; +}; + +/** Multi-threaded signature checker */ +class signature_checker final +{ +public: + signature_checker (unsigned num_threads); + ~signature_checker (); + void verify (signature_check_set &); + void stop (); + void flush (); + +private: + struct Task final + { + Task (nano::signature_check_set & check, int pending) : + check (check), pending (pending) + { + } + ~Task () + { + release_assert (pending == 0); + } + nano::signature_check_set & check; + std::atomic pending; + }; + + bool verify_batch (const nano::signature_check_set & check_a, size_t index, size_t size); + void verify_async (nano::signature_check_set & check_a, size_t num_batches, std::promise & promise); + void set_thread_names (unsigned num_threads); + boost::asio::thread_pool thread_pool; + std::atomic tasks_remaining{ 0 }; + /** minimum signature_check_set size eligible to be multithreaded */ + static constexpr size_t multithreaded_cutoff = 513; + static constexpr size_t batch_size = 256; + const bool single_threaded; + unsigned num_threads; + std::mutex mutex; + bool stopped{ false }; +}; +}