Multithreaded --validate_blocks (#2749)

* Multitreaded --debug_validate_blocks

Block count: 33136176 (November live network snapshot)

develop branch: 8523 seconds validation
2 threads: 4587 seconds validation time
4 threads / 4 cores: 2673 seconds validation time
8 threads (Hyperthreading) / 4 cores: 1814 seconds validation time

Additionally silent command execution to show only validation status & errors count

* Alias --validate_blocks
* Apply Wesley reviews
* Apply Guilherme reviews
* Apply Wesley review (emplace_back)
This commit is contained in:
Sergey Kroshnin 2020-05-06 23:09:20 +03:00 committed by GitHub
commit f4b47d47bc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -95,19 +95,20 @@ int main (int argc, char * const * argv)
("debug_profile_frontiers_confirmation", "Profile frontiers confirmation speed (only for nano_test_network)")
("debug_random_feed", "Generates output to RNG test suites")
("debug_rpc", "Read an RPC command from stdin and invoke it. Network operations will have no effect.")
("debug_validate_blocks", "Check all blocks for correct hash, signature, work value")
("debug_peers", "Display peer IPv6:port connections")
("debug_cemented_block_count", "Displays the number of cemented (confirmed) blocks")
("debug_stacktrace", "Display an example stacktrace")
("debug_account_versions", "Display the total counts of each version for all accounts (including unpocketed)")
("validate_blocks,debug_validate_blocks", "Check all blocks for correct hash, signature, work value")
("platform", boost::program_options::value<std::string> (), "Defines the <platform> for OpenCL commands")
("device", boost::program_options::value<std::string> (), "Defines <device> for OpenCL command")
("threads", boost::program_options::value<std::string> (), "Defines <threads> count for OpenCL command")
("threads", boost::program_options::value<std::string> (), "Defines <threads> count for various commands")
("difficulty", boost::program_options::value<std::string> (), "Defines <difficulty> for OpenCL command, HEX")
("multiplier", boost::program_options::value<std::string> (), "Defines <multiplier> for work generation. Overrides <difficulty>")
("count", boost::program_options::value<std::string> (), "Defines <count> for various commands")
("pow_sleep_interval", boost::program_options::value<std::string> (), "Defines the amount to sleep inbetween each pow calculation attempt")
("address_column", boost::program_options::value<std::string> (), "Defines which column the addresses are located, 0 indexed (check --debug_output_last_backtrace_dump output)");
("address_column", boost::program_options::value<std::string> (), "Defines which column the addresses are located, 0 indexed (check --debug_output_last_backtrace_dump output)")
("silent", "Silent command execution");
// clang-format on
nano::add_node_options (description);
nano::add_node_flag_options (description);
@ -471,7 +472,7 @@ int main (int argc, char * const * argv)
catch (boost::bad_lexical_cast &)
{
std::cerr << "Invalid multiplier\n";
result = -1;
return -1;
}
}
else
@ -482,7 +483,7 @@ int main (int argc, char * const * argv)
if (nano::from_string_hex (difficulty_it->second.as<std::string> (), difficulty))
{
std::cerr << "Invalid difficulty\n";
result = -1;
return -1;
}
}
}
@ -545,7 +546,7 @@ int main (int argc, char * const * argv)
catch (boost::bad_lexical_cast &)
{
std::cerr << "Invalid platform id\n";
result = -1;
return -1;
}
}
unsigned short device (0);
@ -559,7 +560,7 @@ int main (int argc, char * const * argv)
catch (boost::bad_lexical_cast &)
{
std::cerr << "Invalid device id\n";
result = -1;
return -1;
}
}
unsigned threads (1024 * 1024);
@ -573,7 +574,7 @@ int main (int argc, char * const * argv)
catch (boost::bad_lexical_cast &)
{
std::cerr << "Invalid threads count\n";
result = -1;
return -1;
}
}
uint64_t difficulty (network_constants.publish_full.base);
@ -588,7 +589,7 @@ int main (int argc, char * const * argv)
catch (boost::bad_lexical_cast &)
{
std::cerr << "Invalid multiplier\n";
result = -1;
return -1;
}
}
else
@ -599,7 +600,7 @@ int main (int argc, char * const * argv)
if (nano::from_string_hex (difficulty_it->second.as<std::string> (), difficulty))
{
std::cerr << "Invalid difficulty\n";
result = -1;
return -1;
}
}
}
@ -1147,7 +1148,7 @@ int main (int argc, char * const * argv)
catch (boost::bad_lexical_cast &)
{
std::cerr << "Invalid count\n";
result = -1;
return -1;
}
}
std::cout << boost::str (boost::format ("Starting generating %1% blocks...\n") % (count * 2));
@ -1355,29 +1356,80 @@ int main (int argc, char * const * argv)
auto handler_l (std::make_shared<nano::json_handler> (*inactive_node_l->node, config, command_l.str (), response_handler_l));
handler_l->process_request ();
}
else if (vm.count ("debug_validate_blocks"))
else if (vm.count ("validate_blocks") || vm.count ("debug_validate_blocks"))
{
nano::timer<std::chrono::seconds> timer;
timer.start ();
auto inactive_node = nano::default_inactive_node (data_path, vm);
auto node = inactive_node->node;
auto transaction (node->store.tx_begin_read ());
std::cout << boost::str (boost::format ("Performing blocks hash, signature, work validation...\n"));
size_t count (0);
uint64_t block_count (0);
for (auto i (node->store.latest_begin (transaction)), n (node->store.latest_end ()); i != n; ++i)
bool const silent (vm.count ("silent"));
unsigned threads_count (1);
auto threads_it = vm.find ("threads");
if (threads_it != vm.end ())
{
if (!boost::conversion::try_lexical_convert (threads_it->second.as<std::string> (), threads_count))
{
std::cerr << "Invalid threads count\n";
return -1;
}
}
threads_count = std::max (1u, threads_count);
std::vector<std::thread> threads;
std::mutex mutex;
nano::condition_variable condition;
std::atomic<bool> finished (false);
std::deque<std::pair<nano::account, nano::account_info>> accounts;
std::atomic<size_t> count (0);
std::atomic<uint64_t> block_count (0);
std::atomic<uint64_t> errors (0);
auto print_error_message = [&silent, &errors](std::string const & error_message_a) {
if (!silent)
{
static std::mutex cerr_mutex;
nano::lock_guard<std::mutex> lock (cerr_mutex);
std::cerr << error_message_a;
}
++errors;
};
auto start_threads = [node, &threads_count, &threads, &mutex, &condition, &finished](const auto & function_a, auto & deque_a) {
for (auto i (0); i < threads_count; ++i)
{
threads.emplace_back ([&function_a, node, &mutex, &condition, &finished, &deque_a]() {
auto transaction (node->store.tx_begin_read ());
nano::unique_lock<std::mutex> lock (mutex);
while (!deque_a.empty () || !finished)
{
while (deque_a.empty () && !finished)
{
condition.wait (lock);
}
if (!deque_a.empty ())
{
auto pair (deque_a.front ());
deque_a.pop_front ();
lock.unlock ();
function_a (node, transaction, pair.first, pair.second);
lock.lock ();
}
}
});
}
};
auto check_account = [&print_error_message, &silent, &count, &block_count](std::shared_ptr<nano::node> const & node, nano::read_transaction const & transaction, nano::account const & account, nano::account_info const & info) {
++count;
if ((count % 20000) == 0)
if (!silent && (count % 20000) == 0)
{
std::cout << boost::str (boost::format ("%1% accounts validated\n") % count);
}
nano::account_info const & info (i->second);
nano::account const & account (i->first);
nano::confirmation_height_info confirmation_height_info;
node->store.confirmation_height_get (transaction, account, confirmation_height_info);
if (confirmation_height_info.height > info.block_count)
{
std::cerr << "Confirmation height " << confirmation_height_info.height << " greater than block count " << info.block_count << " for account: " << account.to_account () << std::endl;
print_error_message (boost::str (boost::format ("Confirmation height %1% greater than block count %2% for account: %3%\n") % confirmation_height_info.height % info.block_count % account.to_account ()));
}
auto hash (info.open_block);
@ -1395,33 +1447,33 @@ int main (int argc, char * const * argv)
{
if (block->account () != account)
{
std::cerr << boost::str (boost::format ("Incorrect account field for block %1%\n") % hash.to_string ());
print_error_message (boost::str (boost::format ("Incorrect account field for block %1%\n") % hash.to_string ()));
}
}
// Check if sideband account is correct
else if (sideband.account != account)
{
std::cerr << boost::str (boost::format ("Incorrect sideband account for block %1%\n") % hash.to_string ());
print_error_message (boost::str (boost::format ("Incorrect sideband account for block %1%\n") % hash.to_string ()));
}
// Check if previous field is correct
if (calculated_hash != block->previous ())
{
std::cerr << boost::str (boost::format ("Incorrect previous field for block %1%\n") % hash.to_string ());
print_error_message (boost::str (boost::format ("Incorrect previous field for block %1%\n") % hash.to_string ()));
}
// Check if previous & type for open blocks are correct
if (height == 0 && !block->previous ().is_zero ())
{
std::cerr << boost::str (boost::format ("Incorrect previous for open block %1%\n") % hash.to_string ());
print_error_message (boost::str (boost::format ("Incorrect previous for open block %1%\n") % hash.to_string ()));
}
if (height == 0 && block->type () != nano::block_type::open && block->type () != nano::block_type::state)
{
std::cerr << boost::str (boost::format ("Incorrect type for open block %1%\n") % hash.to_string ());
print_error_message (boost::str (boost::format ("Incorrect type for open block %1%\n") % hash.to_string ()));
}
// Check if block data is correct (calculating hash)
calculated_hash = block->hash ();
if (calculated_hash != hash)
{
std::cerr << boost::str (boost::format ("Invalid data inside block %1% calculated hash: %2%\n") % hash.to_string () % calculated_hash.to_string ());
print_error_message (boost::str (boost::format ("Invalid data inside block %1% calculated hash: %2%\n") % hash.to_string () % calculated_hash.to_string ()));
}
// Check if block signature is correct
if (validate_message (account, hash, block->block_signature ()))
@ -1443,7 +1495,7 @@ int main (int argc, char * const * argv)
}
if (invalid)
{
std::cerr << boost::str (boost::format ("Invalid signature for block %1%\n") % hash.to_string ());
print_error_message (boost::str (boost::format ("Invalid signature for block %1%\n") % hash.to_string ()));
}
}
// Validate block details set in the sideband
@ -1483,23 +1535,23 @@ int main (int argc, char * const * argv)
}
if (block_details_error)
{
std::cerr << boost::str (boost::format ("Incorrect sideband block details for block %1%\n") % hash.to_string ());
print_error_message (boost::str (boost::format ("Incorrect sideband block details for block %1%\n") % hash.to_string ()));
}
// Check if block work value is correct
if (block->difficulty () < nano::work_threshold (block->work_version (), block->sideband ().details))
{
std::cerr << boost::str (boost::format ("Invalid work for block %1% value: %2%\n") % hash.to_string () % nano::to_string_hex (block->block_work ()));
print_error_message (boost::str (boost::format ("Invalid work for block %1% value: %2%\n") % hash.to_string () % nano::to_string_hex (block->block_work ())));
}
// Check if sideband height is correct
++height;
if (sideband.height != height)
{
std::cerr << boost::str (boost::format ("Incorrect sideband height for block %1%. Sideband: %2%. Expected: %3%\n") % hash.to_string () % sideband.height % height);
print_error_message (boost::str (boost::format ("Incorrect sideband height for block %1%. Sideband: %2%. Expected: %3%\n") % hash.to_string () % sideband.height % height));
}
// Check if sideband timestamp is after previous timestamp
if (sideband.timestamp < previous_timestamp)
{
std::cerr << boost::str (boost::format ("Incorrect sideband timestamp for block %1%\n") % hash.to_string ());
print_error_message (boost::str (boost::format ("Incorrect sideband timestamp for block %1%\n") % hash.to_string ()));
}
previous_timestamp = sideband.timestamp;
// Calculate representative block
@ -1518,47 +1570,85 @@ int main (int argc, char * const * argv)
// Check if required block exists
if (!hash.is_zero () && block == nullptr)
{
std::cerr << boost::str (boost::format ("Required block in account %1% chain was not found in ledger: %2%\n") % account.to_account () % hash.to_string ());
print_error_message (boost::str (boost::format ("Required block in account %1% chain was not found in ledger: %2%\n") % account.to_account () % hash.to_string ()));
}
// Check account block count
if (info.block_count != height)
{
std::cerr << boost::str (boost::format ("Incorrect block count for account %1%. Actual: %2%. Expected: %3%\n") % account.to_account () % height % info.block_count);
print_error_message (boost::str (boost::format ("Incorrect block count for account %1%. Actual: %2%. Expected: %3%\n") % account.to_account () % height % info.block_count));
}
// Check account head block (frontier)
if (info.head != calculated_hash)
{
std::cerr << boost::str (boost::format ("Incorrect frontier for account %1%. Actual: %2%. Expected: %3%\n") % account.to_account () % calculated_hash.to_string () % info.head.to_string ());
print_error_message (boost::str (boost::format ("Incorrect frontier for account %1%. Actual: %2%. Expected: %3%\n") % account.to_account () % calculated_hash.to_string () % info.head.to_string ()));
}
// Check account representative block
if (info.representative != calculated_representative)
{
std::cerr << boost::str (boost::format ("Incorrect representative for account %1%. Actual: %2%. Expected: %3%\n") % account.to_account () % calculated_representative.to_string () % info.representative.to_string ());
print_error_message (boost::str (boost::format ("Incorrect representative for account %1%. Actual: %2%. Expected: %3%\n") % account.to_account () % calculated_representative.to_string () % info.representative.to_string ()));
}
};
start_threads (check_account, accounts);
if (!silent)
{
std::cout << boost::str (boost::format ("Performing %1% threads blocks hash, signature, work validation...\n") % threads_count);
}
std::cout << boost::str (boost::format ("%1% accounts validated\n") % count);
size_t const accounts_deque_overflow (32 * 1024);
auto transaction (node->store.tx_begin_read ());
for (auto i (node->store.latest_begin (transaction)), n (node->store.latest_end ()); i != n; ++i)
{
{
nano::unique_lock<std::mutex> lock (mutex);
if (accounts.size () > accounts_deque_overflow)
{
auto wait_ms (250 * accounts.size () / accounts_deque_overflow);
const auto wakeup (std::chrono::steady_clock::now () + std::chrono::milliseconds (wait_ms));
condition.wait_until (lock, wakeup);
}
accounts.emplace_back (i->first, i->second);
}
condition.notify_all ();
}
{
nano::lock_guard<std::mutex> lock (mutex);
finished = true;
}
condition.notify_all ();
for (auto & thread : threads)
{
thread.join ();
}
threads.clear ();
if (!silent)
{
std::cout << boost::str (boost::format ("%1% accounts validated\n") % count);
}
// Validate total block count
auto ledger_block_count (node->store.block_count (transaction).sum ());
if (block_count != ledger_block_count)
{
std::cerr << boost::str (boost::format ("Incorrect total block count. Blocks validated %1%. Block count in database: %2%\n") % block_count % ledger_block_count);
print_error_message (boost::str (boost::format ("Incorrect total block count. Blocks validated %1%. Block count in database: %2%\n") % block_count % ledger_block_count));
}
// Validate pending blocks
count = 0;
for (auto i (node->store.pending_begin (transaction)), n (node->store.pending_end ()); i != n; ++i)
{
finished = false;
std::deque<std::pair<nano::pending_key, nano::pending_info>> pending;
auto check_pending = [&print_error_message, &silent, &count](std::shared_ptr<nano::node> const & node, nano::read_transaction const & transaction, nano::pending_key const & key, nano::pending_info const & info) {
++count;
if ((count % 200000) == 0)
if (!silent && (count % 500000) == 0)
{
std::cout << boost::str (boost::format ("%1% pending blocks validated\n") % count);
}
nano::pending_key const & key (i->first);
nano::pending_info const & info (i->second);
// Check block existance
auto block (node->store.block_get_no_sideband (transaction, key.hash));
if (block == nullptr)
{
std::cerr << boost::str (boost::format ("Pending block does not exist %1%\n") % key.hash.to_string ());
print_error_message (boost::str (boost::format ("Pending block does not exist %1%\n") % key.hash.to_string ()));
}
else
{
@ -1577,27 +1667,67 @@ int main (int argc, char * const * argv)
}
else
{
std::cerr << boost::str (boost::format ("Incorrect type for pending block %1%\n") % key.hash.to_string ());
print_error_message (boost::str (boost::format ("Incorrect type for pending block %1%\n") % key.hash.to_string ()));
}
if (key.account != destination)
{
std::cerr << boost::str (boost::format ("Incorrect destination for pending block %1%\n") % key.hash.to_string ());
print_error_message (boost::str (boost::format ("Incorrect destination for pending block %1%\n") % key.hash.to_string ()));
}
// Check if pending source is correct
auto account (node->ledger.account (transaction, key.hash));
if (info.source != account)
{
std::cerr << boost::str (boost::format ("Incorrect source for pending block %1%\n") % key.hash.to_string ());
print_error_message (boost::str (boost::format ("Incorrect source for pending block %1%\n") % key.hash.to_string ()));
}
// Check if pending amount is correct
auto amount (node->ledger.amount (transaction, key.hash));
if (info.amount != amount)
{
std::cerr << boost::str (boost::format ("Incorrect amount for pending block %1%\n") % key.hash.to_string ());
print_error_message (boost::str (boost::format ("Incorrect amount for pending block %1%\n") % key.hash.to_string ()));
}
}
};
start_threads (check_pending, pending);
size_t const pending_deque_overflow (64 * 1024);
for (auto i (node->store.pending_begin (transaction)), n (node->store.pending_end ()); i != n; ++i)
{
{
nano::unique_lock<std::mutex> lock (mutex);
if (pending.size () > pending_deque_overflow)
{
auto wait_ms (50 * pending.size () / pending_deque_overflow);
const auto wakeup (std::chrono::steady_clock::now () + std::chrono::milliseconds (wait_ms));
condition.wait_until (lock, wakeup);
}
pending.emplace_back (i->first, i->second);
}
condition.notify_all ();
}
{
nano::lock_guard<std::mutex> lock (mutex);
finished = true;
}
condition.notify_all ();
for (auto & thread : threads)
{
thread.join ();
}
if (!silent)
{
std::cout << boost::str (boost::format ("%1% pending blocks validated\n") % count);
timer.stop ();
std::cout << boost::str (boost::format ("%1% %2% validation time\n") % timer.value ().count () % timer.unit ());
}
if (errors == 0)
{
std::cout << "Validation status: Ok\n";
}
else
{
std::cout << boost::str (boost::format ("Validation status: Failed\n%1% errors found\n") % errors);
}
std::cout << boost::str (boost::format ("%1% pending blocks validated\n") % count);
}
else if (vm.count ("debug_profile_bootstrap"))
{