Multi-thread the signature checker (#1651)
* Use a thread pool for the signature checker
* Formatting and set threads used in test to 4
* Cleanup
* Check that pending is 0 when destructor of Task is called
* Make sure all threads have the correct name. Fix mismatch of unsigned/size_t & create future before thread is called
* Formatting lambda
* Removed future from calling thread and now the calling thread does some of the work instead of just blocking.
* Formatting lambda again
* Make release_assert a debug assert in the set thread names function
* Increase max signature check sizes being processed and add a missed release assert
* The signature checker now only uses the maximum number of cores available to reduce thread interleaving
* boost:🧵:hardware_concurrency () can return 0 so account for that.
* Forgot that config.signature_checker_threads is 0 based so add 1 to num threads passed into verify_state_blocks
This commit is contained in:
parent
9c3b26b683
commit
28a10823c6
6 changed files with 305 additions and 112 deletions
|
@ -634,9 +634,14 @@ TEST (node_config, v15_v16_upgrade)
|
|||
auto upgraded (false);
|
||||
nano::node_config config;
|
||||
config.logging.init (path);
|
||||
ASSERT_FALSE (tree.get_optional_child ("allow_local_peers")); // allow_local_peers should not be present now
|
||||
// These config options should not be present at version 15
|
||||
ASSERT_FALSE (tree.get_optional_child ("allow_local_peers"));
|
||||
ASSERT_FALSE (tree.get_optional_child ("signature_checker_threads"));
|
||||
config.deserialize_json (upgraded, tree);
|
||||
ASSERT_TRUE (!!tree.get_optional_child ("allow_local_peers")); // allow_local_peers should be added after the update
|
||||
// The config options should be added after the upgrade
|
||||
ASSERT_TRUE (!!tree.get_optional_child ("allow_local_peers"));
|
||||
ASSERT_TRUE (!!tree.get_optional_child ("signature_checker_threads"));
|
||||
|
||||
ASSERT_TRUE (upgraded);
|
||||
auto version (tree.get<std::string> ("version"));
|
||||
|
||||
|
@ -670,18 +675,22 @@ TEST (node_config, allow_local_peers)
|
|||
nano::node_config config;
|
||||
config.logging.init (path);
|
||||
|
||||
// Check config is correct when allow_local_peers is false
|
||||
// Check config is correct
|
||||
tree.put ("allow_local_peers", false);
|
||||
tree.put ("signature_checker_threads", 1);
|
||||
config.deserialize_json (upgraded, tree);
|
||||
ASSERT_FALSE (upgraded);
|
||||
ASSERT_FALSE (config.allow_local_peers);
|
||||
ASSERT_EQ (config.signature_checker_threads, 1);
|
||||
|
||||
// Check config is correct when allow_local_peers is true
|
||||
// Check config is correct with other values
|
||||
tree.put ("allow_local_peers", true);
|
||||
tree.put ("signature_checker_threads", 4);
|
||||
upgraded = false;
|
||||
config.deserialize_json (upgraded, tree);
|
||||
ASSERT_FALSE (upgraded);
|
||||
ASSERT_TRUE (config.allow_local_peers);
|
||||
ASSERT_EQ (config.signature_checker_threads, 4);
|
||||
}
|
||||
|
||||
// Regression test to ensure that deserializing includes changes node via get_required_child
|
||||
|
|
|
@ -1,23 +1,19 @@
|
|||
#include <gtest/gtest.h>
|
||||
|
||||
#include <future>
|
||||
#include <nano/node/node.hpp>
|
||||
|
||||
TEST (signature_checker, empty)
|
||||
{
|
||||
nano::signature_checker checker;
|
||||
std::promise<void> promise;
|
||||
nano::signature_check_set check = { 0, nullptr, nullptr, nullptr, nullptr, nullptr, &promise };
|
||||
checker.add (check);
|
||||
promise.get_future ().wait ();
|
||||
nano::signature_checker checker (0);
|
||||
nano::signature_check_set check = { 0, nullptr, nullptr, nullptr, nullptr, nullptr };
|
||||
checker.verify (check);
|
||||
}
|
||||
|
||||
TEST (signature_checker, many)
|
||||
TEST (signature_checker, bulk_single_thread)
|
||||
{
|
||||
nano::keypair key;
|
||||
nano::state_block block (key.pub, 0, key.pub, 0, 0, key.prv, key.pub, 0);
|
||||
nano::signature_checker checker;
|
||||
std::promise<void> promise;
|
||||
nano::signature_checker checker (0);
|
||||
std::vector<nano::uint256_union> hashes;
|
||||
size_t size (1000);
|
||||
hashes.reserve (size);
|
||||
|
@ -39,34 +35,114 @@ TEST (signature_checker, many)
|
|||
pub_keys.push_back (block.hashables.account.bytes.data ());
|
||||
signatures.push_back (block.signature.bytes.data ());
|
||||
}
|
||||
nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data (), &promise };
|
||||
checker.add (check);
|
||||
promise.get_future ().wait ();
|
||||
nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () };
|
||||
checker.verify (check);
|
||||
bool all_valid = std::all_of (verifications.cbegin (), verifications.cend (), [](auto verification) { return verification == 1; });
|
||||
ASSERT_TRUE (all_valid);
|
||||
}
|
||||
|
||||
TEST (signature_checker, many_multi_threaded)
|
||||
{
|
||||
nano::signature_checker checker (4);
|
||||
|
||||
auto signature_checker_work_func = [&checker]() {
|
||||
nano::keypair key;
|
||||
nano::state_block block (key.pub, 0, key.pub, 0, 0, key.prv, key.pub, 0);
|
||||
auto block_hash = block.hash ();
|
||||
|
||||
nano::state_block invalid_block (key.pub, 0, key.pub, 0, 0, key.prv, key.pub, 0);
|
||||
invalid_block.signature.bytes[31] ^= 0x1;
|
||||
auto invalid_block_hash = block.hash ();
|
||||
|
||||
constexpr auto num_check_sizes = 18;
|
||||
constexpr std::array<size_t, num_check_sizes> check_sizes{ 2048, 256, 1024, 1,
|
||||
4096, 512, 2050, 1024, 8092, 513, 17, 1024, 2047, 255, 513, 2049, 1025, 1023 };
|
||||
|
||||
std::vector<nano::signature_check_set> signature_checker_sets;
|
||||
signature_checker_sets.reserve (num_check_sizes);
|
||||
|
||||
// Create containers so everything is kept in scope while the threads work on the signature checks
|
||||
std::array<std::vector<unsigned char const *>, num_check_sizes> messages;
|
||||
std::array<std::vector<size_t>, num_check_sizes> lengths;
|
||||
std::array<std::vector<unsigned char const *>, num_check_sizes> pub_keys;
|
||||
std::array<std::vector<unsigned char const *>, num_check_sizes> signatures;
|
||||
std::array<std::vector<int>, num_check_sizes> verifications;
|
||||
|
||||
// Populate all the signature check sets. The last one in each set is given an incorrect block signature.
|
||||
for (int i = 0; i < num_check_sizes; ++i)
|
||||
{
|
||||
auto check_size = check_sizes[i];
|
||||
assert (check_size > 0);
|
||||
auto last_signature_index = check_size - 1;
|
||||
|
||||
messages[i].resize (check_size);
|
||||
std::fill (messages[i].begin (), messages[i].end (), block_hash.bytes.data ());
|
||||
messages[i][last_signature_index] = invalid_block_hash.bytes.data ();
|
||||
|
||||
lengths[i].resize (check_size);
|
||||
std::fill (lengths[i].begin (), lengths[i].end (), sizeof (decltype (block_hash)));
|
||||
|
||||
pub_keys[i].resize (check_size);
|
||||
std::fill (pub_keys[i].begin (), pub_keys[i].end (), block.hashables.account.bytes.data ());
|
||||
pub_keys[i][last_signature_index] = invalid_block.hashables.account.bytes.data ();
|
||||
|
||||
signatures[i].resize (check_size);
|
||||
std::fill (signatures[i].begin (), signatures[i].end (), block.signature.bytes.data ());
|
||||
signatures[i][last_signature_index] = invalid_block.signature.bytes.data ();
|
||||
|
||||
verifications[i].resize (check_size);
|
||||
|
||||
signature_checker_sets.emplace_back (check_size, messages[i].data (), lengths[i].data (), pub_keys[i].data (), signatures[i].data (), verifications[i].data ());
|
||||
checker.verify (signature_checker_sets[i]);
|
||||
|
||||
// Confirm all but last are valid
|
||||
auto all_valid = std::all_of (verifications[i].cbegin (), verifications[i].cend () - 1, [](auto verification) { return verification == 1; });
|
||||
ASSERT_TRUE (all_valid);
|
||||
ASSERT_EQ (verifications[i][last_signature_index], 0);
|
||||
}
|
||||
};
|
||||
|
||||
std::thread signature_checker_thread1 (signature_checker_work_func);
|
||||
std::thread signature_checker_thread2 (signature_checker_work_func);
|
||||
|
||||
signature_checker_thread1.join ();
|
||||
signature_checker_thread2.join ();
|
||||
}
|
||||
|
||||
TEST (signature_checker, one)
|
||||
{
|
||||
nano::signature_checker checker (0);
|
||||
|
||||
auto verify_block = [&checker](auto & block, auto result) {
|
||||
std::vector<nano::uint256_union> hashes;
|
||||
std::vector<unsigned char const *> messages;
|
||||
std::vector<size_t> lengths;
|
||||
std::vector<unsigned char const *> pub_keys;
|
||||
std::vector<unsigned char const *> signatures;
|
||||
std::vector<int> verifications;
|
||||
size_t size (1);
|
||||
verifications.resize (size);
|
||||
for (auto i (0); i < size; ++i)
|
||||
{
|
||||
hashes.push_back (block.hash ());
|
||||
messages.push_back (hashes.back ().bytes.data ());
|
||||
lengths.push_back (sizeof (decltype (hashes)::value_type));
|
||||
pub_keys.push_back (block.hashables.account.bytes.data ());
|
||||
signatures.push_back (block.signature.bytes.data ());
|
||||
}
|
||||
nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () };
|
||||
checker.verify (check);
|
||||
ASSERT_EQ (verifications.front (), result);
|
||||
};
|
||||
|
||||
nano::keypair key;
|
||||
nano::state_block block (key.pub, 0, key.pub, 0, 0, key.prv, key.pub, 0);
|
||||
nano::signature_checker checker;
|
||||
std::promise<void> promise;
|
||||
std::vector<nano::uint256_union> hashes;
|
||||
std::vector<unsigned char const *> messages;
|
||||
std::vector<size_t> lengths;
|
||||
std::vector<unsigned char const *> pub_keys;
|
||||
std::vector<unsigned char const *> signatures;
|
||||
std::vector<int> verifications;
|
||||
size_t size (1);
|
||||
verifications.resize (size);
|
||||
for (auto i (0); i < size; ++i)
|
||||
{
|
||||
hashes.push_back (block.hash ());
|
||||
messages.push_back (hashes.back ().bytes.data ());
|
||||
lengths.push_back (sizeof (decltype (hashes)::value_type));
|
||||
pub_keys.push_back (block.hashables.account.bytes.data ());
|
||||
signatures.push_back (block.signature.bytes.data ());
|
||||
}
|
||||
nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data (), &promise };
|
||||
checker.add (check);
|
||||
promise.get_future ().wait ();
|
||||
|
||||
// Make signaure invalid and check result is incorrect
|
||||
block.signature.bytes[31] ^= 0x1;
|
||||
verify_block (block, 0);
|
||||
|
||||
// Make it valid and check for succcess
|
||||
block.signature.bytes[31] ^= 0x1;
|
||||
verify_block (block, 1);
|
||||
}
|
||||
|
|
|
@ -1100,10 +1100,8 @@ void nano::vote_processor::verify_votes (std::deque<std::pair<std::shared_ptr<na
|
|||
pub_keys.push_back (vote.first->account.bytes.data ());
|
||||
signatures.push_back (vote.first->signature.bytes.data ());
|
||||
}
|
||||
std::promise<void> promise;
|
||||
nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data (), &promise };
|
||||
node.checker.add (check);
|
||||
promise.get_future ().wait ();
|
||||
nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () };
|
||||
node.checker.verify (check);
|
||||
std::remove_reference_t<decltype (votes_a)> result;
|
||||
auto i (0);
|
||||
for (auto & vote : votes_a)
|
||||
|
@ -1244,15 +1242,14 @@ bool nano::rep_crawler::exists (nano::block_hash const & hash_a)
|
|||
return active.count (hash_a) != 0;
|
||||
}
|
||||
|
||||
nano::signature_checker::signature_checker () :
|
||||
started (false),
|
||||
stopped (false),
|
||||
thread ([this]() { run (); })
|
||||
nano::signature_checker::signature_checker (unsigned num_threads) :
|
||||
thread_pool (num_threads),
|
||||
single_threaded (num_threads == 0),
|
||||
num_threads (num_threads)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock (mutex);
|
||||
while (!started)
|
||||
if (!single_threaded)
|
||||
{
|
||||
condition.wait (lock);
|
||||
set_thread_names (num_threads);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1261,74 +1258,155 @@ nano::signature_checker::~signature_checker ()
|
|||
stop ();
|
||||
}
|
||||
|
||||
void nano::signature_checker::add (nano::signature_check_set & check_a)
|
||||
void nano::signature_checker::verify (nano::signature_check_set & check_a)
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lock (mutex);
|
||||
checks.push_back (check_a);
|
||||
// Don't process anything else if we have stopped
|
||||
std::lock_guard<std::mutex> guard (stopped_mutex);
|
||||
if (stopped)
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
condition.notify_all ();
|
||||
|
||||
if (check_a.size < multithreaded_cutoff || single_threaded)
|
||||
{
|
||||
// Not dealing with many so just use the calling thread for checking signatures
|
||||
auto result = verify_batch (check_a, 0, check_a.size);
|
||||
release_assert (result);
|
||||
return;
|
||||
}
|
||||
|
||||
// Split up the tasks equally over the calling thread and the thread pool.
|
||||
// Any overflow on the modulus of the batch_size is given to the calling thread, so the thread pool
|
||||
// only ever operates on batch_size sizes.
|
||||
size_t overflow_size = check_a.size % batch_size;
|
||||
size_t num_full_batches = check_a.size / batch_size;
|
||||
|
||||
auto total_threads_to_split_over = num_threads + 1;
|
||||
auto num_base_batches_each = num_full_batches / total_threads_to_split_over;
|
||||
auto num_full_overflow_batches = num_full_batches % total_threads_to_split_over;
|
||||
|
||||
auto size_calling_thread = (num_base_batches_each * batch_size) + overflow_size;
|
||||
auto num_full_batches_thread = (num_base_batches_each * num_threads);
|
||||
if (num_full_overflow_batches > 0)
|
||||
{
|
||||
size_calling_thread += batch_size;
|
||||
auto remaining = num_full_overflow_batches - 1;
|
||||
num_full_batches_thread += remaining;
|
||||
}
|
||||
|
||||
release_assert (check_a.size == (num_full_batches_thread * batch_size + size_calling_thread));
|
||||
|
||||
std::promise<void> promise;
|
||||
std::future<void> future = promise.get_future ();
|
||||
|
||||
// Verify a number of signature batches over the thread pool (does not block)
|
||||
verify_async (check_a, num_full_batches_thread, promise);
|
||||
|
||||
// Verify the rest on the calling thread, this operates on the signatures at the end of the check set
|
||||
auto result = verify_batch (check_a, check_a.size - size_calling_thread, size_calling_thread);
|
||||
release_assert (result);
|
||||
|
||||
// Blocks until all the work is done
|
||||
future.wait ();
|
||||
}
|
||||
|
||||
void nano::signature_checker::stop ()
|
||||
{
|
||||
std::unique_lock<std::mutex> lock (mutex);
|
||||
stopped = true;
|
||||
lock.unlock ();
|
||||
condition.notify_all ();
|
||||
if (thread.joinable ())
|
||||
std::lock_guard<std::mutex> guard (stopped_mutex);
|
||||
if (!stopped)
|
||||
{
|
||||
thread.join ();
|
||||
stopped = true;
|
||||
thread_pool.join ();
|
||||
}
|
||||
}
|
||||
|
||||
void nano::signature_checker::flush ()
|
||||
{
|
||||
std::unique_lock<std::mutex> lock (mutex);
|
||||
while (!stopped && !checks.empty ())
|
||||
{
|
||||
condition.wait (lock);
|
||||
}
|
||||
std::lock_guard<std::mutex> guard (stopped_mutex);
|
||||
while (!stopped && tasks_remaining != 0)
|
||||
;
|
||||
}
|
||||
|
||||
void nano::signature_checker::verify (nano::signature_check_set & check_a)
|
||||
bool nano::signature_checker::verify_batch (const nano::signature_check_set & check_a, size_t start_index, size_t size)
|
||||
{
|
||||
/* Verifications is vector if signatures check results
|
||||
validate_message_batch returing "true" if there are at least 1 invalid signature */
|
||||
auto code (nano::validate_message_batch (check_a.messages, check_a.message_lengths, check_a.pub_keys, check_a.signatures, check_a.size, check_a.verifications));
|
||||
/* Returns false if there are at least 1 invalid signature */
|
||||
auto code (nano::validate_message_batch (check_a.messages + start_index, check_a.message_lengths + start_index, check_a.pub_keys + start_index, check_a.signatures + start_index, size, check_a.verifications + start_index));
|
||||
(void)code;
|
||||
release_assert (std::all_of (check_a.verifications, check_a.verifications + check_a.size, [](int verification) { return verification == 0 || verification == 1; }));
|
||||
check_a.promise->set_value ();
|
||||
|
||||
return std::all_of (check_a.verifications + start_index, check_a.verifications + start_index + size, [](int verification) { return verification == 0 || verification == 1; });
|
||||
}
|
||||
|
||||
void nano::signature_checker::run ()
|
||||
/* This operates on a number of signatures of size (num_batches * batch_size) from the beginning of the check_a pointers.
|
||||
* Caller should check the value of the promise which indicateswhen the work has been completed.
|
||||
*/
|
||||
void nano::signature_checker::verify_async (nano::signature_check_set & check_a, size_t num_batches, std::promise<void> & promise)
|
||||
{
|
||||
nano::thread_role::set (nano::thread_role::name::signature_checking);
|
||||
std::unique_lock<std::mutex> lock (mutex);
|
||||
started = true;
|
||||
auto task = std::make_shared<Task> (check_a, num_batches);
|
||||
++tasks_remaining;
|
||||
|
||||
lock.unlock ();
|
||||
condition.notify_all ();
|
||||
lock.lock ();
|
||||
|
||||
while (!stopped)
|
||||
for (size_t batch = 0; batch < num_batches; ++batch)
|
||||
{
|
||||
if (!checks.empty ())
|
||||
{
|
||||
auto check (checks.front ());
|
||||
checks.pop_front ();
|
||||
lock.unlock ();
|
||||
verify (check);
|
||||
condition.notify_all ();
|
||||
lock.lock ();
|
||||
}
|
||||
else
|
||||
{
|
||||
condition.wait (lock);
|
||||
}
|
||||
auto size = batch_size;
|
||||
auto start_index = batch * batch_size;
|
||||
|
||||
boost::asio::post (thread_pool, [this, task, size, start_index, &promise] {
|
||||
auto result = this->verify_batch (task->check, start_index, size);
|
||||
release_assert (result);
|
||||
|
||||
if (--task->pending == 0)
|
||||
{
|
||||
--tasks_remaining;
|
||||
promise.set_value ();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Set the names of all the threads in the thread pool for easier identification
|
||||
void nano::signature_checker::set_thread_names (unsigned num_threads)
|
||||
{
|
||||
auto ready = false;
|
||||
auto pending = num_threads;
|
||||
std::condition_variable cv;
|
||||
std::mutex mutex_l;
|
||||
std::vector<std::promise<void>> promises (num_threads);
|
||||
std::vector<std::future<void>> futures;
|
||||
futures.reserve (num_threads);
|
||||
std::transform (promises.begin (), promises.end (), std::back_inserter (futures), [](auto & promise) {
|
||||
return promise.get_future ();
|
||||
});
|
||||
|
||||
for (auto i = 0u; i < num_threads; ++i)
|
||||
{
|
||||
boost::asio::post (thread_pool, [&cv, &ready, &pending, &mutex_l, &promise = promises[i] ]() {
|
||||
std::unique_lock<std::mutex> lk (mutex_l);
|
||||
nano::thread_role::set (nano::thread_role::name::signature_checking);
|
||||
if (--pending == 0)
|
||||
{
|
||||
// All threads have been reached
|
||||
ready = true;
|
||||
lk.unlock ();
|
||||
cv.notify_all ();
|
||||
}
|
||||
else
|
||||
{
|
||||
// We need to wait until the other threads are finished
|
||||
cv.wait (lk, [&ready]() { return ready; });
|
||||
}
|
||||
promise.set_value ();
|
||||
});
|
||||
}
|
||||
|
||||
// Wait until all threads have finished
|
||||
for (auto & future : futures)
|
||||
{
|
||||
future.wait ();
|
||||
}
|
||||
assert (pending == 0);
|
||||
}
|
||||
|
||||
nano::block_processor::block_processor (nano::node & node_a) :
|
||||
stopped (false),
|
||||
active (false),
|
||||
|
@ -1488,10 +1566,8 @@ void nano::block_processor::verify_state_blocks (nano::transaction const & trans
|
|||
pub_keys.push_back (block.hashables.account.bytes.data ());
|
||||
signatures.push_back (block.signature.bytes.data ());
|
||||
}
|
||||
std::promise<void> promise;
|
||||
nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data (), &promise };
|
||||
node.checker.add (check);
|
||||
promise.get_future ().wait ();
|
||||
nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () };
|
||||
node.checker.verify (check);
|
||||
lock_a.lock ();
|
||||
for (auto i (0); i < size; ++i)
|
||||
{
|
||||
|
@ -1524,7 +1600,7 @@ void nano::block_processor::process_batch (std::unique_lock<std::mutex> & lock_a
|
|||
auto transaction (node.store.tx_begin_read ());
|
||||
while (!state_blocks.empty () && timer_l.before_deadline (std::chrono::seconds (2)))
|
||||
{
|
||||
verify_state_blocks (transaction, lock_a, 2048);
|
||||
verify_state_blocks (transaction, lock_a, 2048 * (node.config.signature_checker_threads + 1));
|
||||
}
|
||||
}
|
||||
lock_a.unlock ();
|
||||
|
@ -1609,7 +1685,7 @@ void nano::block_processor::process_batch (std::unique_lock<std::mutex> & lock_a
|
|||
Because verification is long process, avoid large deque verification inside of write transaction */
|
||||
if (blocks.empty () && !state_blocks.empty ())
|
||||
{
|
||||
verify_state_blocks (transaction, lock_a, 256);
|
||||
verify_state_blocks (transaction, lock_a, 256 * (node.config.signature_checker_threads + 1));
|
||||
}
|
||||
}
|
||||
lock_a.unlock ();
|
||||
|
@ -1783,6 +1859,7 @@ peers (network.endpoint ()),
|
|||
application_path (application_path_a),
|
||||
wallets (init_a.wallet_init, *this),
|
||||
port_mapping (*this),
|
||||
checker (config.signature_checker_threads),
|
||||
vote_processor (*this),
|
||||
warmed_up (0),
|
||||
block_processor (*this),
|
||||
|
@ -3672,7 +3749,7 @@ void nano::active_transactions::request_loop ()
|
|||
while (!stopped)
|
||||
{
|
||||
request_confirm (lock);
|
||||
unsigned extra_delay (std::min (roots.size (), max_broadcast_queue) * node.network.broadcast_interval_ms * 2);
|
||||
const auto extra_delay (std::min (roots.size (), max_broadcast_queue) * node.network.broadcast_interval_ms * 2);
|
||||
condition.wait_for (lock, std::chrono::milliseconds (request_interval_ms + extra_delay));
|
||||
}
|
||||
}
|
||||
|
@ -3860,7 +3937,7 @@ nano::thread_runner::thread_runner (boost::asio::io_context & io_ctx_a, unsigned
|
|||
{
|
||||
boost::thread::attributes attrs;
|
||||
nano::thread_attributes::set (attrs);
|
||||
for (auto i (0); i < service_threads_a; ++i)
|
||||
for (auto i (0u); i < service_threads_a; ++i)
|
||||
{
|
||||
threads.push_back (boost::thread (attrs, [&io_ctx_a]() {
|
||||
nano::thread_role::set (nano::thread_role::name::io);
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
#include <condition_variable>
|
||||
#include <queue>
|
||||
|
||||
#include <boost/asio/thread_pool.hpp>
|
||||
#include <boost/iostreams/device/array.hpp>
|
||||
#include <boost/multi_index/hashed_index.hpp>
|
||||
#include <boost/multi_index/member.hpp>
|
||||
|
@ -387,36 +388,58 @@ public:
|
|||
std::unordered_set<nano::block_hash> active;
|
||||
};
|
||||
class block_processor;
|
||||
class signature_check_set
|
||||
class signature_check_set final
|
||||
{
|
||||
public:
|
||||
signature_check_set (size_t size, unsigned char const ** messages, size_t * message_lengths, unsigned char const ** pub_keys, unsigned char const ** signatures, int * verifications) :
|
||||
size (size), messages (messages), message_lengths (message_lengths), pub_keys (pub_keys), signatures (signatures), verifications (verifications)
|
||||
{
|
||||
}
|
||||
|
||||
size_t size;
|
||||
unsigned char const ** messages;
|
||||
size_t * message_lengths;
|
||||
unsigned char const ** pub_keys;
|
||||
unsigned char const ** signatures;
|
||||
int * verifications;
|
||||
std::promise<void> * promise;
|
||||
};
|
||||
class signature_checker
|
||||
class signature_checker final
|
||||
{
|
||||
public:
|
||||
signature_checker ();
|
||||
signature_checker (unsigned num_threads);
|
||||
~signature_checker ();
|
||||
void add (signature_check_set &);
|
||||
void verify (signature_check_set &);
|
||||
void stop ();
|
||||
void flush ();
|
||||
|
||||
private:
|
||||
void run ();
|
||||
void verify (nano::signature_check_set & check_a);
|
||||
std::deque<nano::signature_check_set> checks;
|
||||
bool started;
|
||||
bool stopped;
|
||||
std::mutex mutex;
|
||||
std::condition_variable condition;
|
||||
std::thread thread;
|
||||
struct Task final
|
||||
{
|
||||
Task (nano::signature_check_set & check, int pending) :
|
||||
check (check), pending (pending)
|
||||
{
|
||||
}
|
||||
~Task ()
|
||||
{
|
||||
release_assert (pending == 0);
|
||||
}
|
||||
nano::signature_check_set & check;
|
||||
std::atomic<int> pending;
|
||||
};
|
||||
|
||||
bool verify_batch (const nano::signature_check_set & check_a, size_t index, size_t size);
|
||||
void verify_async (nano::signature_check_set & check_a, size_t num_batches, std::promise<void> & promise);
|
||||
void set_thread_names (unsigned num_threads);
|
||||
boost::asio::thread_pool thread_pool;
|
||||
std::atomic<int> tasks_remaining{ 0 };
|
||||
static constexpr size_t multithreaded_cutoff = 513; // minimum signature_check_set size eligible to be multithreaded
|
||||
static constexpr size_t batch_size = 256;
|
||||
const bool single_threaded;
|
||||
unsigned num_threads;
|
||||
std::mutex stopped_mutex;
|
||||
bool stopped{ false };
|
||||
};
|
||||
|
||||
class rolled_hash
|
||||
{
|
||||
public:
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
namespace
|
||||
{
|
||||
const char * preconfigured_peers_key = "preconfigured_peers";
|
||||
const char * signature_checker_threads_key = "signature_checker_threads";
|
||||
const char * default_beta_peer_network = "peering-beta.nano.org";
|
||||
const char * default_live_peer_network = "peering.nano.org";
|
||||
}
|
||||
|
@ -27,6 +28,7 @@ password_fanout (1024),
|
|||
io_threads (std::max<unsigned> (4, boost::thread::hardware_concurrency ())),
|
||||
network_threads (std::max<unsigned> (4, boost::thread::hardware_concurrency ())),
|
||||
work_threads (std::max<unsigned> (4, boost::thread::hardware_concurrency ())),
|
||||
signature_checker_threads ((boost::thread::hardware_concurrency () != 0) ? boost::thread::hardware_concurrency () - 1 : 0), /* The calling thread does checks as well so remove it from the number of threads used */
|
||||
enable_voting (false),
|
||||
bootstrap_connections (4),
|
||||
bootstrap_connections_max (64),
|
||||
|
@ -106,6 +108,7 @@ nano::error nano::node_config::serialize_json (nano::jsonconfig & json) const
|
|||
json.put ("io_threads", io_threads);
|
||||
json.put ("network_threads", network_threads);
|
||||
json.put ("work_threads", work_threads);
|
||||
json.put (signature_checker_threads_key, signature_checker_threads);
|
||||
json.put ("enable_voting", enable_voting);
|
||||
json.put ("bootstrap_connections", bootstrap_connections);
|
||||
json.put ("bootstrap_connections_max", bootstrap_connections_max);
|
||||
|
@ -231,6 +234,9 @@ bool nano::node_config::upgrade_json (unsigned version_a, nano::jsonconfig & jso
|
|||
nano::jsonconfig ipc_l;
|
||||
ipc_config.serialize_json (ipc_l);
|
||||
json.put_child ("ipc", ipc_l);
|
||||
|
||||
json.put (signature_checker_threads_key, signature_checker_threads);
|
||||
|
||||
upgraded = true;
|
||||
}
|
||||
case 16:
|
||||
|
@ -345,6 +351,7 @@ nano::error nano::node_config::deserialize_json (bool & upgraded_a, nano::jsonco
|
|||
json.get<int> ("lmdb_max_dbs", lmdb_max_dbs);
|
||||
json.get<bool> ("enable_voting", enable_voting);
|
||||
json.get<bool> ("allow_local_peers", allow_local_peers);
|
||||
json.get<unsigned> (signature_checker_threads_key, signature_checker_threads);
|
||||
|
||||
// Validate ranges
|
||||
|
||||
|
|
|
@ -36,6 +36,7 @@ public:
|
|||
unsigned io_threads;
|
||||
unsigned network_threads;
|
||||
unsigned work_threads;
|
||||
unsigned signature_checker_threads;
|
||||
bool enable_voting;
|
||||
unsigned bootstrap_connections;
|
||||
unsigned bootstrap_connections_max;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue