Bounded memory and redesign in the confirmation height processor (#2531)

* Bounded memory and redesign in the confirmation height processor

* Disable frontiers confirmation for test, block doesn't exist during callback in fork resolution

* Fix rpc.confirmation_height_currently_processing test

* Cement blocks below receives not above

* Fixes gcc build (hopefully)

* Store start and end hash in pending write to remove extraneous IO

* Optimise for the case where the top hash is 2 above cemented frontier

* Fix TSAN issues

* Serg comments

* Use cached genesis_hash in CLI --confirmation_height_clear option

* Set accounts_confirmed_info_size to 0 when clearing

* Remove const for prepare_iterated_blocks_for_cementing as confusing
This commit is contained in:
Wesley Shillingford 2020-02-07 08:59:42 +00:00 committed by GitHub
commit 1e594ba364
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 993 additions and 598 deletions

View file

@ -40,14 +40,8 @@ TEST (confirmation_height, single)
node->block_processor.flush ();
system.deadline_set (10s);
while (true)
while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 1)
{
auto transaction = node->store.tx_begin_read ();
if (node->ledger.block_confirmed (transaction, send1->hash ()))
{
break;
}
ASSERT_NO_ERROR (system.poll ());
}
@ -142,14 +136,8 @@ TEST (confirmation_height, multiple_accounts)
node->block_processor.flush ();
system.deadline_set (10s);
while (true)
while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 10)
{
auto transaction = node->store.tx_begin_read ();
if (node->ledger.block_confirmed (transaction, receive3->hash ()))
{
break;
}
ASSERT_NO_ERROR (system.poll ());
}
@ -157,6 +145,7 @@ TEST (confirmation_height, multiple_accounts)
nano::confirmation_height_info confirmation_height_info;
auto & store = node->store;
auto transaction = node->store.tx_begin_read ();
ASSERT_TRUE (node->ledger.block_confirmed (transaction, receive3->hash ()));
ASSERT_FALSE (store.account_get (transaction, nano::test_genesis_key.pub, account_info));
ASSERT_FALSE (node->store.confirmation_height_get (transaction, nano::test_genesis_key.pub, confirmation_height_info));
ASSERT_EQ (4, confirmation_height_info.height);
@ -325,14 +314,8 @@ TEST (confirmation_height, gap_live)
node->block_processor.flush ();
system.deadline_set (10s);
while (true)
while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 6)
{
auto transaction = node->store.tx_begin_read ();
if (node->ledger.block_confirmed (transaction, receive2->hash ()))
{
break;
}
ASSERT_NO_ERROR (system.poll ());
}
@ -406,18 +389,13 @@ TEST (confirmation_height, send_receive_between_2_accounts)
node->block_processor.flush ();
system.deadline_set (10s);
while (true)
while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 10)
{
auto transaction = node->store.tx_begin_read ();
if (node->ledger.block_confirmed (transaction, receive4->hash ()))
{
break;
}
ASSERT_NO_ERROR (system.poll ());
}
auto transaction (node->store.tx_begin_read ());
ASSERT_TRUE (node->ledger.block_confirmed (transaction, receive4->hash ()));
nano::account_info account_info;
nano::confirmation_height_info confirmation_height_info;
ASSERT_FALSE (node->store.account_get (transaction, nano::test_genesis_key.pub, account_info));
@ -474,18 +452,13 @@ TEST (confirmation_height, send_receive_self)
node->block_confirm (receive3);
system.deadline_set (10s);
while (true)
while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 6)
{
auto transaction = node->store.tx_begin_read ();
if (node->ledger.block_confirmed (transaction, receive3->hash ()))
{
break;
}
ASSERT_NO_ERROR (system.poll ());
}
auto transaction (node->store.tx_begin_read ());
ASSERT_TRUE (node->ledger.block_confirmed (transaction, receive3->hash ()));
nano::account_info account_info;
ASSERT_FALSE (node->store.account_get (transaction, nano::test_genesis_key.pub, account_info));
nano::confirmation_height_info confirmation_height_info;
@ -569,18 +542,13 @@ TEST (confirmation_height, all_block_types)
node->block_confirm (state_send2);
system.deadline_set (10s);
while (true)
while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 15)
{
auto transaction = node->store.tx_begin_read ();
if (node->ledger.block_confirmed (transaction, state_send2->hash ()))
{
break;
}
ASSERT_NO_ERROR (system.poll ());
}
auto transaction (node->store.tx_begin_read ());
ASSERT_TRUE (node->ledger.block_confirmed (transaction, state_send2->hash ()));
nano::account_info account_info;
nano::confirmation_height_info confirmation_height_info;
ASSERT_FALSE (node->store.account_get (transaction, nano::test_genesis_key.pub, account_info));
@ -688,19 +656,18 @@ TEST (confirmation_height, observers)
node1->process_active (send1);
node1->block_processor.flush ();
bool confirmed (false);
system.deadline_set (10s);
while (!confirmed)
while (node1->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 1)
{
auto transaction = node1->store.tx_begin_read ();
confirmed = node1->ledger.block_confirmed (transaction, send1->hash ());
ASSERT_NO_ERROR (system.poll ());
}
auto transaction = node1->store.tx_begin_read ();
ASSERT_TRUE (node1->ledger.block_confirmed (transaction, send1->hash ()));
ASSERT_EQ (1, node1->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (1, node1->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));
}
// This tests when a read has been done and the block no longer exists by the time a write is done
// This tests when a read has been done, but the block no longer exists by the time a write is done
TEST (confirmation_height, modified_chain)
{
nano::system system;
@ -721,18 +688,23 @@ TEST (confirmation_height, modified_chain)
}
node->confirmation_height_processor.add (send->hash ());
{
// The write guard prevents the confirmation height processor doing any writes
system.deadline_set (10s);
auto write_guard = node->write_database_queue.wait (nano::writer::testing);
while (!node->write_database_queue.contains (nano::writer::confirmation_height))
;
{
ASSERT_NO_ERROR (system.poll ());
}
store.block_del (store.tx_begin_write (), send->hash (), send->type ());
}
system.deadline_set (10s);
while (node->write_database_queue.contains (nano::writer::confirmation_height))
;
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (1, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::invalid_block, nano::stat::dir::in));
}
@ -763,23 +735,14 @@ TEST (confirmation_height, pending_observer_callbacks)
node->confirmation_height_processor.add (send1->hash ());
while (true)
{
if (node->pending_confirmation_height.size () == 0)
{
break;
}
}
node->confirmation_height_processor.add (send.hash ());
system.deadline_set (5s);
while (node->ledger.cache.cemented_count < 3)
system.deadline_set (10s);
while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 1 || node->ledger.stats.count (nano::stat::type::observer, nano::stat::detail::all, nano::stat::dir::out) != 1)
{
ASSERT_NO_ERROR (system.poll ());
}
// Confirm the callback is not called under this circumstance
// Confirm the callback is not called under this circumstance because there is no election information
ASSERT_EQ (2, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (0, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));
}
TEST (confirmation_height, prioritize_frontiers)
@ -1039,6 +1002,18 @@ TEST (confirmation_height, callback_confirmed_history)
auto transaction = node->store.tx_begin_read ();
ASSERT_TRUE (node->ledger.block_confirmed (transaction, send->hash ()));
system.deadline_set (10s);
while (node->active.size () > 0)
{
ASSERT_NO_ERROR (system.poll ());
}
system.deadline_set (10s);
while (node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out) != 1)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (1, node->active.list_confirmed ().size ());
ASSERT_EQ (0, node->active.blocks.size ());
@ -1048,8 +1023,7 @@ TEST (confirmation_height, callback_confirmed_history)
ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out));
ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out));
nano::lock_guard<std::mutex> guard (node->active.mutex);
ASSERT_EQ (0, node->active.pending_conf_height.size ());
ASSERT_EQ (0, node->active.election_winner_details_size ());
}
namespace nano
@ -1089,14 +1063,14 @@ TEST (confirmation_height, dependent_election)
}
system.deadline_set (10s);
while (node->pending_confirmation_height.size () != 1)
while (node->confirmation_height_processor.awaiting_processing_size () != 1)
{
ASSERT_NO_ERROR (system.poll ());
}
{
nano::lock_guard<std::mutex> guard (node->pending_confirmation_height.mutex);
ASSERT_EQ (*node->pending_confirmation_height.pending.begin (), send2->hash ());
nano::lock_guard<std::mutex> guard (node->confirmation_height_processor.mutex);
ASSERT_EQ (*node->confirmation_height_processor.awaiting_processing.begin (), send2->hash ());
}
// Now put the other block in active so it can be confirmed as a dependent election
@ -1104,20 +1078,166 @@ TEST (confirmation_height, dependent_election)
node->confirmation_height_processor.unpause ();
system.deadline_set (10s);
while (node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out) != 1 && node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_conf_height, nano::stat::dir::out) != 1)
while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 3)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (3, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));
ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out));
ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_conf_height, nano::stat::dir::out));
ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out));
ASSERT_EQ (3, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
nano::lock_guard<std::mutex> guard (node->active.mutex);
ASSERT_EQ (0, node->active.pending_conf_height.size ());
ASSERT_EQ (0, node->active.election_winner_details_size ());
}
TEST (confirmation_height, dependent_election_after_already_cemented)
// This test checks that a receive block with uncemented blocks below cements them too.
TEST (confirmation_height, cemented_gap_below_receive)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto node = system.add_node (node_config);
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
nano::block_hash latest (node->latest (nano::test_genesis_key.pub));
nano::keypair key1;
system.wallet (0)->insert_adhoc (key1.prv);
nano::send_block send (latest, key1.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (latest));
nano::send_block send1 (send.hash (), key1.pub, nano::genesis_amount - nano::Gxrb_ratio * 2, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send.hash ()));
nano::keypair dummy_key;
nano::send_block dummy_send (send1.hash (), dummy_key.pub, nano::genesis_amount - nano::Gxrb_ratio * 3, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send1.hash ()));
nano::open_block open (send.hash (), nano::genesis_account, key1.pub, key1.prv, key1.pub, *system.work.generate (key1.pub));
nano::receive_block receive1 (open.hash (), send1.hash (), key1.prv, key1.pub, *system.work.generate (open.hash ()));
nano::send_block send2 (receive1.hash (), nano::test_genesis_key.pub, nano::Gxrb_ratio, key1.prv, key1.pub, *system.work.generate (receive1.hash ()));
nano::receive_block receive2 (dummy_send.hash (), send2.hash (), nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (dummy_send.hash ()));
nano::send_block dummy_send1 (receive2.hash (), dummy_key.pub, nano::genesis_amount - nano::Gxrb_ratio * 3, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (receive2.hash ()));
nano::keypair key2;
system.wallet (0)->insert_adhoc (key2.prv);
nano::send_block send3 (dummy_send1.hash (), key2.pub, nano::genesis_amount - nano::Gxrb_ratio * 4, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (dummy_send1.hash ()));
nano::send_block dummy_send2 (send3.hash (), dummy_key.pub, nano::genesis_amount - nano::Gxrb_ratio * 5, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send3.hash ()));
auto open1 = std::make_shared<nano::open_block> (send3.hash (), nano::genesis_account, key2.pub, key2.prv, key2.pub, *system.work.generate (key2.pub));
{
auto transaction = node->store.tx_begin_write ();
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send1).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, dummy_send).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, open).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, receive1).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send2).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, receive2).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, dummy_send1).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send3).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, dummy_send2).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *open1).code);
}
add_callback_stats (*node);
node->block_confirm (open1);
system.deadline_set (10s);
while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 10)
{
ASSERT_NO_ERROR (system.poll ());
}
auto transaction = node->store.tx_begin_read ();
ASSERT_TRUE (node->ledger.block_confirmed (transaction, open1->hash ()));
ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out));
ASSERT_EQ (0, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_conf_height, nano::stat::dir::out));
ASSERT_EQ (9, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out));
ASSERT_EQ (10, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
}
// This test checks that a receive block with uncemented blocks below cements them too, compared with the test above, this
// is the first write in this chain.
TEST (confirmation_height, cemented_gap_below_no_cache)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto node = system.add_node (node_config);
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
nano::block_hash latest (node->latest (nano::test_genesis_key.pub));
nano::keypair key1;
system.wallet (0)->insert_adhoc (key1.prv);
nano::send_block send (latest, key1.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (latest));
nano::send_block send1 (send.hash (), key1.pub, nano::genesis_amount - nano::Gxrb_ratio * 2, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send.hash ()));
nano::keypair dummy_key;
nano::send_block dummy_send (send1.hash (), dummy_key.pub, nano::genesis_amount - nano::Gxrb_ratio * 3, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send1.hash ()));
nano::open_block open (send.hash (), nano::genesis_account, key1.pub, key1.prv, key1.pub, *system.work.generate (key1.pub));
nano::receive_block receive1 (open.hash (), send1.hash (), key1.prv, key1.pub, *system.work.generate (open.hash ()));
nano::send_block send2 (receive1.hash (), nano::test_genesis_key.pub, nano::Gxrb_ratio, key1.prv, key1.pub, *system.work.generate (receive1.hash ()));
nano::receive_block receive2 (dummy_send.hash (), send2.hash (), nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (dummy_send.hash ()));
nano::send_block dummy_send1 (receive2.hash (), dummy_key.pub, nano::genesis_amount - nano::Gxrb_ratio * 3, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (receive2.hash ()));
nano::keypair key2;
system.wallet (0)->insert_adhoc (key2.prv);
nano::send_block send3 (dummy_send1.hash (), key2.pub, nano::genesis_amount - nano::Gxrb_ratio * 4, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (dummy_send1.hash ()));
nano::send_block dummy_send2 (send3.hash (), dummy_key.pub, nano::genesis_amount - nano::Gxrb_ratio * 5, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send3.hash ()));
auto open1 = std::make_shared<nano::open_block> (send3.hash (), nano::genesis_account, key2.pub, key2.prv, key2.pub, *system.work.generate (key2.pub));
{
auto transaction = node->store.tx_begin_write ();
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send1).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, dummy_send).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, open).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, receive1).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send2).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, receive2).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, dummy_send1).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send3).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, dummy_send2).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *open1).code);
}
// Force some blocks to be cemented so that the cached confirmed info variable is empty
{
auto transaction (node->store.tx_begin_write ());
node->store.confirmation_height_put (transaction, nano::genesis_account, nano::confirmation_height_info{ 3, send1.hash () });
node->store.confirmation_height_put (transaction, key1.pub, nano::confirmation_height_info{ 2, receive1.hash () });
}
add_callback_stats (*node);
node->block_confirm (open1);
system.deadline_set (10s);
while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 6)
{
ASSERT_NO_ERROR (system.poll ());
}
auto transaction = node->store.tx_begin_read ();
ASSERT_TRUE (node->ledger.block_confirmed (transaction, open1->hash ()));
ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out));
ASSERT_EQ (0, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_conf_height, nano::stat::dir::out));
ASSERT_EQ (5, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out));
ASSERT_EQ (6, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
}
TEST (confirmation_height, election_winner_details_clearing)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
@ -1130,60 +1250,65 @@ TEST (confirmation_height, dependent_election_after_already_cemented)
nano::keypair key1;
auto send = std::make_shared<nano::send_block> (latest, key1.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (latest));
auto send1 = std::make_shared<nano::send_block> (send->hash (), key1.pub, nano::genesis_amount - nano::Gxrb_ratio * 2, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send->hash ()));
nano::send_block send2 (send1->hash (), key1.pub, nano::genesis_amount - nano::Gxrb_ratio * 3, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send1->hash ()));
{
auto transaction = node->store.tx_begin_write ();
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *send).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *send1).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send2).code);
}
add_callback_stats (*node);
{
node->block_confirm (send1);
auto write_guard = node->write_database_queue.wait (nano::writer::testing);
system.deadline_set (10s);
while (node->active.size () > 0)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (0, node->active.list_confirmed ().size ());
{
nano::lock_guard<std::mutex> guard (node->active.mutex);
ASSERT_EQ (0, node->active.blocks.size ());
}
auto transaction = node->store.tx_begin_read ();
ASSERT_FALSE (node->ledger.block_confirmed (transaction, send->hash ()));
system.deadline_set (10s);
while (!node->write_database_queue.contains (nano::writer::confirmation_height))
{
ASSERT_NO_ERROR (system.poll ());
}
node->block_confirm (send);
system.deadline_set (10s);
while (node->active.size () > 0)
{
ASSERT_NO_ERROR (system.poll ());
}
}
node->block_confirm (send1);
system.deadline_set (10s);
while (node->pending_confirmation_height.size () != 0)
while (node->active.size () > 0)
{
ASSERT_NO_ERROR (system.poll ());
}
system.deadline_set (10s);
nano::unique_lock<std::mutex> lk (node->active.mutex);
while (node->active.pending_conf_height.size () > 0)
ASSERT_EQ (0, node->active.list_confirmed ().size ());
{
lk.unlock ();
ASSERT_NO_ERROR (system.poll ());
lk.lock ();
nano::lock_guard<std::mutex> guard (node->active.mutex);
ASSERT_EQ (0, node->active.blocks.size ());
}
system.deadline_set (10s);
while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 2)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (0, node->active.election_winner_details_size ());
node->block_confirm (send);
system.deadline_set (10s);
while (node->active.size () > 0)
{
ASSERT_NO_ERROR (system.poll ());
}
// Wait until this block is confirmed
system.deadline_set (10s);
while (node->active.election_winner_details_size () != 1 && !node->confirmation_height_processor.current ().is_zero ())
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out));
// election_winner_details should get cleared during another batch of cementing, so add another block
node->confirmation_height_processor.add (send2.hash ());
system.deadline_set (10s);
while (node->active.election_winner_details_size () > 0)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out));
ASSERT_EQ (2, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));
ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out));
ASSERT_EQ (3, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
}
}

View file

@ -1720,10 +1720,14 @@ TEST (node, broadcast_elected)
node_flags.disable_tcp_realtime = true;
node_flags.disable_bootstrap_listener = true;
}
nano::system system (3, type, node_flags);
auto node0 (system.nodes[0]);
auto node1 (system.nodes[1]);
auto node2 (system.nodes[2]);
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto node0 = system.add_node (node_config, node_flags, type);
node_config.peering_port = nano::get_available_port ();
auto node1 = system.add_node (node_config, node_flags, type);
node_config.peering_port = nano::get_available_port ();
auto node2 = system.add_node (node_config, node_flags, type);
nano::keypair rep_big;
nano::keypair rep_small;
nano::keypair rep_other;

View file

@ -1,5 +1,6 @@
#include <nano/lib/threading.hpp>
#include <nano/node/active_transactions.hpp>
#include <nano/node/confirmation_height_processor.hpp>
#include <nano/node/election.hpp>
#include <nano/node/node.hpp>
@ -10,7 +11,8 @@
using namespace std::chrono;
nano::active_transactions::active_transactions (nano::node & node_a) :
nano::active_transactions::active_transactions (nano::node & node_a, nano::confirmation_height_processor & confirmation_height_processor_a) :
confirmation_height_processor (confirmation_height_processor_a),
node (node_a),
multipliers_cb (20, 1.),
trended_active_difficulty (node_a.network_params.network.publish_threshold),
@ -27,6 +29,16 @@ thread ([this]() {
request_loop ();
})
{
// Register a callback which will get called after a block is cemented
confirmation_height_processor.add_cemented_observer ([this](nano::confirmation_height_processor::callback_data const & callback_data) {
this->block_cemented_callback (callback_data.block, callback_data.sideband);
});
// Register a callback which will get called after a batch of blocks is written and observer calls finished
confirmation_height_processor.add_cemented_batch_finished_observer ([this]() {
this->cemented_batch_finished_callback ();
});
assert (min_time_between_requests > std::chrono::milliseconds (node.network_params.network.request_interval_ms));
assert (min_time_between_floods > std::chrono::milliseconds (node.network_params.network.request_interval_ms));
nano::unique_lock<std::mutex> lock (mutex);
@ -90,7 +102,7 @@ void nano::active_transactions::search_frontiers (nano::transaction const & tran
error = node.store.confirmation_height_get (transaction_a, cementable_account.account, confirmation_height_info);
release_assert (!error);
if (info.block_count > confirmation_height_info.height && !this->node.pending_confirmation_height.is_processing_block (info.head))
if (info.block_count > confirmation_height_info.height && !this->confirmation_height_processor.is_processing_block (info.head))
{
auto block (this->node.store.block_get (transaction_a, info.head));
if (!this->start (block, true))
@ -112,49 +124,87 @@ void nano::active_transactions::search_frontiers (nano::transaction const & tran
next_frontier_check = steady_clock::now () + (agressive_factor / test_network_factor);
}
}
void nano::active_transactions::post_confirmation_height_set (nano::transaction const & transaction_a, std::shared_ptr<nano::block> block_a, nano::block_sideband const & sideband_a, nano::election_status_type election_status_type_a)
void nano::active_transactions::block_cemented_callback (std::shared_ptr<nano::block> const & block_a, nano::block_sideband const & sideband_a)
{
if (election_status_type_a == nano::election_status_type::inactive_confirmation_height)
auto transaction = node.store.tx_begin_read ();
boost::optional<nano::election_status_type> election_status_type;
if (!confirmation_height_processor.is_processing_block (block_a->hash ()))
{
nano::account account (0);
nano::uint128_t amount (0);
bool is_state_send (false);
nano::account pending_account (0);
node.process_confirmed_data (transaction_a, block_a, block_a->hash (), sideband_a, account, amount, is_state_send, pending_account);
node.observers.blocks.notify (nano::election_status{ block_a, 0, std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now ().time_since_epoch ()), std::chrono::duration_values<std::chrono::milliseconds>::zero (), 0, 1, 0, nano::election_status_type::inactive_confirmation_height }, account, amount, is_state_send);
election_status_type = confirm_block (transaction, block_a);
}
else
{
auto hash (block_a->hash ());
nano::lock_guard<std::mutex> lock (mutex);
auto existing (pending_conf_height.find (hash));
if (existing != pending_conf_height.end ())
{
auto election = existing->second;
if (election->confirmed && !election->stopped && election->status.winner->hash () == hash)
{
add_confirmed (existing->second->status, block_a->qualified_root ());
// This block was explicitly added to the confirmation height_processor
election_status_type = nano::election_status_type::active_confirmed_quorum;
}
node.receive_confirmed (transaction_a, block_a, hash);
nano::account account (0);
nano::uint128_t amount (0);
bool is_state_send (false);
nano::account pending_account (0);
node.process_confirmed_data (transaction_a, block_a, hash, sideband_a, account, amount, is_state_send, pending_account);
election->status.type = election_status_type_a;
election->status.confirmation_request_count = election->confirmation_request_count;
node.observers.blocks.notify (election->status, account, amount, is_state_send);
if (amount > 0)
if (election_status_type.is_initialized ())
{
if (election_status_type == nano::election_status_type::inactive_confirmation_height)
{
nano::account account (0);
nano::uint128_t amount (0);
bool is_state_send (false);
nano::account pending_account (0);
node.process_confirmed_data (transaction, block_a, block_a->hash (), sideband_a, account, amount, is_state_send, pending_account);
node.observers.blocks.notify (nano::election_status{ block_a, 0, std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now ().time_since_epoch ()), std::chrono::duration_values<std::chrono::milliseconds>::zero (), 0, 1, 0, nano::election_status_type::inactive_confirmation_height }, account, amount, is_state_send);
}
else
{
auto hash (block_a->hash ());
nano::lock_guard<std::mutex> lock (mutex);
auto existing (election_winner_details.find (hash));
if (existing != election_winner_details.end ())
{
auto election = existing->second;
if (election->confirmed && !election->stopped && election->status.winner->hash () == hash)
{
node.observers.account_balance.notify (account, false);
if (!pending_account.is_zero ())
add_confirmed (existing->second->status, block_a->qualified_root ());
node.receive_confirmed (transaction, block_a, hash);
nano::account account (0);
nano::uint128_t amount (0);
bool is_state_send (false);
nano::account pending_account (0);
node.process_confirmed_data (transaction, block_a, hash, sideband_a, account, amount, is_state_send, pending_account);
election->status.type = *election_status_type;
election->status.confirmation_request_count = election->confirmation_request_count;
node.observers.blocks.notify (election->status, account, amount, is_state_send);
if (amount > 0)
{
node.observers.account_balance.notify (pending_account, true);
node.observers.account_balance.notify (account, false);
if (!pending_account.is_zero ())
{
node.observers.account_balance.notify (pending_account, true);
}
}
}
}
pending_conf_height.erase (hash);
election_winner_details.erase (hash);
}
}
}
}
void nano::active_transactions::cemented_batch_finished_callback ()
{
// Depending on timing there is a situation where the election_winner_details is not reset.
// This can happen when a block wins an election, and the block is confirmed + observer
// called before the block hash gets added to election_winner_details. If the block is confirmed
// callbacks have already been done, so we can safely just remove it.
auto transaction = node.store.tx_begin_read ();
nano::lock_guard<std::mutex> guard (mutex);
for (auto it = election_winner_details.begin (); it != election_winner_details.end ();)
{
if (node.ledger.block_confirmed (transaction, it->first))
{
it = election_winner_details.erase (it);
}
else
{
++it;
}
}
}
@ -220,7 +270,7 @@ void nano::active_transactions::request_confirm (nano::unique_lock<std::mutex> &
// Due to the confirmation height processor working asynchronously and compressing several roots into one frontier, probably_unconfirmed_frontiers can be wrong
{
auto pending_confirmation_height_size (node.pending_confirmation_height.size ());
auto pending_confirmation_height_size (confirmation_height_processor.awaiting_processing_size ());
bool probably_unconfirmed_frontiers (node.ledger.cache.block_count > node.ledger.cache.cemented_count + roots.size () + pending_confirmation_height_size);
bool bootstrap_weight_reached (node.ledger.cache.block_count >= node.ledger.bootstrap_weight_max_blocks);
if (node.config.frontiers_confirmation != nano::frontiers_confirmation_mode::disabled && bootstrap_weight_reached && probably_unconfirmed_frontiers && pending_confirmation_height_size < confirmed_frontiers_max_pending_cut_off)
@ -344,7 +394,7 @@ void nano::active_transactions::request_loop ()
void nano::active_transactions::prioritize_account_for_confirmation (nano::active_transactions::prioritize_num_uncemented & cementable_frontiers_a, size_t & cementable_frontiers_size_a, nano::account const & account_a, nano::account_info const & info_a, uint64_t confirmation_height)
{
if (info_a.block_count > confirmation_height && !node.pending_confirmation_height.is_processing_block (info_a.head))
if (info_a.block_count > confirmation_height && !confirmation_height_processor.is_processing_block (info_a.head))
{
auto num_uncemented = info_a.block_count - confirmation_height;
nano::lock_guard<std::mutex> guard (mutex);
@ -386,7 +436,7 @@ void nano::active_transactions::prioritize_account_for_confirmation (nano::activ
void nano::active_transactions::prioritize_frontiers_for_confirmation (nano::transaction const & transaction_a, std::chrono::milliseconds ledger_accounts_time_a, std::chrono::milliseconds wallet_account_time_a)
{
// Don't try to prioritize when there are a large number of pending confirmation heights as blocks can be cemented in the meantime, making the prioritization less reliable
if (node.pending_confirmation_height.size () < confirmed_frontiers_max_pending_cut_off)
if (confirmation_height_processor.awaiting_processing_size () < confirmed_frontiers_max_pending_cut_off)
{
size_t priority_cementable_frontiers_size;
size_t priority_wallet_cementable_frontiers_size;
@ -884,12 +934,6 @@ bool nano::active_transactions::publish (std::shared_ptr<nano::block> block_a)
return result;
}
void nano::active_transactions::clear_block (nano::block_hash const & hash_a)
{
nano::lock_guard<std::mutex> guard (mutex);
pending_conf_height.erase (hash_a);
}
// Returns the type of election status requiring callbacks calling later
boost::optional<nano::election_status_type> nano::active_transactions::confirm_block (nano::transaction const & transaction_a, std::shared_ptr<nano::block> block_a)
{
@ -1030,6 +1074,12 @@ std::chrono::steady_clock::time_point nano::active_transactions::find_dropped_el
}
}
size_t nano::active_transactions::election_winner_details_size ()
{
nano::lock_guard<std::mutex> guard (mutex);
return election_winner_details.size ();
}
nano::cementable_account::cementable_account (nano::account const & account_a, size_t blocks_uncemented_a) :
account (account_a), blocks_uncemented (blocks_uncemented_a)
{
@ -1040,20 +1090,18 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (ac
size_t roots_count;
size_t blocks_count;
size_t confirmed_count;
size_t pending_conf_height_count;
{
nano::lock_guard<std::mutex> guard (active_transactions.mutex);
roots_count = active_transactions.roots.size ();
blocks_count = active_transactions.blocks.size ();
confirmed_count = active_transactions.confirmed.size ();
pending_conf_height_count = active_transactions.pending_conf_height.size ();
}
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "roots", roots_count, sizeof (decltype (active_transactions.roots)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", blocks_count, sizeof (decltype (active_transactions.blocks)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "pending_conf_height", pending_conf_height_count, sizeof (decltype (active_transactions.pending_conf_height)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "election_winner_details", active_transactions.election_winner_details_size (), sizeof (decltype (active_transactions.election_winner_details)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "confirmed", confirmed_count, sizeof (decltype (active_transactions.confirmed)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "priority_wallet_cementable_frontiers_count", active_transactions.priority_wallet_cementable_frontiers_size (), sizeof (nano::cementable_account) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "priority_cementable_frontiers_count", active_transactions.priority_cementable_frontiers_size (), sizeof (nano::cementable_account) }));

View file

@ -34,6 +34,7 @@ class block_sideband;
class election;
class vote;
class transaction;
class confirmation_height_processor;
class conflict_info final
{
@ -72,7 +73,7 @@ class active_transactions final
// clang-format on
public:
explicit active_transactions (nano::node &);
explicit active_transactions (nano::node &, nano::confirmation_height_processor &);
~active_transactions ();
// Start an election for a block
// Call action with confirmed block, may be different than what we started with
@ -96,7 +97,8 @@ public:
void stop ();
bool publish (std::shared_ptr<nano::block> block_a);
boost::optional<nano::election_status_type> confirm_block (nano::transaction const &, std::shared_ptr<nano::block>);
void post_confirmation_height_set (nano::transaction const & transaction_a, std::shared_ptr<nano::block> block_a, nano::block_sideband const & sideband_a, nano::election_status_type election_status_type_a);
void block_cemented_callback (std::shared_ptr<nano::block> const & block_a, nano::block_sideband const & sideband_a);
void cemented_batch_finished_callback ();
// clang-format off
boost::multi_index_container<nano::conflict_info,
mi::indexed_by<
@ -114,6 +116,7 @@ public:
void add_inactive_votes_cache (nano::block_hash const &, nano::account const &);
nano::gap_information find_inactive_votes_cache (nano::block_hash const &);
void erase_inactive_votes_cache (nano::block_hash const &);
nano::confirmation_height_processor & confirmation_height_processor;
nano::node & node;
std::mutex mutex;
boost::circular_buffer<double> multipliers_cb;
@ -122,8 +125,8 @@ public:
size_t priority_wallet_cementable_frontiers_size ();
boost::circular_buffer<double> difficulty_trend ();
size_t inactive_votes_cache_size ();
std::unordered_map<nano::block_hash, std::shared_ptr<nano::election>> pending_conf_height;
void clear_block (nano::block_hash const & hash_a);
std::unordered_map<nano::block_hash, std::shared_ptr<nano::election>> election_winner_details;
size_t election_winner_details_size ();
void add_dropped_elections_cache (nano::qualified_root const &);
std::chrono::steady_clock::time_point find_dropped_elections_cache (nano::qualified_root const &);
size_t dropped_elections_cache_size ();

View file

@ -1172,7 +1172,7 @@ void reset_confirmation_heights (nano::block_store & store)
// Then make sure the confirmation height of the genesis account open block is 1
nano::network_params network_params;
store.confirmation_height_put (transaction, network_params.ledger.genesis_account, { 1, network_params.ledger.genesis_block });
store.confirmation_height_put (transaction, network_params.ledger.genesis_account, { 1, network_params.ledger.genesis_hash });
}
bool is_using_rocksdb (boost::filesystem::path const & data_path, std::error_code & ec)

View file

@ -3,7 +3,6 @@
#include <nano/lib/stats.hpp>
#include <nano/lib/threading.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/active_transactions.hpp>
#include <nano/node/confirmation_height_processor.hpp>
#include <nano/node/write_database_queue.hpp>
#include <nano/secure/blockstore.hpp>
@ -15,10 +14,8 @@
#include <cassert>
#include <numeric>
nano::confirmation_height_processor::confirmation_height_processor (nano::pending_confirmation_height & pending_confirmation_height_a, nano::ledger & ledger_a, nano::active_transactions & active_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a) :
pending_confirmations (pending_confirmation_height_a),
nano::confirmation_height_processor::confirmation_height_processor (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a) :
ledger (ledger_a),
active (active_a),
logger (logger_a),
write_database_queue (write_database_queue_a),
batch_separate_pending_min_time (batch_separate_pending_min_time_a),
@ -46,44 +43,46 @@ void nano::confirmation_height_processor::stop ()
void nano::confirmation_height_processor::run ()
{
nano::unique_lock<std::mutex> lk (pending_confirmations.mutex);
nano::unique_lock<std::mutex> lk (mutex);
while (!stopped)
{
if (!paused && !pending_confirmations.pending.empty ())
if (!paused && !awaiting_processing.empty ())
{
pending_confirmations.current_hash = *pending_confirmations.pending.begin ();
pending_confirmations.pending.erase (pending_confirmations.current_hash);
// Copy the hash so can be used outside owning the lock
auto current_pending_block = pending_confirmations.current_hash;
lk.unlock ();
if (pending_writes.empty ())
{
// Separate blocks which are pending confirmation height can be batched by a minimum processing time (to improve disk write performance), so make sure the slate is clean when a new batch is starting.
confirmed_iterated_pairs.clear ();
// Separate blocks which are pending confirmation height can be batched by a minimum processing time (to improve lmdb disk write performance),
// so make sure the slate is clean when a new batch is starting.
accounts_confirmed_info.clear ();
accounts_confirmed_info_size = 0;
timer.restart ();
}
add_confirmation_height (current_pending_block);
set_next_hash ();
process ();
lk.lock ();
pending_confirmations.current_hash = 0;
}
else
{
// If there are no blocks pending confirmation, then make sure we flush out the remaining writes
// If there are blocks pending cementing, then make sure we flush out the remaining writes
lk.unlock ();
if (!pending_writes.empty ())
{
lk.unlock ();
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
write_pending (pending_writes);
cement_blocks ();
lk.lock ();
original_hash.clear ();
}
else
{
lk.lock ();
original_hash.clear ();
condition.wait (lk);
}
}
}
}
// Pausing only affects processing new blocks, not the current one being processed. Currently only used in tests
void nano::confirmation_height_processor::pause ()
{
paused = true;
@ -98,95 +97,139 @@ void nano::confirmation_height_processor::unpause ()
void nano::confirmation_height_processor::add (nano::block_hash const & hash_a)
{
{
nano::lock_guard<std::mutex> lk (pending_confirmations.mutex);
pending_confirmations.pending.insert (hash_a);
nano::lock_guard<std::mutex> lk (mutex);
awaiting_processing.insert (hash_a);
}
condition.notify_one ();
}
/**
* For all the blocks below this height which have been implicitly confirmed check if they
* are open/receive blocks, and if so follow the source blocks and iteratively repeat to genesis.
* To limit write locking and to keep the confirmation height ledger correctly synced, confirmations are
* written from the ground upwards in batches.
*/
void nano::confirmation_height_processor::add_confirmation_height (nano::block_hash const & hash_a)
// The next block hash to iterate over, the priority is as follows:
// 1 - The next block in the account chain for the last processed receive (if there is any)
// 2 - The next receive block which is closest to genesis
// 3 - The last checkpoint hit.
// 4 - The hash that was passed in originally. Either all checkpoints were exhausted (this can happen when there are many accounts to genesis)
// or all other blocks have been processed.
nano::confirmation_height_processor::top_and_next_hash nano::confirmation_height_processor::get_next_block (boost::optional<top_and_next_hash> const & next_in_receive_chain_a, boost::circular_buffer_space_optimized<nano::block_hash> const & checkpoints_a, boost::circular_buffer_space_optimized<receive_source_pair> const & receive_source_pairs, boost::optional<receive_chain_details> & receive_details_a)
{
boost::optional<conf_height_details> receive_details;
auto current = hash_a;
assert (receive_source_pairs_size == 0);
release_assert (receive_source_pairs.empty ());
top_and_next_hash next;
if (next_in_receive_chain_a.is_initialized ())
{
next = *next_in_receive_chain_a;
}
else if (!receive_source_pairs.empty ())
{
auto next_receive_source_pair = receive_source_pairs.back ();
receive_details_a = next_receive_source_pair.receive_details;
next = { next_receive_source_pair.source_hash, receive_details_a->next, receive_details_a->height + 1 };
}
else if (!checkpoints_a.empty ())
{
next = { checkpoints_a.back (), boost::none, 0 };
}
else
{
next = { original_hash, boost::none, 0 };
}
auto read_transaction (ledger.store.tx_begin_read ());
auto last_iteration = false;
// Traverse account chain and all sources for receive blocks iteratively
return next;
}
nano::block_hash nano::confirmation_height_processor::get_least_unconfirmed_hash_from_top_level (nano::transaction const & transaction_a, nano::block_hash const & hash_a, nano::account const & account_a, nano::confirmation_height_info const & confirmation_height_info_a, uint64_t & block_height_a)
{
nano::block_hash least_unconfirmed_hash = hash_a;
nano::block_sideband sideband;
if (confirmation_height_info_a.height != 0)
{
if (block_height_a > confirmation_height_info_a.height)
{
release_assert (ledger.store.block_get (transaction_a, confirmation_height_info_a.frontier, &sideband) != nullptr);
least_unconfirmed_hash = sideband.successor;
block_height_a = sideband.height + 1;
}
}
else
{
// No blocks have been confirmed, so the first block will be the open block
nano::account_info account_info;
release_assert (!ledger.store.account_get (transaction_a, account_a, account_info));
least_unconfirmed_hash = account_info.open_block;
block_height_a = 1;
}
return least_unconfirmed_hash;
}
void nano::confirmation_height_processor::set_next_hash ()
{
nano::lock_guard<std::mutex> guard (mutex);
assert (!awaiting_processing.empty ());
original_hash = *awaiting_processing.begin ();
original_hashes_pending.insert (original_hash);
awaiting_processing.erase (original_hash);
}
void nano::confirmation_height_processor::process ()
{
nano::block_sideband sideband;
nano::confirmation_height_info confirmation_height_info;
auto transaction (ledger.store.tx_begin_read ());
boost::optional<top_and_next_hash> next_in_receive_chain;
boost::circular_buffer_space_optimized<nano::block_hash> checkpoints{ max_items };
boost::circular_buffer_space_optimized<receive_source_pair> receive_source_pairs{ max_items };
nano::block_hash current;
do
{
if (!receive_source_pairs.empty ())
boost::optional<receive_chain_details> receive_details;
auto hash_to_process = get_next_block (next_in_receive_chain, checkpoints, receive_source_pairs, receive_details);
current = hash_to_process.top;
auto top_level_hash = current;
nano::account account (ledger.store.block_account (transaction, current));
release_assert (!ledger.store.confirmation_height_get (transaction, account, confirmation_height_info));
// Checks if we have encountered this account before but not commited changes yet, if so then update the cached confirmation height
auto account_it = accounts_confirmed_info.find (account);
if (account_it != accounts_confirmed_info.cend () && account_it->second.confirmed_height > confirmation_height_info.height)
{
receive_details = receive_source_pairs.back ().receive_details;
current = receive_source_pairs.back ().source_hash;
confirmation_height_info.height = account_it->second.confirmed_height;
confirmation_height_info.frontier = account_it->second.iterated_frontier;
}
else
nano::block_sideband sideband;
auto block = ledger.store.block_get (transaction, current, &sideband);
assert (block != nullptr);
auto block_height = sideband.height;
bool already_cemented = confirmation_height_info.height >= block_height;
// If we are not already at the bottom of the account chain (1 above cemented frontier) then find it
if (!already_cemented && block_height - confirmation_height_info.height > 1)
{
// If receive_details is set then this is the final iteration and we are back to the original chain.
// We need to confirm any blocks below the original hash (incl self) and the first receive block
// (if the original block is not already a receive)
if (receive_details)
if (block_height - confirmation_height_info.height == 2)
{
current = hash_a;
receive_details = boost::none;
last_iteration = true;
// If there is 1 uncemented block in-between this block and the cemented frontier,
// we can just use the previous block to get the least unconfirmed hash.
current = block->previous ();
--block_height;
}
else if (!next_in_receive_chain.is_initialized ())
{
current = get_least_unconfirmed_hash_from_top_level (transaction, current, account, confirmation_height_info, block_height);
}
else
{
// Use the cached successor of the last receive which saves having to do more IO in get_least_unconfirmed_hash_from_top_level
// as we already know what the next block we should process should be.
current = *hash_to_process.next;
block_height = hash_to_process.next_height;
}
}
auto block_height (ledger.store.block_account_height (read_transaction, current));
nano::account account (ledger.store.block_account (read_transaction, current));
nano::confirmation_height_info confirmation_height_info;
release_assert (!ledger.store.confirmation_height_get (read_transaction, account, confirmation_height_info));
auto confirmation_height = confirmation_height_info.height;
auto iterated_height = confirmation_height;
auto account_it = confirmed_iterated_pairs.find (account);
if (account_it != confirmed_iterated_pairs.cend ())
auto top_most_non_receive_block_hash = current;
bool hit_receive = false;
if (!already_cemented)
{
if (account_it->second.confirmed_height > confirmation_height)
{
confirmation_height = account_it->second.confirmed_height;
iterated_height = confirmation_height;
}
if (account_it->second.iterated_height > iterated_height)
{
iterated_height = account_it->second.iterated_height;
}
}
if (!last_iteration && current == hash_a && confirmation_height >= block_height)
{
auto it = std::find_if (pending_writes.begin (), pending_writes.end (), [&hash_a](auto & conf_height_details) {
auto it = std::find_if (conf_height_details.block_callbacks_required.begin (), conf_height_details.block_callbacks_required.end (), [&hash_a](auto & callback_data) {
return callback_data.block->hash () == hash_a;
});
return (it != conf_height_details.block_callbacks_required.end ());
});
if (it == pending_writes.end ())
{
// This is a block which has been added to the processor but already has its confirmation height set (or about to be set)
// Just need to perform active cleanup, no callbacks are needed.
active.clear_block (hash_a);
}
}
auto count_before_receive = receive_source_pairs.size ();
std::vector<callback_data> block_callbacks_required;
if (block_height > iterated_height)
{
if ((block_height - iterated_height) > 20000)
{
logger.always_log ("Iterating over a large account chain for setting confirmation height. The top block: ", current.to_string ());
}
collect_unconfirmed_receive_and_sources_for_account (block_height, iterated_height, current, account, read_transaction, block_callbacks_required);
hit_receive = iterate (transaction, block_height, current, checkpoints, top_most_non_receive_block_hash, top_level_hash, receive_source_pairs, account);
}
// Exit early when the processor has been stopped, otherwise this function may take a
@ -196,307 +239,407 @@ void nano::confirmation_height_processor::add_confirmation_height (nano::block_h
break;
}
// No longer need the read transaction
read_transaction.reset ();
// next_in_receive_chain can be modified when writing, so need to cache it here before resetting
auto is_set = next_in_receive_chain.is_initialized ();
next_in_receive_chain = boost::none;
// If this adds no more open or receive blocks, then we can now confirm this account as well as the linked open/receive block
// Collect as pending any writes to the database and do them in bulk after a certain time.
auto confirmed_receives_pending = (count_before_receive != receive_source_pairs.size ());
if (!confirmed_receives_pending)
// Need to also handle the case where we are hitting receives where the sends below should be confirmed
if (!hit_receive || (receive_source_pairs.size () == 1 && top_most_non_receive_block_hash != current))
{
if (block_height > confirmation_height)
preparation_data preparation_data{ transaction, top_most_non_receive_block_hash, already_cemented, checkpoints, account_it, confirmation_height_info, account, block_height, current, receive_details, next_in_receive_chain };
prepare_iterated_blocks_for_cementing (preparation_data);
// If used the top level, don't pop off the receive source pair because it wasn't used
if (!is_set && !receive_source_pairs.empty ())
{
// Check whether the previous block has been seen. If so, the rest of sends below have already been seen so don't count them
if (account_it != confirmed_iterated_pairs.cend ())
receive_source_pairs.pop_back ();
}
auto total_pending_write_block_count = std::accumulate (pending_writes.cbegin (), pending_writes.cend (), uint64_t (0), [](uint64_t total, auto const & write_details_a) {
return total += write_details_a.top_height - write_details_a.bottom_height + 1;
});
auto max_batch_write_size_reached = (total_pending_write_block_count >= batch_write_size);
// When there are a lot of pending confirmation height blocks, it is more efficient to
// bulk some of them up to enable better write performance which becomes the bottleneck.
auto min_time_exceeded = (timer.since_start () >= batch_separate_pending_min_time);
auto finished_iterating = current == original_hash;
auto non_awaiting_processing = awaiting_processing_size () == 0;
auto should_output = finished_iterating && (non_awaiting_processing || min_time_exceeded);
auto force_write = pending_writes.size () >= pending_writes_max_size || accounts_confirmed_info.size () >= pending_writes_max_size;
if (((max_batch_write_size_reached || should_output) && !pending_writes.empty ()) || force_write)
{
bool error = false;
// If nothing is currently using the database write lock then write the cemented pending blocks otherwise continue iterating
if (write_database_queue.process (nano::writer::confirmation_height))
{
account_it->second.confirmed_height = block_height;
if (block_height > iterated_height)
{
account_it->second.iterated_height = block_height;
}
auto scoped_write_guard = write_database_queue.pop ();
error = cement_blocks ();
}
else
else if (force_write)
{
confirmed_iterated_pairs.emplace (account, confirmed_iterated_pair{ block_height, block_height });
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
error = cement_blocks ();
}
pending_writes.emplace_back (account, current, block_height, block_height - confirmation_height, block_callbacks_required);
}
if (receive_details)
{
// Check whether the previous block has been seen. If so, the rest of sends below have already been seen so don't count them
auto const & receive_account = receive_details->account;
auto receive_account_it = confirmed_iterated_pairs.find (receive_account);
if (receive_account_it != confirmed_iterated_pairs.cend ())
{
// Get current height
auto current_height = receive_account_it->second.confirmed_height;
receive_account_it->second.confirmed_height = receive_details->height;
receive_details->num_blocks_confirmed = receive_details->height - current_height;
}
else
{
confirmed_iterated_pairs.emplace (receive_account, confirmed_iterated_pair{ receive_details->height, receive_details->height });
}
pending_writes.push_back (*receive_details);
}
if (!receive_source_pairs.empty ())
{
// Pop from the end
receive_source_pairs.erase (receive_source_pairs.end () - 1);
--receive_source_pairs_size;
}
}
else if (block_height > iterated_height)
{
if (account_it != confirmed_iterated_pairs.cend ())
{
account_it->second.iterated_height = block_height;
}
else
{
confirmed_iterated_pairs.emplace (account, confirmed_iterated_pair{ confirmation_height, block_height });
}
}
auto max_write_size_reached = (pending_writes.size () >= batch_write_size);
// When there are a lot of pending confirmation height blocks, it is more efficient to
// bulk some of them up to enable better write performance which becomes the bottleneck.
auto min_time_exceeded = (timer.since_start () >= batch_separate_pending_min_time);
auto finished_iterating = receive_source_pairs.empty ();
auto no_pending = pending_confirmations.size () == 0;
auto should_output = finished_iterating && (no_pending || min_time_exceeded);
if ((max_write_size_reached || should_output) && !pending_writes.empty ())
{
if (write_database_queue.process (nano::writer::confirmation_height))
{
auto scoped_write_guard = write_database_queue.pop ();
auto error = write_pending (pending_writes);
// Don't set any more blocks as confirmed from the original hash if an inconsistency is found
// Don't set any more cemented blocks from the original hash if an inconsistency is found
if (error)
{
checkpoints.clear ();
break;
}
}
}
read_transaction.renew ();
} while (!receive_source_pairs.empty () || current != hash_a);
transaction.refresh ();
} while (!receive_source_pairs.empty () || current != original_hash);
assert (checkpoints.empty ());
}
/*
* Returns true if there was an error in finding one of the blocks to write a confirmation height for, false otherwise
*/
bool nano::confirmation_height_processor::write_pending (std::deque<conf_height_details> & all_pending_a)
bool nano::confirmation_height_processor::iterate (nano::read_transaction const & transaction_a, uint64_t bottom_height_a, nano::block_hash const & bottom_hash_a, boost::circular_buffer_space_optimized<nano::block_hash> & checkpoints_a, nano::block_hash & top_most_non_receive_block_hash_a, nano::block_hash const & top_level_hash_a, boost::circular_buffer_space_optimized<receive_source_pair> & receive_source_pairs_a, nano::account const & account_a)
{
auto total_pending_write_block_count = std::accumulate (all_pending_a.cbegin (), all_pending_a.cend (), uint64_t (0), [](uint64_t total, conf_height_details const & conf_height_details_a) {
return total += conf_height_details_a.num_blocks_confirmed;
});
// Write in batches
while (total_pending_write_block_count > 0)
bool reached_target = false;
bool hit_receive = false;
auto hash = bottom_hash_a;
nano::block_sideband sideband;
uint64_t num_blocks = 0;
while (!hash.is_zero () && !reached_target && !stopped)
{
uint64_t num_accounts_processed = 0;
auto transaction (ledger.store.tx_begin_write ({}, { nano::tables::confirmation_height }));
while (!all_pending_a.empty ())
// Keep iterating upwards until we either reach the desired block or the second receive.
// Once a receive is cemented, we can cement all blocks above it until the next receive, so store those details for later.
++num_blocks;
auto block = ledger.store.block_get (transaction_a, hash, &sideband);
auto source (block->source ());
if (source.is_zero ())
{
const auto & pending = all_pending_a.front ();
nano::confirmation_height_info confirmation_height_info;
auto error = ledger.store.confirmation_height_get (transaction, pending.account, confirmation_height_info);
release_assert (!error);
auto confirmation_height = confirmation_height_info.height;
if (pending.height > confirmation_height)
source = block->link ();
}
if (!source.is_zero () && !ledger.is_epoch_link (source) && ledger.store.source_exists (transaction_a, source))
{
hit_receive = true;
reached_target = true;
auto next = !sideband.successor.is_zero () && sideband.successor != top_level_hash_a ? boost::optional<nano::block_hash> (sideband.successor) : boost::none;
receive_source_pairs_a.push_back (receive_source_pair{ receive_chain_details{ account_a, sideband.height, hash, top_level_hash_a, next, bottom_height_a, bottom_hash_a }, source });
// Store a checkpoint every max_items so that we can always traverse a long number of accounts to genesis
if (receive_source_pairs_a.size () % max_items == 0)
{
#ifndef NDEBUG
// Do more thorough checking in Debug mode, indicates programming error.
nano::block_sideband sideband;
auto block = ledger.store.block_get (transaction, pending.hash, &sideband);
static nano::network_constants network_constants;
assert (network_constants.is_test_network () || block != nullptr);
assert (network_constants.is_test_network () || sideband.height == pending.height);
#else
auto block = ledger.store.block_get (transaction, pending.hash);
#endif
// Check that the block still exists as there may have been changes outside this processor.
if (!block)
{
logger.always_log ("Failed to write confirmation height for: ", pending.hash.to_string ());
ledger.stats.inc (nano::stat::type::confirmation_height, nano::stat::detail::invalid_block);
receive_source_pairs.clear ();
receive_source_pairs_size = 0;
all_pending_a.clear ();
return true;
}
for (auto & callback_data : pending.block_callbacks_required)
{
active.post_confirmation_height_set (transaction, callback_data.block, callback_data.sideband, callback_data.election_status_type);
}
ledger.stats.add (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in, pending.height - confirmation_height);
assert (pending.num_blocks_confirmed == pending.height - confirmation_height);
confirmation_height = pending.height;
ledger.cache.cemented_count += pending.num_blocks_confirmed;
ledger.store.confirmation_height_put (transaction, pending.account, { confirmation_height, pending.hash });
}
total_pending_write_block_count -= pending.num_blocks_confirmed;
++num_accounts_processed;
all_pending_a.erase (all_pending_a.begin ());
if (num_accounts_processed >= batch_write_size)
{
// Commit changes periodically to reduce time holding write locks for long chains
break;
checkpoints_a.push_back (top_level_hash_a);
}
}
}
assert (all_pending_a.empty ());
return false;
}
void nano::confirmation_height_processor::collect_unconfirmed_receive_and_sources_for_account (uint64_t block_height_a, uint64_t confirmation_height_a, nano::block_hash const & hash_a, nano::account const & account_a, nano::read_transaction const & transaction_a, std::vector<callback_data> & block_callbacks_required)
{
auto hash (hash_a);
auto num_to_confirm = block_height_a - confirmation_height_a;
// Store heights of blocks
constexpr auto height_not_set = std::numeric_limits<uint64_t>::max ();
auto next_height = height_not_set;
while ((num_to_confirm > 0) && !hash.is_zero () && !stopped)
{
nano::block_sideband sideband;
auto block (ledger.store.block_get (transaction_a, hash, &sideband));
if (block)
else
{
if (!pending_confirmations.is_processing_block (hash))
// Found a send/change/epoch block which isn't the desired top level
top_most_non_receive_block_hash_a = hash;
if (hash == top_level_hash_a)
{
auto election_status_type = active.confirm_block (transaction_a, block);
if (election_status_type.is_initialized ())
{
block_callbacks_required.emplace_back (block, sideband, *election_status_type);
}
reached_target = true;
}
else
{
// This block is the original which is having its confirmation height set on
block_callbacks_required.emplace_back (block, sideband, nano::election_status_type::active_confirmed_quorum);
hash = sideband.successor;
}
auto source (block->source ());
if (source.is_zero ())
{
source = block->link ();
}
if (!source.is_zero () && !ledger.is_epoch_link (source) && ledger.store.source_exists (transaction_a, source))
{
auto block_height = confirmation_height_a + num_to_confirm;
// Set the height for the receive block above (if there is one)
if (next_height != height_not_set)
{
receive_source_pairs.back ().receive_details.num_blocks_confirmed = next_height - block_height;
auto & receive_callbacks_required = receive_source_pairs.back ().receive_details.block_callbacks_required;
// Don't include the last one as that belongs to the next recieve
std::copy (block_callbacks_required.begin (), block_callbacks_required.end () - 1, std::back_inserter (receive_callbacks_required));
block_callbacks_required = { block_callbacks_required.back () };
}
receive_source_pairs.emplace_back (conf_height_details{ account_a, hash, block_height, height_not_set, {} }, source);
++receive_source_pairs_size;
next_height = block_height;
}
hash = block->previous ();
}
// We could be traversing a very large account so we don't want to open read transactions for too long.
if (num_to_confirm % batch_read_size == 0)
if ((num_blocks > 0) && num_blocks % batch_read_size == 0)
{
transaction_a.refresh ();
}
--num_to_confirm;
}
// Update the number of blocks confirmed by the last receive block
if (!receive_source_pairs.empty ())
return hit_receive;
}
// Once the path to genesis has been iterated to, we can begin to cement the lowest blocks in the accounts. This sets up
// the non-receive blocks which have been iterated for an account, and the associated receive block.
void nano::confirmation_height_processor::prepare_iterated_blocks_for_cementing (preparation_data & preparation_data_a)
{
if (!preparation_data_a.already_cemented)
{
auto & last_receive_details = receive_source_pairs.back ().receive_details;
last_receive_details.num_blocks_confirmed = last_receive_details.height - confirmation_height_a;
last_receive_details.block_callbacks_required = block_callbacks_required;
// Add the non-receive blocks iterated for this account
auto block_height = (ledger.store.block_account_height (preparation_data_a.transaction, preparation_data_a.top_most_non_receive_block_hash));
if (block_height > preparation_data_a.confirmation_height_info.height)
{
confirmed_info confirmed_info_l{ block_height, preparation_data_a.top_most_non_receive_block_hash };
if (preparation_data_a.account_it != accounts_confirmed_info.cend ())
{
preparation_data_a.account_it->second = confirmed_info_l;
}
else
{
accounts_confirmed_info.emplace (preparation_data_a.account, confirmed_info_l);
accounts_confirmed_info_size = accounts_confirmed_info.size ();
}
preparation_data_a.checkpoints.erase (std::remove (preparation_data_a.checkpoints.begin (), preparation_data_a.checkpoints.end (), preparation_data_a.top_most_non_receive_block_hash), preparation_data_a.checkpoints.end ());
pending_writes.emplace_back (preparation_data_a.account, preparation_data_a.bottom_height, preparation_data_a.bottom_most, block_height, preparation_data_a.top_most_non_receive_block_hash);
++pending_writes_size;
}
}
// Add the receive block and all non-receive blocks above that one
auto & receive_details = preparation_data_a.receive_details;
if (receive_details)
{
auto receive_confirmed_info_it = accounts_confirmed_info.find (receive_details->account);
if (receive_confirmed_info_it != accounts_confirmed_info.cend ())
{
auto & receive_confirmed_info = receive_confirmed_info_it->second;
receive_confirmed_info.confirmed_height = receive_details->height;
receive_confirmed_info.iterated_frontier = receive_details->hash;
}
else
{
accounts_confirmed_info.emplace (receive_details->account, confirmed_info{ receive_details->height, receive_details->hash });
accounts_confirmed_info_size = accounts_confirmed_info.size ();
}
if (receive_details->next.is_initialized ())
{
preparation_data_a.next_in_receive_chain = top_and_next_hash{ receive_details->top_level, receive_details->next, receive_details->height + 1 };
}
else
{
preparation_data_a.checkpoints.erase (std::remove (preparation_data_a.checkpoints.begin (), preparation_data_a.checkpoints.end (), receive_details->hash), preparation_data_a.checkpoints.end ());
}
pending_writes.emplace_back (receive_details->account, receive_details->bottom_height, receive_details->bottom_most, receive_details->height, receive_details->hash);
++pending_writes_size;
}
}
nano::confirmation_height_processor::conf_height_details::conf_height_details (nano::account const & account_a, nano::block_hash const & hash_a, uint64_t height_a, uint64_t num_blocks_confirmed_a, std::vector<callback_data> const & block_callbacks_required_a) :
bool nano::confirmation_height_processor::cement_blocks ()
{
// Will contain all blocks that have been cemented (bounded by batch_write_size)
// and will get run through the cemented observer callback
std::vector<callback_data> cemented_blocks;
{
// This only writes to the confirmation_height table and is the only place to do so in a single process
auto transaction (ledger.store.tx_begin_write ({}, { nano::tables::confirmation_height }));
// Cement all pending entries, each entry is specific to an account and contains the least amount
// of blocks to retain consistent cementing across all account chains to genesis.
while (!pending_writes.empty ())
{
const auto & pending = pending_writes.front ();
const auto & account = pending.account;
auto write_confirmation_height = [&account, &ledger = ledger, &transaction](uint64_t num_blocks_cemented, uint64_t confirmation_height, nano::block_hash const & confirmed_frontier) {
#ifndef NDEBUG
// Extra debug checks
nano::confirmation_height_info confirmation_height_info;
assert (!ledger.store.confirmation_height_get (transaction, account, confirmation_height_info));
nano::block_sideband sideband;
assert (ledger.store.block_get (transaction, confirmed_frontier, &sideband));
assert (sideband.height == confirmation_height_info.height + num_blocks_cemented);
#endif
ledger.store.confirmation_height_put (transaction, account, nano::confirmation_height_info{ confirmation_height, confirmed_frontier });
ledger.cache.cemented_count += num_blocks_cemented;
ledger.stats.add (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in, num_blocks_cemented);
};
nano::confirmation_height_info confirmation_height_info;
release_assert (!ledger.store.confirmation_height_get (transaction, pending.account, confirmation_height_info));
// Some blocks need to be cemented at least
if (pending.top_height > confirmation_height_info.height)
{
nano::block_sideband sideband;
// The highest hash which will be cemented
nano::block_hash new_cemented_frontier;
uint64_t num_blocks_confirmed = 0;
uint64_t start_height = 0;
if (pending.bottom_height > confirmation_height_info.height)
{
new_cemented_frontier = pending.bottom_hash;
// If we are higher than the cemented frontier, we should be exactly 1 block above
assert (pending.bottom_height == confirmation_height_info.height + 1);
num_blocks_confirmed = pending.top_height - pending.bottom_height + 1;
start_height = pending.bottom_height;
}
else
{
auto block = ledger.store.block_get (transaction, confirmation_height_info.frontier, &sideband);
new_cemented_frontier = sideband.successor;
num_blocks_confirmed = pending.top_height - confirmation_height_info.height;
start_height = confirmation_height_info.height + 1;
}
auto total_blocks_cemented = 0;
auto num_blocks_iterated = 0;
auto block = ledger.store.block_get (transaction, new_cemented_frontier, &sideband);
// Cementing starts from the bottom of the chain and works upwards. This is because chains can have effectively
// an infinite number of send/change blocks in a row. We don't want to hold the write transaction open for too long.
for (; num_blocks_confirmed - num_blocks_iterated != 0; ++num_blocks_iterated)
{
if (!block)
{
logger.always_log ("Failed to write confirmation height for: ", new_cemented_frontier.to_string ());
ledger.stats.inc (nano::stat::type::confirmation_height, nano::stat::detail::invalid_block);
pending_writes.clear ();
pending_writes_size = 0;
return true;
}
cemented_blocks.emplace_back (block, sideband);
// We have likely hit a long chain, flush these callbacks and continue
if (cemented_blocks.size () == batch_write_size)
{
auto num_blocks_cemented = num_blocks_iterated - total_blocks_cemented + 1;
total_blocks_cemented += num_blocks_cemented;
write_confirmation_height (num_blocks_cemented, start_height + total_blocks_cemented - 1, new_cemented_frontier);
transaction.commit ();
notify_observers (cemented_blocks);
cemented_blocks.clear ();
transaction.renew ();
}
// Get the next block in the chain until we have reached the final desired one
auto last_iteration = (num_blocks_confirmed - num_blocks_iterated) == 1;
if (!last_iteration)
{
new_cemented_frontier = sideband.successor;
block = ledger.store.block_get (transaction, new_cemented_frontier, &sideband);
}
else
{
// Confirm it is indeed the last one
assert (new_cemented_frontier == pending.top_hash);
}
}
auto num_blocks_cemented = num_blocks_confirmed - total_blocks_cemented;
write_confirmation_height (num_blocks_cemented, pending.top_height, new_cemented_frontier);
}
auto it = accounts_confirmed_info.find (pending.account);
if (it != accounts_confirmed_info.cend () && it->second.confirmed_height == pending.top_height)
{
accounts_confirmed_info.erase (pending.account);
accounts_confirmed_info_size = accounts_confirmed_info.size ();
}
pending_writes.pop_front ();
--pending_writes_size;
}
}
notify_observers (cemented_blocks);
assert (pending_writes.empty ());
assert (pending_writes_size == 0);
nano::lock_guard<std::mutex> guard (mutex);
original_hashes_pending.clear ();
return false;
}
void nano::confirmation_height_processor::add_cemented_observer (std::function<void(callback_data)> const & callback_a)
{
nano::lock_guard<std::mutex> guard (mutex);
cemented_observers.push_back (callback_a);
}
void nano::confirmation_height_processor::add_cemented_batch_finished_observer (std::function<void()> const & callback_a)
{
nano::lock_guard<std::mutex> guard (mutex);
cemented_batch_finished_observers.push_back (callback_a);
}
void nano::confirmation_height_processor::notify_observers (std::vector<callback_data> const & cemented_blocks)
{
for (auto const & block_callback_data : cemented_blocks)
{
for (auto const & observer : cemented_observers)
{
observer (block_callback_data);
}
}
if (!cemented_blocks.empty ())
{
for (auto const & observer : cemented_batch_finished_observers)
{
observer ();
}
}
}
nano::confirmation_height_processor::receive_chain_details::receive_chain_details (nano::account const & account_a, uint64_t height_a, nano::block_hash const & hash_a, nano::block_hash const & top_level_a, boost::optional<nano::block_hash> next_a, uint64_t bottom_height_a, nano::block_hash const & bottom_most_a) :
account (account_a),
hash (hash_a),
height (height_a),
num_blocks_confirmed (num_blocks_confirmed_a),
block_callbacks_required (block_callbacks_required_a)
hash (hash_a),
top_level (top_level_a),
next (next_a),
bottom_height (bottom_height_a),
bottom_most (bottom_most_a)
{
}
nano::confirmation_height_processor::receive_source_pair::receive_source_pair (confirmation_height_processor::conf_height_details const & receive_details_a, const block_hash & source_a) :
receive_details (receive_details_a),
source_hash (source_a)
{
}
nano::confirmation_height_processor::confirmed_iterated_pair::confirmed_iterated_pair (uint64_t confirmed_height_a, uint64_t iterated_height_a) :
confirmed_height (confirmed_height_a), iterated_height (iterated_height_a)
{
}
nano::confirmation_height_processor::callback_data::callback_data (std::shared_ptr<nano::block> const & block_a, nano::block_sideband const & sideband_a, nano::election_status_type election_status_type_a) :
block (block_a),
sideband (sideband_a),
election_status_type (election_status_type_a)
nano::confirmation_height_processor::write_details::write_details (nano::account const & account_a, uint64_t bottom_height_a, nano::block_hash const & bottom_hash_a, uint64_t top_height_a, nano::block_hash const & top_hash_a) :
bottom_height (bottom_height_a),
bottom_hash (bottom_hash_a),
top_height (top_height_a),
top_hash (top_hash_a),
account (account_a)
{
}
std::unique_ptr<nano::container_info_component> nano::collect_container_info (confirmation_height_processor & confirmation_height_processor_a, const std::string & name_a)
{
size_t receive_source_pairs_count = confirmation_height_processor_a.receive_source_pairs_size;
auto composite = std::make_unique<container_info_composite> (name_a);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "receive_source_pairs", receive_source_pairs_count, sizeof (decltype (confirmation_height_processor_a.receive_source_pairs)::value_type) }));
return composite;
}
size_t nano::pending_confirmation_height::size ()
{
nano::lock_guard<std::mutex> lk (mutex);
return pending.size ();
}
bool nano::pending_confirmation_height::is_processing_block (nano::block_hash const & hash_a)
{
// First check the hash currently being processed
nano::lock_guard<std::mutex> lk (mutex);
if (!current_hash.is_zero () && current_hash == hash_a)
size_t cemented_observers_count;
size_t cemented_batch_finished_observer_count;
{
return true;
nano::lock_guard<std::mutex> guard (confirmation_height_processor_a.mutex);
cemented_observers_count = confirmation_height_processor_a.cemented_observers.size ();
cemented_batch_finished_observer_count = confirmation_height_processor_a.cemented_batch_finished_observers.size ();
}
// Check remaining pending confirmations
return pending.find (hash_a) != pending.cend ();
}
nano::block_hash nano::pending_confirmation_height::current ()
{
nano::lock_guard<std::mutex> lk (mutex);
return current_hash;
}
std::unique_ptr<nano::container_info_component> nano::collect_container_info (pending_confirmation_height & pending_confirmation_height_a, const std::string & name_a)
{
size_t pending_count = pending_confirmation_height_a.size ();
auto composite = std::make_unique<container_info_composite> (name_a);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "pending", pending_count, sizeof (nano::block_hash) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "cemented_observers", cemented_observers_count, sizeof (decltype (confirmation_height_processor_a.cemented_observers)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "cemented_batch_finished_observers", cemented_batch_finished_observer_count, sizeof (decltype (confirmation_height_processor_a.cemented_batch_finished_observers)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "pending_writes", confirmation_height_processor_a.pending_writes_size, sizeof (decltype (confirmation_height_processor_a.pending_writes)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "accounts_confirmed_info", confirmation_height_processor_a.accounts_confirmed_info_size, sizeof (decltype (confirmation_height_processor_a.accounts_confirmed_info)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "awaiting_processing", confirmation_height_processor_a.awaiting_processing_size (), sizeof (decltype (confirmation_height_processor_a.awaiting_processing)::value_type) }));
return composite;
}
size_t nano::confirmation_height_processor::awaiting_processing_size ()
{
nano::lock_guard<std::mutex> guard (mutex);
return awaiting_processing.size ();
}
bool nano::confirmation_height_processor::is_processing_block (nano::block_hash const & hash_a)
{
nano::lock_guard<std::mutex> guard (mutex);
return original_hashes_pending.find (hash_a) != original_hashes_pending.cend () || awaiting_processing.find (hash_a) != awaiting_processing.cend ();
}
nano::block_hash nano::confirmation_height_processor::current ()
{
nano::lock_guard<std::mutex> lk (mutex);
return original_hash;
}
nano::confirmation_height_processor::receive_source_pair::receive_source_pair (confirmation_height_processor::receive_chain_details const & receive_details_a, const block_hash & source_a) :
receive_details (receive_details_a),
source_hash (source_a)
{
}
nano::confirmation_height_processor::confirmed_info::confirmed_info (uint64_t confirmed_height_a, nano::block_hash const & iterated_frontier_a) :
confirmed_height (confirmed_height_a),
iterated_frontier (iterated_frontier_a)
{
}
nano::confirmation_height_processor::callback_data::callback_data (std::shared_ptr<nano::block> const & block_a, nano::block_sideband const & sideband_a) :
block (block_a),
sideband (sideband_a)
{
}

View file

@ -1,7 +1,6 @@
#pragma once
#include <nano/lib/numbers.hpp>
#include <nano/node/active_transactions.hpp>
#include <nano/secure/blockstore.hpp>
#include <nano/secure/common.hpp>
@ -13,112 +12,158 @@
namespace nano
{
class ledger;
class active_transactions;
class read_transaction;
class logger_mt;
class write_database_queue;
class pending_confirmation_height
{
public:
size_t size ();
bool is_processing_block (nano::block_hash const &);
nano::block_hash current ();
private:
std::mutex mutex;
std::unordered_set<nano::block_hash> pending;
/** This is the last block popped off the confirmation height pending collection */
nano::block_hash current_hash{ 0 };
friend class confirmation_height_processor;
friend class confirmation_height_pending_observer_callbacks_Test;
friend class confirmation_height_dependent_election_Test;
friend class confirmation_height_dependent_election_after_already_cemented_Test;
};
std::unique_ptr<container_info_component> collect_container_info (pending_confirmation_height &, const std::string &);
class confirmation_height_processor final
{
public:
confirmation_height_processor (pending_confirmation_height &, nano::ledger &, nano::active_transactions &, nano::write_database_queue &, std::chrono::milliseconds, nano::logger_mt &);
confirmation_height_processor (nano::ledger &, nano::write_database_queue &, std::chrono::milliseconds, nano::logger_mt &);
~confirmation_height_processor ();
void add (nano::block_hash const &);
void stop ();
void pause ();
void unpause ();
void stop ();
void add (nano::block_hash const & hash_a);
void run ();
size_t awaiting_processing_size ();
bool is_processing_block (nano::block_hash const &);
nano::block_hash current ();
/** The maximum amount of accounts to iterate over while writing */
static uint64_t constexpr batch_write_size = 2048;
class callback_data final
{
public:
callback_data (std::shared_ptr<nano::block> const & block_a, nano::block_sideband const & sideband_a);
std::shared_ptr<nano::block> block;
nano::block_sideband sideband;
};
void add_cemented_observer (std::function<void(callback_data)> const &);
void add_cemented_batch_finished_observer (std::function<void()> const &);
/** The maximum amount of blocks to iterate over while writing */
static uint64_t constexpr batch_write_size = 4096;
/** The maximum number of blocks to be read in while iterating over a long account chain */
static uint64_t constexpr batch_read_size = 4096;
private:
class callback_data final
class top_and_next_hash final
{
public:
callback_data (std::shared_ptr<nano::block> const &, nano::block_sideband const &, nano::election_status_type);
std::shared_ptr<nano::block> block;
nano::block_sideband sideband;
nano::election_status_type election_status_type;
nano::block_hash top;
boost::optional<nano::block_hash> next;
uint64_t next_height;
};
class conf_height_details final
class confirmed_info
{
public:
conf_height_details (nano::account const &, nano::block_hash const &, uint64_t, uint64_t, std::vector<callback_data> const &);
confirmed_info (uint64_t confirmed_height_a, nano::block_hash const & iterated_frontier);
uint64_t confirmed_height;
nano::block_hash iterated_frontier;
};
/* Holds confirmation height/cemented frontier in memory for accounts while iterating */
std::unordered_map<account, confirmed_info> accounts_confirmed_info;
std::atomic<uint64_t> accounts_confirmed_info_size{ 0 };
class receive_chain_details final
{
public:
receive_chain_details (nano::account const &, uint64_t, nano::block_hash const &, nano::block_hash const &, boost::optional<nano::block_hash>, uint64_t, nano::block_hash const &);
nano::account account;
nano::block_hash hash;
uint64_t height;
uint64_t num_blocks_confirmed;
std::vector<callback_data> block_callbacks_required;
nano::block_hash hash;
nano::block_hash top_level;
boost::optional<nano::block_hash> next;
uint64_t bottom_height;
nano::block_hash bottom_most;
};
class preparation_data final
{
public:
nano::transaction const & transaction;
nano::block_hash const & top_most_non_receive_block_hash;
bool already_cemented;
boost::circular_buffer_space_optimized<nano::block_hash> & checkpoints;
decltype (accounts_confirmed_info.begin ()) account_it;
nano::confirmation_height_info const & confirmation_height_info;
nano::account const & account;
uint64_t bottom_height;
nano::block_hash const & bottom_most;
boost::optional<receive_chain_details> & receive_details;
boost::optional<top_and_next_hash> & next_in_receive_chain;
};
class write_details final
{
public:
write_details (nano::account const &, uint64_t, nano::block_hash const &, uint64_t, nano::block_hash const &);
nano::account account;
// This is the first block hash (bottom most) which is not cemented
uint64_t bottom_height;
nano::block_hash bottom_hash;
// Desired cemented frontier
uint64_t top_height;
nano::block_hash top_hash;
};
class receive_source_pair final
{
public:
receive_source_pair (conf_height_details const &, const nano::block_hash &);
receive_source_pair (receive_chain_details const &, const nano::block_hash &);
conf_height_details receive_details;
receive_chain_details receive_details;
nano::block_hash source_hash;
};
class confirmed_iterated_pair
{
public:
confirmed_iterated_pair (uint64_t confirmed_height_a, uint64_t iterated_height_a);
uint64_t confirmed_height;
uint64_t iterated_height;
};
std::mutex mutex;
// Hashes which have been added to the confirmation height processor, but not yet processed
std::unordered_set<nano::block_hash> awaiting_processing;
// Hashes which have been added and processed, but have not been cemented
std::unordered_set<nano::block_hash> original_hashes_pending;
std::vector<std::function<void(callback_data)>> cemented_observers;
std::vector<std::function<void()>> cemented_batch_finished_observers;
/** This is the last block popped off the confirmation height pending collection */
nano::block_hash original_hash{ 0 };
nano::ledger & ledger;
nano::logger_mt & logger;
nano::condition_variable condition;
nano::pending_confirmation_height & pending_confirmations;
std::atomic<bool> stopped{ false };
std::atomic<bool> paused{ false };
nano::ledger & ledger;
nano::active_transactions & active;
nano::logger_mt & logger;
std::atomic<uint64_t> receive_source_pairs_size{ 0 };
std::vector<receive_source_pair> receive_source_pairs;
std::deque<conf_height_details> pending_writes;
// Store the highest confirmation heights for accounts in pending_writes to reduce unnecessary iterating,
// and iterated height to prevent iterating over the same blocks more than once from self-sends or "circular" sends between the same accounts.
std::unordered_map<account, confirmed_iterated_pair> confirmed_iterated_pairs;
nano::timer<std::chrono::milliseconds> timer;
nano::write_database_queue & write_database_queue;
std::chrono::milliseconds batch_separate_pending_min_time;
std::thread thread;
nano::timer<std::chrono::milliseconds> timer;
nano::network_constants network_constants;
void run ();
void add_confirmation_height (nano::block_hash const &);
void collect_unconfirmed_receive_and_sources_for_account (uint64_t, uint64_t, nano::block_hash const &, nano::account const &, nano::read_transaction const &, std::vector<callback_data> &);
bool write_pending (std::deque<conf_height_details> &);
bool cement_blocks ();
static uint32_t constexpr max_items{ 65536 };
std::deque<write_details> pending_writes;
std::atomic<uint64_t> pending_writes_size{ 0 };
static uint32_t constexpr pending_writes_max_size{ max_items };
std::thread thread;
friend std::unique_ptr<container_info_component> collect_container_info (confirmation_height_processor &, const std::string &);
friend class confirmation_height_pending_observer_callbacks_Test;
friend class confirmation_height_dependent_election_Test;
friend class confirmation_height_dependent_election_after_already_cemented_Test;
private:
top_and_next_hash get_next_block (boost::optional<top_and_next_hash> const &, boost::circular_buffer_space_optimized<nano::block_hash> const &, boost::circular_buffer_space_optimized<receive_source_pair> const & receive_source_pairs, boost::optional<receive_chain_details> &);
nano::block_hash get_least_unconfirmed_hash_from_top_level (nano::transaction const &, nano::block_hash const &, nano::account const &, nano::confirmation_height_info const &, uint64_t &);
void notify_observers (std::vector<callback_data> const & cemented_blocks);
void prepare_iterated_blocks_for_cementing (preparation_data &);
void set_next_hash ();
void process ();
bool iterate (nano::read_transaction const &, uint64_t, nano::block_hash const &, boost::circular_buffer_space_optimized<nano::block_hash> &, nano::block_hash &, nano::block_hash const &, boost::circular_buffer_space_optimized<receive_source_pair> &, nano::account const &);
};
std::unique_ptr<container_info_component> collect_container_info (confirmation_height_processor &, const std::string &);

View file

@ -37,15 +37,14 @@ void nano::election::confirm_once (nano::election_status_type type_a)
auto status_l (status);
auto node_l (node.shared ());
auto confirmation_action_l (confirmation_action);
node.active.election_winner_details.emplace (status.winner->hash (), shared_from_this ());
node.background ([node_l, status_l, confirmation_action_l]() {
node_l->process_confirmed (status_l);
confirmation_action_l (status_l.winner);
});
auto root (status.winner->qualified_root ());
node.active.pending_conf_height.emplace (status.winner->hash (), shared_from_this ());
clear_blocks ();
clear_dependent ();
node.active.roots.erase (root);
node.active.roots.erase (status.winner->qualified_root ());
}
}

View file

@ -1761,7 +1761,7 @@ void nano::json_handler::confirmation_active ()
void nano::json_handler::confirmation_height_currently_processing ()
{
auto hash = node.pending_confirmation_height.current ();
auto hash = node.confirmation_height_processor.current ();
if (!hash.is_zero ())
{
response_l.put ("hash", hash.to_string ());

View file

@ -144,9 +144,9 @@ block_processor_thread ([this]() {
online_reps (ledger, network_params, config.online_weight_minimum.number ()),
votes_cache (wallets),
vote_uniquer (block_uniquer),
active (*this),
confirmation_height_processor (ledger, write_database_queue, config.conf_height_processor_batch_min_time, logger),
active (*this, confirmation_height_processor),
aggregator (network_params.network, config, stats, votes_cache, store, wallets),
confirmation_height_processor (pending_confirmation_height, ledger, active, write_database_queue, config.conf_height_processor_batch_min_time, logger),
payment_observer_processor (observers.blocks),
wallets (wallets_store.init_error (), *this),
startup_time (std::chrono::steady_clock::now ())
@ -597,7 +597,6 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
composite->add_component (collect_container_info (node.block_uniquer, "block_uniquer"));
composite->add_component (collect_container_info (node.vote_uniquer, "vote_uniquer"));
composite->add_component (collect_container_info (node.confirmation_height_processor, "confirmation_height_processor"));
composite->add_component (collect_container_info (node.pending_confirmation_height, "pending_confirmation_height"));
composite->add_component (collect_container_info (node.worker, "worker"));
composite->add_component (collect_container_info (node.distributed_work, "distributed_work"));
composite->add_component (collect_container_info (node.aggregator, "request_aggregator"));
@ -696,8 +695,8 @@ void nano::node::stop ()
}
aggregator.stop ();
vote_processor.stop ();
confirmation_height_processor.stop ();
active.stop ();
confirmation_height_processor.stop ();
network.stop ();
telemetry.stop ();
if (websocket_server)
@ -1088,7 +1087,7 @@ void nano::node::block_confirm (std::shared_ptr<nano::block> block_a)
bool nano::node::block_confirmed_or_being_confirmed (nano::transaction const & transaction_a, nano::block_hash const & hash_a)
{
return ledger.block_confirmed (transaction_a, hash_a) || pending_confirmation_height.is_processing_block (hash_a);
return ledger.block_confirmed (transaction_a, hash_a) || confirmation_height_processor.is_processing_block (hash_a);
}
nano::uint128_t nano::node::delta () const

View file

@ -183,10 +183,9 @@ public:
nano::keypair node_id;
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer;
nano::pending_confirmation_height pending_confirmation_height; // Used by both active and confirmation height processor
nano::confirmation_height_processor confirmation_height_processor;
nano::active_transactions active;
nano::request_aggregator aggregator;
nano::confirmation_height_processor confirmation_height_processor;
nano::payment_observer_processor payment_observer_processor;
nano::wallets wallets;
const std::chrono::steady_clock::time_point startup_time;

View file

@ -6154,7 +6154,7 @@ TEST (rpc, confirmation_height_currently_processing)
rpc.start ();
system.deadline_set (10s);
while (!node->pending_confirmation_height.is_processing_block (previous_genesis_chain_hash))
while (!node->confirmation_height_processor.is_processing_block (previous_genesis_chain_hash))
{
ASSERT_NO_ERROR (system.poll ());
}
@ -6172,16 +6172,10 @@ TEST (rpc, confirmation_height_currently_processing)
ASSERT_EQ (frontier->hash ().to_string (), hash);
}
// Wait until confirmation has been set
// Wait until confirmation has been set and not processing anything
system.deadline_set (10s);
while (true)
while (!node->confirmation_height_processor.current ().is_zero ())
{
auto transaction = node->store.tx_begin_read ();
if (node->ledger.block_confirmed (transaction, frontier->hash ()))
{
break;
}
ASSERT_NO_ERROR (system.poll ());
}
@ -7045,7 +7039,7 @@ TEST (rpc, block_confirmed)
}
// Should no longer be processing the block after confirmation is set
ASSERT_FALSE (node->pending_confirmation_height.is_processing_block (send->hash ()));
ASSERT_FALSE (node->confirmation_height_processor.is_processing_block (send->hash ()));
// Requesting confirmation for this should now succeed
request.put ("hash", send->hash ().to_string ());

View file

@ -496,18 +496,13 @@ TEST (confirmation_height, many_accounts_single_confirmation)
}
system.deadline_set (60s);
while (true)
auto transaction = node->store.tx_begin_read ();
while (!node->ledger.block_confirmed (transaction, last_open_hash))
{
auto transaction = node->store.tx_begin_read ();
if (node->ledger.block_confirmed (transaction, last_open_hash))
{
break;
}
ASSERT_NO_ERROR (system.poll ());
transaction.refresh ();
}
auto transaction (node->store.tx_begin_read ());
// All frontiers (except last) should have 2 blocks and both should be confirmed
for (auto i (node->store.latest_begin (transaction)), n (node->store.latest_end ()); i != n; ++i)
{
@ -520,7 +515,20 @@ TEST (confirmation_height, many_accounts_single_confirmation)
ASSERT_EQ (count, account_info.block_count);
}
auto cemented_count = 0;
for (auto i (node->ledger.store.confirmation_height_begin (transaction)), n (node->ledger.store.confirmation_height_end ()); i != n; ++i)
{
cemented_count += i->second.height;
}
ASSERT_EQ (cemented_count, node->ledger.cache.cemented_count);
ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in), num_accounts * 2 - 2);
system.deadline_set (20s);
while ((node->ledger.cache.cemented_count - 1) != node->stats.count (nano::stat::type::observer, nano::stat::detail::all, nano::stat::dir::out))
{
ASSERT_NO_ERROR (system.poll ());
}
}
// Can take up to 10 minutes
@ -539,7 +547,7 @@ TEST (confirmation_height, many_accounts_many_confirmations)
node->active.next_frontier_check = std::chrono::steady_clock::now () + 7200s;
}
auto num_accounts = 10000;
auto num_accounts = nano::confirmation_height_processor::batch_write_size * 2 + 50;
auto latest_genesis = node->latest (nano::test_genesis_key.pub);
std::vector<std::shared_ptr<nano::open_block>> open_blocks;
{
@ -569,6 +577,21 @@ TEST (confirmation_height, many_accounts_many_confirmations)
{
ASSERT_NO_ERROR (system.poll ());
}
auto transaction = node->store.tx_begin_read ();
auto cemented_count = 0;
for (auto i (node->ledger.store.confirmation_height_begin (transaction)), n (node->ledger.store.confirmation_height_end ()); i != n; ++i)
{
cemented_count += i->second.height;
}
ASSERT_EQ (cemented_count, node->ledger.cache.cemented_count);
system.deadline_set (20s);
while ((node->ledger.cache.cemented_count - 1) != node->stats.count (nano::stat::type::observer, nano::stat::detail::all, nano::stat::dir::out))
{
ASSERT_NO_ERROR (system.poll ());
}
}
TEST (confirmation_height, long_chains)
@ -588,7 +611,7 @@ TEST (confirmation_height, long_chains)
node->active.next_frontier_check = std::chrono::steady_clock::now () + 7200s;
}
constexpr auto num_blocks = 10000;
constexpr auto num_blocks = nano::confirmation_height_processor::batch_write_size * 2 + 50;
// First open the other account
nano::send_block send (latest, key1.pub, nano::genesis_amount - nano::Gxrb_ratio + num_blocks + 1, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (latest));
@ -660,7 +683,20 @@ TEST (confirmation_height, long_chains)
ASSERT_EQ (num_blocks + 1, confirmation_height_info.height);
ASSERT_EQ (num_blocks + 1, account_info.block_count);
auto cemented_count = 0;
for (auto i (node->ledger.store.confirmation_height_begin (transaction)), n (node->ledger.store.confirmation_height_end ()); i != n; ++i)
{
cemented_count += i->second.height;
}
ASSERT_EQ (cemented_count, node->ledger.cache.cemented_count);
ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in), num_blocks * 2 + 2);
system.deadline_set (20s);
while ((node->ledger.cache.cemented_count - 1) != node->stats.count (nano::stat::type::observer, nano::stat::detail::all, nano::stat::dir::out))
{
ASSERT_NO_ERROR (system.poll ());
}
}
// Can take up to 1 hour
@ -678,7 +714,7 @@ TEST (confirmation_height, prioritize_frontiers_overwrite)
node->active.next_frontier_check = std::chrono::steady_clock::now () + 7200s;
}
auto num_accounts = node->active.max_priority_cementable_frontiers * 2;
auto num_accounts = node->active.max_priority_cementable_frontiers * 2 + 50;
nano::keypair last_keypair = nano::test_genesis_key;
auto last_open_hash = node->latest (nano::test_genesis_key.pub);
// Clear confirmation height so that the genesis account has the same amount of uncemented blocks as the other frontiers