Merge pull request #4267 from clemahieu/remove_signature_threads

This removes the use of separate signature-checking threads/classes from the node.
These classes add complexity to critical code. Even where performance optimizations could be made through multi-threading, it would be better implemented using standard C++ instead of a custom class.
It's unclear if these classes are helping performance at all so we're opting to remove them until/if a performance improvement is needed.

This PR contains 4 commits:

Adds explicit negative tests for state and epoch blocks to verify incorrect signatures are rejected
All blocks passed to block_processor::add_impl are enqueued in the block processor, rather than splitting them based on the type, and passed to separate signature checking threads. This remove use of signature checking threads from block_processor.
Remove use of signature checking threads from vote_processor and check each vote directly in the vote_processor thread
Remove unused signature checking classes.
This commit is contained in:
clemahieu 2023-11-06 15:21:18 +00:00 committed by GitHub
commit ef0dde689e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 63 additions and 881 deletions

View file

@ -41,7 +41,6 @@ add_executable(
scheduler_buckets.cpp
request_aggregator.cpp
signal_manager.cpp
signing.cpp
socket.cpp
system.cpp
telemetry.cpp

View file

@ -39,32 +39,3 @@ TEST (block_processor, broadcast_block_on_arrival)
// Checks whether the block was broadcast.
ASSERT_TIMELY (5s, node2->ledger.block_or_pruned_exists (send1->hash ()));
}
TEST (block_processor, add_blocking_invalid_block)
{
nano::test::system system;
nano::node_config config = system.default_config ();
config.block_process_timeout = std::chrono::seconds{ 1 };
auto & node = *system.add_node (config);
nano::state_block_builder builder;
auto send1 = builder.make_block ()
.account (nano::dev::genesis_key.pub)
.previous (nano::dev::genesis->hash ())
.representative (nano::dev::genesis_key.pub)
.balance (nano::dev::constants.genesis_amount - nano::Gxrb_ratio)
.link (nano::dev::genesis_key.pub)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build_shared ();
send1->signature.clear ();
auto background = std::async (std::launch::async, [&] () {
return node.process_local (send1);
});
ASSERT_TIMELY (5s, background.wait_for (std::chrono::seconds (0)) == std::future_status::ready);
ASSERT_FALSE (background.get ().has_value ());
}

View file

@ -1239,6 +1239,54 @@ TEST (ledger, fail_change_gap_previous)
ASSERT_EQ (nano::process_result::gap_previous, result1.code);
}
TEST (ledger, fail_state_bad_signature)
{
auto ctx = nano::test::context::ledger_empty ();
auto & ledger = ctx.ledger ();
auto & store = ctx.store ();
auto transaction = store.tx_begin_write ();
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::block_builder builder;
auto block = builder
.state ()
.account (nano::dev::genesis_key.pub)
.previous (nano::dev::genesis->hash ())
.representative (nano::dev::genesis_key.pub)
.balance (0)
.link (nano::dev::genesis_key.pub)
.sign (nano::keypair ().prv, 0)
.work (*pool.generate (nano::dev::genesis->hash ()))
.build ();
auto result1 = ledger.process (transaction, *block);
ASSERT_EQ (nano::process_result::bad_signature, result1.code);
}
TEST (ledger, fail_epoch_bad_signature)
{
auto ctx = nano::test::context::ledger_empty ();
auto & ledger = ctx.ledger ();
auto & store = ctx.store ();
auto transaction = store.tx_begin_write ();
nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits<unsigned>::max () };
nano::block_builder builder;
auto block = builder
.state ()
.account (nano::dev::genesis_key.pub)
.previous (nano::dev::genesis->hash ())
.representative (nano::dev::genesis_key.pub)
.balance (nano::dev::constants.genesis_amount)
.link (ledger.epoch_link (nano::epoch::epoch_1))
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*pool.generate (nano::dev::genesis->hash ()))
.build_shared ();
block->signature.bytes[0] ^= 1;
auto result1 = ledger.process (transaction, *block);
ASSERT_EQ (nano::process_result::bad_signature, result1.code); // Fails epoch signature
block->signature.bytes[0] ^= 1;
auto result2 = ledger.process (transaction, *block);
ASSERT_EQ (nano::process_result::progress, result2.code); // Succeeds with epoch signature
}
TEST (ledger, fail_change_bad_signature)
{
auto ctx = nano::test::context::ledger_empty ();

View file

@ -2143,6 +2143,8 @@ TEST (node, block_confirm)
auto send1_copy = builder.make_block ()
.from (*send1)
.build_shared ();
auto hash1 = send1->hash ();
auto hash2 = send1_copy->hash ();
node1.block_processor.add (send1);
node2.block_processor.add (send1_copy);
ASSERT_TIMELY (5s, node1.ledger.block_or_pruned_exists (send1->hash ()) && node2.ledger.block_or_pruned_exists (send1_copy->hash ()));

View file

@ -1,250 +0,0 @@
#include <nano/node/signatures.hpp>
#include <nano/secure/common.hpp>
#include <gtest/gtest.h>
TEST (signature_checker, empty)
{
nano::signature_checker checker (0);
nano::signature_check_set check = { 0, nullptr, nullptr, nullptr, nullptr, nullptr };
checker.verify (check);
}
TEST (signature_checker, bulk_single_thread)
{
nano::keypair key;
nano::block_builder builder;
auto block = builder
.state ()
.account (key.pub)
.previous (0)
.representative (key.pub)
.balance (0)
.link (0)
.sign (key.prv, key.pub)
.work (0)
.build ();
nano::signature_checker checker (0);
std::vector<nano::uint256_union> hashes;
size_t size (1000);
hashes.reserve (size);
std::vector<unsigned char const *> messages;
messages.reserve (size);
std::vector<size_t> lengths;
lengths.reserve (size);
std::vector<unsigned char const *> pub_keys;
pub_keys.reserve (size);
std::vector<unsigned char const *> signatures;
signatures.reserve (size);
std::vector<int> verifications;
verifications.resize (size);
for (auto i (0); i < size; ++i)
{
hashes.push_back (block->hash ());
messages.push_back (hashes.back ().bytes.data ());
lengths.push_back (sizeof (decltype (hashes)::value_type));
pub_keys.push_back (block->hashables.account.bytes.data ());
signatures.push_back (block->signature.bytes.data ());
}
nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () };
checker.verify (check);
bool all_valid = std::all_of (verifications.cbegin (), verifications.cend (), [] (auto verification) { return verification == 1; });
ASSERT_TRUE (all_valid);
}
TEST (signature_checker, many_multi_threaded)
{
nano::signature_checker checker (4);
auto signature_checker_work_func = [&checker] () {
nano::keypair key;
nano::block_builder builder;
auto block = builder
.state ()
.account (key.pub)
.previous (0)
.representative (key.pub)
.balance (0)
.link (0)
.sign (key.prv, key.pub)
.work (0)
.build ();
auto block_hash = block->hash ();
auto invalid_block = builder
.state ()
.account (key.pub)
.previous (0)
.representative (key.pub)
.balance (0)
.link (0)
.sign (key.prv, key.pub)
.work (0)
.build ();
invalid_block->signature.bytes[31] ^= 0x1;
auto invalid_block_hash = block->hash ();
constexpr auto num_check_sizes = 18;
constexpr std::array<size_t, num_check_sizes> check_sizes{ 2048, 256, 1024, 1,
4096, 512, 2050, 1024, 8092, 513, 17, 1024, 2047, 255, 513, 2049, 1025, 1023 };
std::vector<nano::signature_check_set> signature_checker_sets;
signature_checker_sets.reserve (num_check_sizes);
// Create containers so everything is kept in scope while the threads work on the signature checks
std::array<std::vector<unsigned char const *>, num_check_sizes> messages;
std::array<std::vector<size_t>, num_check_sizes> lengths;
std::array<std::vector<unsigned char const *>, num_check_sizes> pub_keys;
std::array<std::vector<unsigned char const *>, num_check_sizes> signatures;
std::array<std::vector<int>, num_check_sizes> verifications;
// Populate all the signature check sets. The last one in each set is given an incorrect block signature.
for (int i = 0; i < num_check_sizes; ++i)
{
auto check_size = check_sizes[i];
ASSERT_GT (check_size, 0);
auto last_signature_index = check_size - 1;
messages[i].resize (check_size);
std::fill (messages[i].begin (), messages[i].end (), block_hash.bytes.data ());
messages[i][last_signature_index] = invalid_block_hash.bytes.data ();
lengths[i].resize (check_size);
std::fill (lengths[i].begin (), lengths[i].end (), sizeof (decltype (block_hash)));
pub_keys[i].resize (check_size);
std::fill (pub_keys[i].begin (), pub_keys[i].end (), block->hashables.account.bytes.data ());
pub_keys[i][last_signature_index] = invalid_block->hashables.account.bytes.data ();
signatures[i].resize (check_size);
std::fill (signatures[i].begin (), signatures[i].end (), block->signature.bytes.data ());
signatures[i][last_signature_index] = invalid_block->signature.bytes.data ();
verifications[i].resize (check_size);
signature_checker_sets.emplace_back (check_size, messages[i].data (), lengths[i].data (), pub_keys[i].data (), signatures[i].data (), verifications[i].data ());
checker.verify (signature_checker_sets[i]);
// Confirm all but last are valid
auto all_valid = std::all_of (verifications[i].cbegin (), verifications[i].cend () - 1, [] (auto verification) { return verification == 1; });
ASSERT_TRUE (all_valid);
ASSERT_EQ (verifications[i][last_signature_index], 0);
}
};
std::thread signature_checker_thread1 (signature_checker_work_func);
std::thread signature_checker_thread2 (signature_checker_work_func);
signature_checker_thread1.join ();
signature_checker_thread2.join ();
}
TEST (signature_checker, one)
{
nano::signature_checker checker (0);
auto verify_block = [&checker] (auto & block, auto result) {
std::vector<nano::uint256_union> hashes;
std::vector<unsigned char const *> messages;
std::vector<size_t> lengths;
std::vector<unsigned char const *> pub_keys;
std::vector<unsigned char const *> signatures;
std::vector<int> verifications;
size_t size (1);
verifications.resize (size);
for (auto i (0); i < size; ++i)
{
hashes.push_back (block->hash ());
messages.push_back (hashes.back ().bytes.data ());
lengths.push_back (sizeof (decltype (hashes)::value_type));
pub_keys.push_back (block->hashables.account.bytes.data ());
signatures.push_back (block->signature.bytes.data ());
}
nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () };
checker.verify (check);
ASSERT_EQ (verifications.front (), result);
};
nano::keypair key;
nano::block_builder builder;
auto block = builder
.state ()
.account (key.pub)
.previous (0)
.representative (key.pub)
.balance (0)
.link (0)
.sign (key.prv, key.pub)
.work (0)
.build ();
// Make signaure invalid and check result is incorrect
block->signature.bytes[31] ^= 0x1;
verify_block (block, 0);
// Make it valid and check for succcess
block->signature.bytes[31] ^= 0x1;
verify_block (block, 1);
}
TEST (signature_checker, boundary_checks)
{
// sizes container must be in incrementing order
std::vector<size_t> sizes{ 0, 1 };
auto add_boundary = [&sizes] (size_t boundary) {
sizes.insert (sizes.end (), { boundary - 1, boundary, boundary + 1 });
};
for (auto i = 1; i <= 5; ++i)
{
add_boundary (nano::signature_checker::batch_size * i);
}
nano::signature_checker checker (1);
auto max_size = *(sizes.end () - 1);
std::vector<nano::uint256_union> hashes;
hashes.reserve (max_size);
std::vector<unsigned char const *> messages;
messages.reserve (max_size);
std::vector<size_t> lengths;
lengths.reserve (max_size);
std::vector<unsigned char const *> pub_keys;
pub_keys.reserve (max_size);
std::vector<unsigned char const *> signatures;
signatures.reserve (max_size);
nano::keypair key;
nano::block_builder builder;
auto block = builder
.state ()
.account (key.pub)
.previous (0)
.representative (key.pub)
.balance (0)
.link (0)
.sign (key.prv, key.pub)
.work (0)
.build ();
size_t last_size = 0;
for (auto size : sizes)
{
// The size needed to append to existing containers, saves re-initializing from scratch each iteration
auto extra_size = size - last_size;
std::vector<int> verifications;
verifications.resize (size);
for (auto i (0); i < extra_size; ++i)
{
hashes.push_back (block->hash ());
messages.push_back (hashes.back ().bytes.data ());
lengths.push_back (sizeof (decltype (hashes)::value_type));
pub_keys.push_back (block->hashables.account.bytes.data ());
signatures.push_back (block->signature.bytes.data ());
}
nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () };
checker.verify (check);
bool all_valid = std::all_of (verifications.cbegin (), verifications.cend (), [] (auto verification) { return verification == 1; });
ASSERT_TRUE (all_valid);
last_size = size;
}
}

View file

@ -428,15 +428,6 @@ bool nano::validate_message (nano::public_key const & public_key, nano::uint256_
return validate_message (public_key, message.bytes.data (), sizeof (message.bytes), signature);
}
bool nano::validate_message_batch (const unsigned char ** m, size_t * mlen, const unsigned char ** pk, const unsigned char ** RS, size_t num, int * valid)
{
for (size_t i{ 0 }; i < num; ++i)
{
valid[i] = (0 == ed25519_sign_open (m[i], mlen[i], pk[i], RS[i]));
}
return true;
}
nano::uint128_union::uint128_union (std::string const & string_a)
{
auto error (decode_hex (string_a));

View file

@ -257,7 +257,6 @@ nano::signature sign_message (nano::raw_key const &, nano::public_key const &, n
nano::signature sign_message (nano::raw_key const &, nano::public_key const &, uint8_t const *, size_t);
bool validate_message (nano::public_key const &, nano::uint256_union const &, nano::signature const &);
bool validate_message (nano::public_key const &, uint8_t const *, size_t, nano::signature const &);
bool validate_message_batch (unsigned char const **, size_t *, unsigned char const **, unsigned char const **, size_t, int *);
nano::raw_key deterministic_key (nano::raw_key const &, uint32_t);
nano::public_key pub_key (nano::raw_key const &);

View file

@ -854,23 +854,6 @@ int main (int argc, char * const * argv)
auto end (std::chrono::high_resolution_clock::now ());
std::cerr << "Signature verifications " << std::chrono::duration_cast<std::chrono::microseconds> (end - begin).count () << std::endl;
}
else if (vm.count ("debug_verify_profile_batch"))
{
nano::keypair key;
size_t batch_count (1000);
nano::uint256_union message;
nano::uint512_union signature (nano::sign_message (key.prv, key.pub, message));
std::vector<unsigned char const *> messages (batch_count, message.bytes.data ());
std::vector<size_t> lengths (batch_count, sizeof (message));
std::vector<unsigned char const *> pub_keys (batch_count, key.pub.bytes.data ());
std::vector<unsigned char const *> signatures (batch_count, signature.bytes.data ());
std::vector<int> verifications;
verifications.resize (batch_count);
auto begin (std::chrono::high_resolution_clock::now ());
nano::validate_message_batch (messages.data (), lengths.data (), pub_keys.data (), signatures.data (), batch_count, verifications.data ());
auto end (std::chrono::high_resolution_clock::now ());
std::cerr << "Batch signature verifications " << std::chrono::duration_cast<std::chrono::microseconds> (end - begin).count () << std::endl;
}
else if (vm.count ("debug_profile_sign"))
{
std::cerr << "Starting blocks signing profiling\n";

View file

@ -156,10 +156,6 @@ add_library(
scheduler/optimistic.cpp
scheduler/priority.hpp
scheduler/priority.cpp
signatures.hpp
signatures.cpp
state_block_signature_verification.hpp
state_block_signature_verification.cpp
telemetry.hpp
telemetry.cpp
transport/channel.hpp

View file

@ -9,8 +9,7 @@
nano::block_processor::block_processor (nano::node & node_a, nano::write_database_queue & write_database_queue_a) :
next_log (std::chrono::steady_clock::now ()),
node (node_a),
write_database_queue (write_database_queue_a),
state_block_signature_verification (node.checker, node.ledger.constants.epochs, node.config, node.logger, node.flags.block_processor_verification_size)
write_database_queue (write_database_queue_a)
{
batch_processed.add ([this] (auto const & items) {
// For every batch item: notify the 'processed' observer.
@ -21,19 +20,6 @@ nano::block_processor::block_processor (nano::node & node_a, nano::write_databas
}
});
blocking.connect (*this);
state_block_signature_verification.blocks_verified_callback = [this] (std::deque<nano::state_block_signature_verification::value_type> & items, std::vector<int> const & verifications, std::vector<nano::block_hash> const & hashes, std::vector<nano::signature> const & blocks_signatures) {
this->process_verified_state_blocks (items, verifications, hashes, blocks_signatures);
};
state_block_signature_verification.transition_inactive_callback = [this] () {
if (this->flushing)
{
{
// Prevent a race with condition.wait in block_processor::flush
nano::lock_guard<nano::mutex> guard{ this->mutex };
}
this->condition.notify_all ();
}
};
processing_thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::block_processing);
this->process_blocks ();
@ -48,16 +34,14 @@ void nano::block_processor::stop ()
}
condition.notify_all ();
blocking.stop ();
state_block_signature_verification.stop ();
nano::join_or_pass (processing_thread);
}
void nano::block_processor::flush ()
{
node.checker.flush ();
flushing = true;
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped && (have_blocks () || active || state_block_signature_verification.is_active ()))
while (!stopped && (have_blocks () || active))
{
condition.wait (lock);
}
@ -67,7 +51,7 @@ void nano::block_processor::flush ()
std::size_t nano::block_processor::size ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
return (blocks.size () + state_block_signature_verification.size () + forced.size ());
return blocks.size () + forced.size ();
}
bool nano::block_processor::full ()
@ -207,56 +191,16 @@ bool nano::block_processor::have_blocks_ready ()
bool nano::block_processor::have_blocks ()
{
debug_assert (!mutex.try_lock ());
return have_blocks_ready () || state_block_signature_verification.size () != 0;
}
void nano::block_processor::process_verified_state_blocks (std::deque<nano::state_block_signature_verification::value_type> & items, std::vector<int> const & verifications, std::vector<nano::block_hash> const & hashes, std::vector<nano::signature> const & blocks_signatures)
{
{
nano::unique_lock<nano::mutex> lk{ mutex };
for (auto i (0); i < verifications.size (); ++i)
{
debug_assert (verifications[i] == 1 || verifications[i] == 0);
auto & item = items.front ();
auto & [block] = item;
if (!block->link ().is_zero () && node.ledger.is_epoch_link (block->link ()))
{
// Epoch blocks
if (verifications[i] == 1)
{
blocks.emplace_back (block);
}
else
{
// Possible regular state blocks with epoch link (send subtype)
blocks.emplace_back (block);
}
}
else if (verifications[i] == 1)
{
// Non epoch blocks
blocks.emplace_back (block);
}
items.pop_front ();
}
}
condition.notify_all ();
return have_blocks_ready ();
}
void nano::block_processor::add_impl (std::shared_ptr<nano::block> block)
{
if (block->type () == nano::block_type::state || block->type () == nano::block_type::open)
{
state_block_signature_verification.add ({ block });
}
else
{
{
nano::lock_guard<nano::mutex> guard{ mutex };
blocks.emplace_back (block);
}
condition.notify_all ();
nano::lock_guard<nano::mutex> guard{ mutex };
blocks.emplace_back (block);
}
condition.notify_all ();
}
auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock_a) -> std::deque<processed_t>
@ -274,9 +218,9 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; };
while (have_blocks_ready () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ())
{
if ((blocks.size () + state_block_signature_verification.size () + forced.size () > 64) && should_log ())
if ((blocks.size () + forced.size () > 64) && should_log ())
{
node.logger.always_log (boost::str (boost::format ("%1% blocks (+ %2% state blocks) (+ %3% forced) in processing queue") % blocks.size () % state_block_signature_verification.size () % forced.size ()));
node.logger.always_log (boost::str (boost::format ("%1% blocks (+ %2% forced) in processing queue") % blocks.size () % forced.size ()));
}
std::shared_ptr<nano::block> block;
nano::block_hash hash (0);
@ -478,7 +422,6 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (bl
}
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (collect_container_info (block_processor.state_block_signature_verification, "state_block_signature_verification"));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", blocks_count, sizeof (decltype (block_processor.blocks)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "forced", forced_count, sizeof (decltype (block_processor.forced)::value_type) }));
return composite;

View file

@ -2,7 +2,6 @@
#include <nano/lib/blocks.hpp>
#include <nano/node/blocking_observer.hpp>
#include <nano/node/state_block_signature_verification.hpp>
#include <nano/secure/common.hpp>
#include <chrono>
@ -59,7 +58,6 @@ private:
nano::process_return process_one (store::write_transaction const &, std::shared_ptr<nano::block> block, bool const = false);
void queue_unchecked (store::write_transaction const &, nano::hash_or_account const &);
std::deque<processed_t> process_batch (nano::unique_lock<nano::mutex> &);
void process_verified_state_blocks (std::deque<nano::state_block_signature_verification::value_type> &, std::vector<int> const &, std::vector<nano::block_hash> const &, std::vector<nano::signature> const &);
void add_impl (std::shared_ptr<nano::block> block);
bool stopped{ false };
bool active{ false };
@ -70,7 +68,6 @@ private:
nano::node & node;
nano::write_database_queue & write_database_queue;
nano::mutex mutex{ mutex_identifier (mutexes::block_processor) };
nano::state_block_signature_verification state_block_signature_verification;
std::thread processing_thread;
friend std::unique_ptr<container_info_component> collect_container_info (block_processor & block_processor, std::string const & name);

View file

@ -156,7 +156,6 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons
wallets_store (*wallets_store_impl),
gap_cache (*this),
ledger (store, stats, network_params.ledger, flags_a.generate_cache),
checker (config.signature_checker_threads),
outbound_limiter{ outbound_bandwidth_limiter_config (config) },
// empty `config.peering_port` means the user made no port choice at all;
// otherwise, any value is considered, with `0` having the special meaning of 'let the OS pick a port instead'
@ -176,7 +175,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons
application_path (application_path_a),
port_mapping (*this),
rep_crawler (*this),
vote_processor (checker, active, observers, stats, config, flags, logger, online_reps, rep_crawler, ledger, network_params),
vote_processor (active, observers, stats, config, flags, logger, online_reps, rep_crawler, ledger, network_params),
warmed_up (0),
block_processor (*this, write_database_queue),
online_reps (ledger, config),
@ -728,7 +727,6 @@ void nano::node::stop ()
bootstrap_initiator.stop ();
tcp_listener.stop ();
port_mapping.stop ();
checker.stop ();
wallets.stop ();
stats.stop ();
epoch_upgrader.stop ();

View file

@ -29,7 +29,6 @@
#include <nano/node/process_live_dispatcher.hpp>
#include <nano/node/repcrawler.hpp>
#include <nano/node/request_aggregator.hpp>
#include <nano/node/signatures.hpp>
#include <nano/node/telemetry.hpp>
#include <nano/node/transport/tcp_server.hpp>
#include <nano/node/unchecked_map.hpp>
@ -159,7 +158,6 @@ public:
nano::wallets_store & wallets_store;
nano::gap_cache gap_cache;
nano::ledger ledger;
nano::signature_checker checker;
nano::outbound_bandwidth_limiter outbound_limiter;
nano::network network;
nano::telemetry telemetry;

View file

@ -1,123 +0,0 @@
#include <nano/boost/asio/post.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/node/signatures.hpp>
nano::signature_checker::signature_checker (unsigned num_threads) :
thread_pool (num_threads, nano::thread_role::name::signature_checking)
{
}
nano::signature_checker::~signature_checker ()
{
stop ();
}
void nano::signature_checker::verify (nano::signature_check_set & check_a)
{
// Don't process anything else if we have stopped
if (stopped)
{
return;
}
if (check_a.size <= batch_size || single_threaded ())
{
// Not dealing with many so just use the calling thread for checking signatures
auto result = verify_batch (check_a, 0, check_a.size);
release_assert (result);
return;
}
// Split up the tasks equally over the calling thread and the thread pool.
// Any overflow on the modulus of the batch_size is given to the calling thread, so the thread pool
// only ever operates on batch_size sizes.
std::size_t overflow_size = check_a.size % batch_size;
std::size_t num_full_batches = check_a.size / batch_size;
auto const num_threads = thread_pool.get_num_threads ();
auto total_threads_to_split_over = num_threads + 1;
auto num_base_batches_each = num_full_batches / total_threads_to_split_over;
auto num_full_overflow_batches = num_full_batches % total_threads_to_split_over;
auto size_calling_thread = (num_base_batches_each * batch_size) + overflow_size;
auto num_full_batches_thread = (num_base_batches_each * num_threads);
if (num_full_overflow_batches > 0)
{
if (overflow_size == 0)
{
// Give the calling thread priority over any batches when there is no excess remainder.
size_calling_thread += batch_size;
num_full_batches_thread += num_full_overflow_batches - 1;
}
else
{
num_full_batches_thread += num_full_overflow_batches;
}
}
release_assert (check_a.size == (num_full_batches_thread * batch_size + size_calling_thread));
std::promise<void> promise;
std::future<void> future = promise.get_future ();
// Verify a number of signature batches over the thread pool (does not block)
verify_async (check_a, num_full_batches_thread, promise);
// Verify the rest on the calling thread, this operates on the signatures at the end of the check set
auto result = verify_batch (check_a, check_a.size - size_calling_thread, size_calling_thread);
release_assert (result);
// Blocks until all the work is done
future.wait ();
}
void nano::signature_checker::stop ()
{
if (!stopped.exchange (true))
{
thread_pool.stop ();
}
}
void nano::signature_checker::flush ()
{
while (!stopped && tasks_remaining != 0)
;
}
bool nano::signature_checker::verify_batch (nano::signature_check_set const & check_a, std::size_t start_index, std::size_t size)
{
nano::validate_message_batch (check_a.messages + start_index, check_a.message_lengths + start_index, check_a.pub_keys + start_index, check_a.signatures + start_index, size, check_a.verifications + start_index);
return std::all_of (check_a.verifications + start_index, check_a.verifications + start_index + size, [] (int verification) { return verification == 0 || verification == 1; });
}
/* This operates on a number of signatures of size (num_batches * batch_size) from the beginning of the check_a pointers.
* Caller should check the value of the promise which indicates when the work has been completed.
*/
void nano::signature_checker::verify_async (nano::signature_check_set & check_a, std::size_t num_batches, std::promise<void> & promise)
{
auto task = std::make_shared<Task> (check_a, num_batches);
++tasks_remaining;
for (std::size_t batch = 0; batch < num_batches; ++batch)
{
auto size = batch_size;
auto start_index = batch * batch_size;
thread_pool.push_task ([this, task, size, start_index, &promise] {
auto result = this->verify_batch (task->check, start_index, size);
release_assert (result);
if (--task->pending == 0)
{
--tasks_remaining;
promise.set_value ();
}
});
}
}
bool nano::signature_checker::single_threaded () const
{
return thread_pool.get_num_threads () == 0;
}

View file

@ -1,62 +0,0 @@
#pragma once
#include <nano/lib/thread_pool.hpp>
#include <nano/lib/utility.hpp>
#include <atomic>
#include <future>
namespace nano
{
class signature_check_set final
{
public:
signature_check_set (std::size_t size, unsigned char const ** messages, std::size_t * message_lengths, unsigned char const ** pub_keys, unsigned char const ** signatures, int * verifications) :
size (size), messages (messages), message_lengths (message_lengths), pub_keys (pub_keys), signatures (signatures), verifications (verifications)
{
}
std::size_t size;
unsigned char const ** messages;
std::size_t * message_lengths;
unsigned char const ** pub_keys;
unsigned char const ** signatures;
int * verifications;
};
/** Multi-threaded signature checker */
class signature_checker final
{
public:
signature_checker (unsigned num_threads);
~signature_checker ();
void verify (signature_check_set &);
void stop ();
void flush ();
static std::size_t constexpr batch_size = 256;
private:
std::atomic<int> tasks_remaining{ 0 };
std::atomic<bool> stopped{ false };
nano::thread_pool thread_pool;
struct Task final
{
Task (nano::signature_check_set & check, std::size_t pending) :
check (check), pending (pending)
{
}
~Task ()
{
release_assert (pending == 0);
}
nano::signature_check_set & check;
std::atomic<std::size_t> pending;
};
bool verify_batch (nano::signature_check_set const & check_a, std::size_t index, std::size_t size);
void verify_async (nano::signature_check_set & check_a, std::size_t num_batches, std::promise<void> & promise);
bool single_threaded () const;
};
}

View file

@ -1,164 +0,0 @@
#include <nano/lib/logger_mt.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/lib/threading.hpp>
#include <nano/lib/timer.hpp>
#include <nano/node/nodeconfig.hpp>
#include <nano/node/signatures.hpp>
#include <nano/node/state_block_signature_verification.hpp>
#include <nano/secure/common.hpp>
#include <boost/format.hpp>
nano::state_block_signature_verification::state_block_signature_verification (nano::signature_checker & signature_checker, nano::epochs & epochs, nano::node_config & node_config, nano::logger_mt & logger, uint64_t state_block_signature_verification_size) :
signature_checker (signature_checker),
epochs (epochs),
node_config (node_config),
logger (logger),
thread ([this, state_block_signature_verification_size] () {
nano::thread_role::set (nano::thread_role::name::state_block_signature_verification);
this->run (state_block_signature_verification_size);
})
{
}
nano::state_block_signature_verification::~state_block_signature_verification ()
{
stop ();
}
void nano::state_block_signature_verification::stop ()
{
{
nano::lock_guard<nano::mutex> guard (mutex);
stopped = true;
}
if (thread.joinable ())
{
condition.notify_one ();
thread.join ();
}
}
void nano::state_block_signature_verification::run (uint64_t state_block_signature_verification_size)
{
nano::unique_lock<nano::mutex> lk (mutex);
while (!stopped)
{
if (!state_blocks.empty ())
{
std::size_t const max_verification_batch (state_block_signature_verification_size != 0 ? state_block_signature_verification_size : nano::signature_checker::batch_size * (node_config.signature_checker_threads + 1));
active = true;
while (!state_blocks.empty () && !stopped)
{
auto items = setup_items (max_verification_batch);
lk.unlock ();
verify_state_blocks (items);
lk.lock ();
}
active = false;
lk.unlock ();
transition_inactive_callback ();
lk.lock ();
}
else
{
condition.wait (lk);
}
}
}
bool nano::state_block_signature_verification::is_active ()
{
nano::lock_guard<nano::mutex> guard (mutex);
return active;
}
void nano::state_block_signature_verification::add (value_type const & item)
{
{
nano::lock_guard<nano::mutex> guard (mutex);
state_blocks.emplace_back (item);
}
condition.notify_one ();
}
std::size_t nano::state_block_signature_verification::size ()
{
nano::lock_guard<nano::mutex> guard (mutex);
return state_blocks.size ();
}
auto nano::state_block_signature_verification::setup_items (std::size_t max_count) -> std::deque<value_type>
{
std::deque<value_type> items;
if (state_blocks.size () <= max_count)
{
items.swap (state_blocks);
}
else
{
for (auto i (0); i < max_count; ++i)
{
items.push_back (state_blocks.front ());
state_blocks.pop_front ();
}
debug_assert (!state_blocks.empty ());
}
return items;
}
void nano::state_block_signature_verification::verify_state_blocks (std::deque<value_type> & items)
{
if (!items.empty ())
{
nano::timer<> timer_l;
timer_l.start ();
auto size (items.size ());
std::vector<nano::block_hash> hashes;
hashes.reserve (size);
std::vector<unsigned char const *> messages;
messages.reserve (size);
std::vector<std::size_t> lengths;
lengths.reserve (size);
std::vector<nano::account> accounts;
accounts.reserve (size);
std::vector<unsigned char const *> pub_keys;
pub_keys.reserve (size);
std::vector<nano::signature> blocks_signatures;
blocks_signatures.reserve (size);
std::vector<unsigned char const *> signatures;
signatures.reserve (size);
std::vector<int> verifications;
verifications.resize (size, 0);
for (auto const & [block] : items)
{
hashes.push_back (block->hash ());
messages.push_back (hashes.back ().bytes.data ());
lengths.push_back (sizeof (decltype (hashes)::value_type));
nano::account account_l = block->account ();
if (!block->link ().is_zero () && epochs.is_epoch_link (block->link ()))
{
account_l = epochs.signer (epochs.epoch (block->link ()));
}
accounts.push_back (account_l);
pub_keys.push_back (accounts.back ().bytes.data ());
blocks_signatures.push_back (block->block_signature ());
signatures.push_back (blocks_signatures.back ().bytes.data ());
}
nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () };
signature_checker.verify (check);
if (node_config.logging.timing_logging () && timer_l.stop () > std::chrono::milliseconds (10))
{
logger.try_log (boost::str (boost::format ("Batch verified %1% state blocks in %2% %3%") % size % timer_l.value ().count () % timer_l.unit ()));
}
blocks_verified_callback (items, verifications, hashes, blocks_signatures);
}
}
std::unique_ptr<nano::container_info_component> nano::collect_container_info (state_block_signature_verification & state_block_signature_verification, std::string const & name)
{
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "state_blocks", state_block_signature_verification.size (), sizeof (state_block_signature_verification::value_type) }));
return composite;
}

View file

@ -1,51 +0,0 @@
#pragma once
#include <nano/lib/locks.hpp>
#include <nano/secure/common.hpp>
#include <deque>
#include <functional>
#include <thread>
namespace nano
{
class epochs;
class logger_mt;
class node_config;
class signature_checker;
class state_block_signature_verification
{
public:
using value_type = std::tuple<std::shared_ptr<nano::block>>;
state_block_signature_verification (nano::signature_checker &, nano::epochs &, nano::node_config &, nano::logger_mt &, uint64_t);
~state_block_signature_verification ();
void add (value_type const & item);
std::size_t size ();
void stop ();
bool is_active ();
std::function<void (std::deque<value_type> &, std::vector<int> const &, std::vector<nano::block_hash> const &, std::vector<nano::signature> const &)> blocks_verified_callback;
std::function<void ()> transition_inactive_callback;
private:
nano::signature_checker & signature_checker;
nano::epochs & epochs;
nano::node_config & node_config;
nano::logger_mt & logger;
nano::mutex mutex{ mutex_identifier (mutexes::state_block_signature_verification) };
bool stopped{ false };
bool active{ false };
std::deque<value_type> state_blocks;
nano::condition_variable condition;
std::thread thread;
void run (uint64_t block_processor_verification_size);
std::deque<value_type> setup_items (std::size_t);
void verify_state_blocks (std::deque<value_type> &);
};
std::unique_ptr<nano::container_info_component> collect_container_info (state_block_signature_verification & state_block_signature_verification, std::string const & name);
}

View file

@ -7,7 +7,6 @@
#include <nano/node/nodeconfig.hpp>
#include <nano/node/online_reps.hpp>
#include <nano/node/repcrawler.hpp>
#include <nano/node/signatures.hpp>
#include <nano/node/vote_processor.hpp>
#include <nano/secure/common.hpp>
#include <nano/secure/ledger.hpp>
@ -17,8 +16,7 @@
#include <chrono>
using namespace std::chrono_literals;
nano::vote_processor::vote_processor (nano::signature_checker & checker_a, nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stats & stats_a, nano::node_config & config_a, nano::node_flags & flags_a, nano::logger_mt & logger_a, nano::online_reps & online_reps_a, nano::rep_crawler & rep_crawler_a, nano::ledger & ledger_a, nano::network_params & network_params_a) :
checker (checker_a),
nano::vote_processor::vote_processor (nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stats & stats_a, nano::node_config & config_a, nano::node_flags & flags_a, nano::logger_mt & logger_a, nano::online_reps & online_reps_a, nano::rep_crawler & rep_crawler_a, nano::ledger & ledger_a, nano::network_params & network_params_a) :
active (active_a),
observers (observers_a),
stats (stats_a),
@ -134,36 +132,12 @@ bool nano::vote_processor::vote (std::shared_ptr<nano::vote> const & vote_a, std
void nano::vote_processor::verify_votes (decltype (votes) const & votes_a)
{
auto size (votes_a.size ());
std::vector<unsigned char const *> messages;
messages.reserve (size);
std::vector<nano::block_hash> hashes;
hashes.reserve (size);
std::vector<std::size_t> lengths (size, sizeof (nano::block_hash));
std::vector<unsigned char const *> pub_keys;
pub_keys.reserve (size);
std::vector<unsigned char const *> signatures;
signatures.reserve (size);
std::vector<int> verifications;
verifications.resize (size);
for (auto const & vote : votes_a)
{
hashes.push_back (vote.first->hash ());
messages.push_back (hashes.back ().bytes.data ());
pub_keys.push_back (vote.first->account.bytes.data ());
signatures.push_back (vote.first->signature.bytes.data ());
}
nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () };
checker.verify (check);
auto i (0);
for (auto const & vote : votes_a)
{
debug_assert (verifications[i] == 1 || verifications[i] == 0);
if (verifications[i] == 1)
if (!nano::validate_message (vote.first->account, vote.first->hash (), vote.first->signature))
{
vote_blocking (vote.first, vote.second, true);
}
++i;
}
}

View file

@ -36,7 +36,7 @@ namespace transport
class vote_processor final
{
public:
vote_processor (nano::signature_checker & checker_a, nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stats & stats_a, nano::node_config & config_a, nano::node_flags & flags_a, nano::logger_mt & logger_a, nano::online_reps & online_reps_a, nano::rep_crawler & rep_crawler_a, nano::ledger & ledger_a, nano::network_params & network_params_a);
vote_processor (nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stats & stats_a, nano::node_config & config_a, nano::node_flags & flags_a, nano::logger_mt & logger_a, nano::online_reps & online_reps_a, nano::rep_crawler & rep_crawler_a, nano::ledger & ledger_a, nano::network_params & network_params_a);
/** Returns false if the vote was processed */
bool vote (std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> const &);
@ -56,7 +56,6 @@ public:
private:
void process_loop ();
nano::signature_checker & checker;
nano::active_transactions & active;
nano::node_observers & observers;
nano::stats & stats;

View file

@ -1739,72 +1739,6 @@ TEST (telemetry, many_nodes)
ASSERT_FALSE (all_bandwidth_limits_same);
}
// Similar to signature_checker.boundary_checks but more exhaustive. Can take up to 1 minute
TEST (signature_checker, mass_boundary_checks)
{
// sizes container must be in incrementing order
std::vector<size_t> sizes{ 0, 1 };
auto add_boundary = [&sizes] (size_t boundary) {
sizes.insert (sizes.end (), { boundary - 1, boundary, boundary + 1 });
};
for (auto i = 1; i <= 10; ++i)
{
add_boundary (nano::signature_checker::batch_size * i);
}
nano::block_builder builder;
for (auto num_threads = 0; num_threads < 5; ++num_threads)
{
nano::signature_checker checker (num_threads);
auto max_size = *(sizes.end () - 1);
std::vector<nano::uint256_union> hashes;
hashes.reserve (max_size);
std::vector<unsigned char const *> messages;
messages.reserve (max_size);
std::vector<size_t> lengths;
lengths.reserve (max_size);
std::vector<unsigned char const *> pub_keys;
pub_keys.reserve (max_size);
std::vector<unsigned char const *> signatures;
signatures.reserve (max_size);
nano::keypair key;
auto block = builder
.state ()
.account (key.pub)
.previous (0)
.representative (key.pub)
.balance (0)
.link (0)
.sign (key.prv, key.pub)
.work (0)
.build ();
size_t last_size = 0;
for (auto size : sizes)
{
// The size needed to append to existing containers, saves re-initializing from scratch each iteration
auto extra_size = size - last_size;
std::vector<int> verifications;
verifications.resize (size);
for (auto i (0); i < extra_size; ++i)
{
hashes.push_back (block->hash ());
messages.push_back (hashes.back ().bytes.data ());
lengths.push_back (sizeof (decltype (hashes)::value_type));
pub_keys.push_back (block->hashables.account.bytes.data ());
signatures.push_back (block->signature.bytes.data ());
}
nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () };
checker.verify (check);
bool all_valid = std::all_of (verifications.cbegin (), verifications.cend (), [] (auto verification) { return verification == 1; });
ASSERT_TRUE (all_valid);
last_size = size;
}
}
}
// Test the node epoch_upgrader with a large number of accounts and threads
// Possible to manually add work peers
TEST (node, mass_epoch_upgrader)