Signature checking thread (#1411)

* Adding signature_checker class to separate out bulk processing of signatures.

* Moving signature_checker off of block_processor so it can be shared.

* Decoupling signature_checker from node.

* Fix .gitignore file so it doesn't ignore the core_test directory.

* Add signature_checker tests.

* Using signature checker class to check votes.

* Formatting.

* Removing unused function and friend declaration.
This commit is contained in:
clemahieu 2018-11-29 14:14:55 +01:00 committed by GitHub
commit 481c7aeb83
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 197 additions and 10 deletions

1
.gitignore vendored
View file

@ -28,6 +28,7 @@
*.dmg
*.DS_Store
core_test
!core_test/
qt_test
rai_node
rai_wallet

View file

@ -15,6 +15,7 @@ add_executable (core_test
processor_service.cpp
peer_container.cpp
rpc.cpp
signing.cpp
uint256_union.cpp
versioning.cpp
wallet.cpp

67
rai/core_test/signing.cpp Normal file
View file

@ -0,0 +1,67 @@
#include <gtest/gtest.h>
#include <future>
#include <rai/node/node.hpp>
TEST (signature_checker, empty)
{
rai::signature_checker checker;
std::promise<void> promise;
rai::signature_check_set check = { 0, nullptr, nullptr, nullptr, nullptr, nullptr, &promise };
checker.add (check);
promise.get_future ().wait ();
}
TEST (signature_checker, many)
{
rai::keypair key;
rai::state_block block (key.pub, 0, key.pub, 0, 0, key.prv, key.pub, 0);
rai::signature_checker checker;
std::promise<void> promise;
std::vector<rai::uint256_union> hashes;
std::vector<unsigned char const *> messages;
std::vector<size_t> lengths;
std::vector<unsigned char const *> pub_keys;
std::vector<unsigned char const *> signatures;
std::vector<int> verifications;
size_t size (1000);
verifications.resize (size);
for (auto i (0); i < size; ++i)
{
hashes.push_back (block.hash ());
messages.push_back (hashes.back ().bytes.data ());
lengths.push_back (sizeof (decltype (hashes)::value_type));
pub_keys.push_back (block.hashables.account.bytes.data ());
signatures.push_back (block.signature.bytes.data ());
}
rai::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data (), &promise };
checker.add (check);
promise.get_future ().wait ();
}
TEST (signature_checker, one)
{
rai::keypair key;
rai::state_block block (key.pub, 0, key.pub, 0, 0, key.prv, key.pub, 0);
rai::signature_checker checker;
std::promise<void> promise;
std::vector<rai::uint256_union> hashes;
std::vector<unsigned char const *> messages;
std::vector<size_t> lengths;
std::vector<unsigned char const *> pub_keys;
std::vector<unsigned char const *> signatures;
std::vector<int> verifications;
size_t size (1);
verifications.resize (size);
for (auto i (0); i < size; ++i)
{
hashes.push_back (block.hash ());
messages.push_back (hashes.back ().bytes.data ());
lengths.push_back (sizeof (decltype (hashes)::value_type));
pub_keys.push_back (block.hashables.account.bytes.data ());
signatures.push_back (block.signature.bytes.data ());
}
rai::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data (), &promise };
checker.add (check);
promise.get_future ().wait ();
}

View file

@ -55,6 +55,9 @@ namespace thread_role
case rai::thread_role::name::voting:
thread_role_name_string = "Voting";
break;
case rai::thread_role::name::signature_checking:
thread_role_name_string = "Signature check";
break;
}
/*

View file

@ -41,6 +41,7 @@ namespace thread_role
wallet_actions,
bootstrap_initiator,
voting,
signature_checking,
};
rai::thread_role::name get (void);
void set (rai::thread_role::name);

View file

@ -941,9 +941,10 @@ void rai::vote_processor::verify_votes (std::deque<std::pair<std::shared_ptr<rai
pub_keys.push_back (vote.first->account.bytes.data ());
signatures.push_back (vote.first->signature.bytes.data ());
}
/* Verifications is vector if signatures check results
validate_message_batch returing "true" if there are at least 1 invalid signature */
rai::validate_message_batch (messages.data (), lengths.data (), pub_keys.data (), signatures.data (), size, verifications.data ());
std::promise<void> promise;
rai::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data (), &promise };
node.checker.add (check);
promise.get_future ().wait ();
std::remove_reference_t<decltype (votes_a)> result;
auto i (0);
for (auto & vote : votes_a)
@ -1084,6 +1085,85 @@ bool rai::rep_crawler::exists (rai::block_hash const & hash_a)
return active.count (hash_a) != 0;
}
rai::signature_checker::signature_checker () :
started (false),
stopped (false),
thread ([this]() { run (); })
{
std::unique_lock<std::mutex> lock (mutex);
while (!started)
{
condition.wait (lock);
}
}
rai::signature_checker::~signature_checker ()
{
stop ();
}
void rai::signature_checker::add (rai::signature_check_set & check_a)
{
std::lock_guard<std::mutex> lock (mutex);
checks.push_back (check_a);
condition.notify_all ();
}
void rai::signature_checker::stop ()
{
std::unique_lock<std::mutex> lock (mutex);
stopped = true;
lock.unlock ();
condition.notify_all ();
if (thread.joinable ())
{
thread.join ();
}
}
void rai::signature_checker::flush ()
{
std::unique_lock<std::mutex> lock (mutex);
while (!stopped && !checks.empty ())
{
condition.wait (lock);
}
}
void rai::signature_checker::verify (rai::signature_check_set & check_a)
{
/* Verifications is vector if signatures check results
validate_message_batch returing "true" if there are at least 1 invalid signature */
auto code (rai::validate_message_batch (check_a.messages, check_a.message_lengths, check_a.pub_keys, check_a.signatures, check_a.size, check_a.verifications));
(void)code;
release_assert (std::all_of (check_a.verifications, check_a.verifications + check_a.size, [](int verification) { return verification == 0 || verification == 1; }));
check_a.promise->set_value ();
}
void rai::signature_checker::run ()
{
rai::thread_role::set (rai::thread_role::name::signature_checking);
std::unique_lock<std::mutex> lock (mutex);
started = true;
condition.notify_all ();
while (!stopped)
{
if (!checks.empty ())
{
auto check (checks.front ());
checks.pop_front ();
lock.unlock ();
verify (check);
lock.lock ();
condition.notify_all ();
}
else
{
condition.wait (lock);
}
}
}
rai::block_processor::block_processor (rai::node & node_a) :
stopped (false),
active (false),
@ -1108,6 +1188,7 @@ void rai::block_processor::stop ()
void rai::block_processor::flush ()
{
node.checker.flush ();
std::unique_lock<std::mutex> lock (mutex);
while (!stopped && (have_blocks () || active))
{
@ -1210,7 +1291,7 @@ void rai::block_processor::verify_state_blocks (std::unique_lock<std::mutex> & l
std::vector<unsigned char const *> signatures;
signatures.reserve (size);
std::vector<int> verifications;
verifications.resize (size);
verifications.resize (size, 0);
for (auto i (0); i < size; ++i)
{
auto & block (static_cast<rai::state_block &> (*items[i].first));
@ -1220,10 +1301,10 @@ void rai::block_processor::verify_state_blocks (std::unique_lock<std::mutex> & l
pub_keys.push_back (block.hashables.account.bytes.data ());
signatures.push_back (block.signature.bytes.data ());
}
/* Verifications is vector if signatures check results
validate_message_batch returing "true" if there are at least 1 invalid signature */
auto code (rai::validate_message_batch (messages.data (), lengths.data (), pub_keys.data (), signatures.data (), size, verifications.data ()));
(void)code;
std::promise<void> promise;
rai::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data (), &promise };
node.checker.add (check);
promise.get_future ().wait ();
lock_a.lock ();
for (auto i (0); i < size; ++i)
{
@ -1868,6 +1949,7 @@ void rai::node::stop ()
bootstrap_initiator.stop ();
bootstrap.stop ();
port_mapping.stop ();
checker.stop ();
vote_processor.stop ();
wallets.stop ();
}

View file

@ -368,6 +368,37 @@ public:
std::mutex mutex;
std::unordered_set<rai::block_hash> active;
};
class block_processor;
class signature_check_set
{
public:
size_t size;
unsigned char const ** messages;
size_t * message_lengths;
unsigned char const ** pub_keys;
unsigned char const ** signatures;
int * verifications;
std::promise<void> * promise;
};
class signature_checker
{
public:
signature_checker ();
~signature_checker ();
void add (signature_check_set &);
void stop ();
void flush ();
private:
void run ();
void verify (rai::signature_check_set & check_a);
std::deque<rai::signature_check_set> checks;
bool started;
bool stopped;
std::mutex mutex;
std::condition_variable condition;
std::thread thread;
};
// Processing blocks is a potentially long IO operation
// This class isolates block insertion from other operations like servicing network operations
class block_processor
@ -387,13 +418,13 @@ public:
private:
void queue_unchecked (rai::transaction const &, rai::block_hash const &);
void process_receive_many (std::unique_lock<std::mutex> &);
void verify_state_blocks (std::unique_lock<std::mutex> &);
void process_receive_many (std::unique_lock<std::mutex> &);
bool stopped;
bool active;
std::chrono::steady_clock::time_point next_log;
std::deque<std::pair<std::shared_ptr<rai::block>, std::chrono::steady_clock::time_point>> blocks;
std::deque<std::pair<std::shared_ptr<rai::block>, std::chrono::steady_clock::time_point>> state_blocks;
std::deque<std::pair<std::shared_ptr<rai::block>, std::chrono::steady_clock::time_point>> blocks;
std::unordered_set<rai::block_hash> blocks_hashes;
std::deque<std::shared_ptr<rai::block>> forced;
std::condition_variable condition;
@ -465,6 +496,7 @@ public:
rai::node_observers observers;
rai::wallets wallets;
rai::port_mapping port_mapping;
rai::signature_checker checker;
rai::vote_processor vote_processor;
rai::rep_crawler rep_crawler;
unsigned warmed_up;