diff --git a/nano/core_test/confirmation_height.cpp b/nano/core_test/confirmation_height.cpp index 5f93aa8e..71469464 100644 --- a/nano/core_test/confirmation_height.cpp +++ b/nano/core_test/confirmation_height.cpp @@ -72,6 +72,8 @@ TEST (confirmation_height, single) ASSERT_EQ (1, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in)); ASSERT_EQ (1, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in)); ASSERT_EQ (1, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out)); + + ASSERT_EQ (0, node->active.election_winner_details_size ()); } }; @@ -213,6 +215,8 @@ TEST (confirmation_height, multiple_accounts) ASSERT_EQ (10, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in)); ASSERT_EQ (10, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in)); ASSERT_EQ (10, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out)); + + ASSERT_EQ (0, node->active.election_winner_details_size ()); }; test_mode (nano::confirmation_height_mode::bounded); @@ -288,6 +292,8 @@ TEST (confirmation_height, gap_bootstrap) ASSERT_EQ (0, node1.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in)); ASSERT_EQ (0, node1.stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in)); ASSERT_EQ (0, node1.stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out)); + + ASSERT_EQ (0, node1.active.election_winner_details_size ()); }; test_mode (nano::confirmation_height_mode::bounded); @@ -374,6 +380,8 @@ TEST (confirmation_height, gap_live) ASSERT_EQ (6, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in)); ASSERT_EQ (6, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in)); ASSERT_EQ (6, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out)); + + ASSERT_EQ (0, node->active.election_winner_details_size ()); } }; @@ -460,6 +468,8 @@ TEST (confirmation_height, send_receive_between_2_accounts) ASSERT_EQ (10, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in)); ASSERT_EQ (10, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out)); ASSERT_EQ (11, node->ledger.cache.cemented_count); + + ASSERT_EQ (0, node->active.election_winner_details_size ()); }; test_mode (nano::confirmation_height_mode::bounded); @@ -524,6 +534,7 @@ TEST (confirmation_height, send_receive_self) ASSERT_EQ (6, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in)); ASSERT_EQ (6, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out)); ASSERT_EQ (confirmation_height_info.height, node->ledger.cache.cemented_count); + ASSERT_EQ (0, node->active.election_winner_details_size ()); }; test_mode (nano::confirmation_height_mode::bounded); @@ -635,6 +646,8 @@ TEST (confirmation_height, all_block_types) ASSERT_EQ (15, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in)); ASSERT_EQ (15, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out)); ASSERT_EQ (16, node->ledger.cache.cemented_count); + + ASSERT_EQ (0, node->active.election_winner_details_size ()); }; test_mode (nano::confirmation_height_mode::bounded); @@ -743,6 +756,7 @@ TEST (confirmation_height, observers) ASSERT_EQ (1, node1->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in)); ASSERT_EQ (1, node1->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in)); ASSERT_EQ (1, node1->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out)); + ASSERT_EQ (0, node1->active.election_winner_details_size ()); }; test_mode (nano::confirmation_height_mode::bounded); @@ -792,6 +806,7 @@ TEST (confirmation_height, modified_chain) } ASSERT_EQ (1, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::invalid_block, nano::stat::dir::in)); + ASSERT_EQ (0, node->active.election_winner_details_size ()); }; test_mode (nano::confirmation_height_mode::bounded); @@ -836,6 +851,7 @@ TEST (confirmation_height, pending_observer_callbacks) ASSERT_EQ (2, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in)); ASSERT_EQ (2, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in)); + ASSERT_EQ (0, node->active.election_winner_details_size ()); }; test_mode (nano::confirmation_height_mode::bounded); @@ -1181,6 +1197,7 @@ TEST (confirmation_height, dependent_election) { // The write guard prevents the confirmation height processor doing any writes. + // Note: This test could still fail intermittently due to thread scheduling between active and confirmation height. system.deadline_set (10s); auto write_guard = node->write_database_queue.wait (nano::writer::testing); while (!node->write_database_queue.contains (nano::writer::confirmation_height)) @@ -1283,6 +1300,7 @@ TEST (confirmation_height, cemented_gap_below_receive) ASSERT_EQ (9, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out)); ASSERT_EQ (10, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in)); ASSERT_EQ (10, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in)); + ASSERT_EQ (0, node->active.election_winner_details_size ()); // Check that the order of callbacks is correct std::vector expected_order = { send.hash (), open.hash (), send1.hash (), receive1.hash (), send2.hash (), dummy_send.hash (), receive2.hash (), dummy_send1.hash (), send3.hash (), open1->hash () }; @@ -1368,6 +1386,7 @@ TEST (confirmation_height, cemented_gap_below_no_cache) auto transaction = node->store.tx_begin_read (); ASSERT_TRUE (node->ledger.block_confirmed (transaction, open1->hash ())); + ASSERT_EQ (node->active.election_winner_details_size (), 0); ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out)); ASSERT_EQ (0, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_conf_height, nano::stat::dir::out)); ASSERT_EQ (5, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out)); @@ -1395,13 +1414,13 @@ TEST (confirmation_height, election_winner_details_clearing) nano::keypair key1; auto send = std::make_shared (latest, key1.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (latest)); auto send1 = std::make_shared (send->hash (), key1.pub, nano::genesis_amount - nano::Gxrb_ratio * 2, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send->hash ())); - nano::send_block send2 (send1->hash (), key1.pub, nano::genesis_amount - nano::Gxrb_ratio * 3, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send1->hash ())); + auto send2 = std::make_shared (send1->hash (), key1.pub, nano::genesis_amount - nano::Gxrb_ratio * 3, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send1->hash ())); { auto transaction = node->store.tx_begin_write (); ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *send).code); ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *send1).code); - ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send2).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *send2).code); } add_callback_stats (*node); @@ -1442,8 +1461,16 @@ TEST (confirmation_height, election_winner_details_clearing) ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out)); - // election_winner_details should get cleared during another batch of cementing, so add another block - node->confirmation_height_processor.add (send2.hash ()); + node->block_confirm (send2); + system.deadline_set (10s); + while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 3) + { + ASSERT_NO_ERROR (system.poll ()); + } + + // Add an already cemented block with fake election details. It should get removed + node->active.add_election_winner_details (send2->hash (), nullptr); + node->confirmation_height_processor.add (send2->hash ()); system.deadline_set (10s); while (node->active.election_winner_details_size () > 0) @@ -1452,8 +1479,8 @@ TEST (confirmation_height, election_winner_details_clearing) } ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out)); - ASSERT_EQ (2, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out)); - ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out)); + ASSERT_EQ (3, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out)); + ASSERT_EQ (2, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out)); ASSERT_EQ (3, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in)); ASSERT_EQ (3, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in)); }; diff --git a/nano/node/active_transactions.cpp b/nano/node/active_transactions.cpp index 7161cfcb..ce86bbf4 100644 --- a/nano/node/active_transactions.cpp +++ b/nano/node/active_transactions.cpp @@ -29,13 +29,13 @@ thread ([this]() { }) { // Register a callback which will get called after a block is cemented - confirmation_height_processor.add_cemented_observer ([this](nano::block_w_sideband const & callback_data) { - this->block_cemented_callback (callback_data.block, callback_data.sideband); + confirmation_height_processor.add_cemented_observer ([this](nano::block_w_sideband const & callback_data_a) { + this->block_cemented_callback (callback_data_a.block, callback_data_a.sideband); }); // Register a callback which will get called after a batch of blocks is written and observer calls finished - confirmation_height_processor.add_cemented_process_finished_observer ([this]() { - this->cemented_batch_finished_callback (); + confirmation_height_processor.add_block_already_cemented_observer ([this](nano::block_hash const & hash_a) { + this->block_already_cemented_callback (hash_a); }); debug_assert (min_time_between_requests > std::chrono::milliseconds (node.network_params.network.request_interval_ms)); @@ -153,15 +153,20 @@ void nano::active_transactions::block_cemented_callback (std::shared_ptrhash ()); - nano::lock_guard lock (mutex); + nano::unique_lock election_winners_lk (election_winner_details_mutex); auto existing (election_winner_details.find (hash)); if (existing != election_winner_details.end ()) { auto election = existing->second; + election_winner_details.erase (hash); + election_winners_lk.unlock (); + // Make sure mutex is held before election usage so we know that confirm_once has + // finished removing the root from active to avoid any data race. + nano::unique_lock lk (mutex); if (election->confirmed () && !election->stopped && election->status.winner->hash () == hash) { - add_confirmed (existing->second->status, block_a->qualified_root ()); - + add_confirmed (election->status, block_a->qualified_root ()); + lk.unlock (); node.receive_confirmed (transaction, block_a, hash); nano::account account (0); nano::uint128_t amount (0); @@ -180,32 +185,25 @@ void nano::active_transactions::block_cemented_callback (std::shared_ptr const & election_a) +{ + nano::lock_guard guard (election_winner_details_mutex); + election_winner_details.emplace (hash_a, election_a); +} + +void nano::active_transactions::block_already_cemented_callback (nano::block_hash const & hash_a) { // Depending on timing there is a situation where the election_winner_details is not reset. // This can happen when a block wins an election, and the block is confirmed + observer // called before the block hash gets added to election_winner_details. If the block is confirmed // callbacks have already been done, so we can safely just remove it. - auto transaction = node.store.tx_begin_read (); - nano::lock_guard guard (mutex); - for (auto it = election_winner_details.begin (); it != election_winner_details.end ();) - { - if (node.ledger.block_confirmed (transaction, it->first)) - { - it = election_winner_details.erase (it); - } - else - { - ++it; - } - } + nano::lock_guard guard (election_winner_details_mutex); + election_winner_details.erase (hash_a); } void nano::active_transactions::election_escalate (std::shared_ptr & election_l, nano::transaction const & transaction_l, size_t const & roots_size_l) @@ -1076,7 +1074,7 @@ bool nano::active_transactions::inactive_votes_bootstrap_check (std::vector guard (mutex); + nano::lock_guard guard (election_winner_details_mutex); return election_winner_details.size (); } diff --git a/nano/node/active_transactions.hpp b/nano/node/active_transactions.hpp index 82c31932..d1c68f45 100644 --- a/nano/node/active_transactions.hpp +++ b/nano/node/active_transactions.hpp @@ -110,7 +110,7 @@ public: bool publish (std::shared_ptr block_a); boost::optional confirm_block (nano::transaction const &, std::shared_ptr); void block_cemented_callback (std::shared_ptr const & block_a, nano::block_sideband const & sideband_a); - void cemented_batch_finished_callback (); + void block_already_cemented_callback (nano::block_hash const &); // clang-format off boost::multi_index_container difficulty_trend (); size_t inactive_votes_cache_size (); - std::unordered_map> election_winner_details; size_t election_winner_details_size (); + void add_election_winner_details (nano::block_hash const &, std::shared_ptr const &); nano::confirmation_solicitor solicitor; private: + std::mutex election_winner_details_mutex; + std::unordered_map> election_winner_details; + // Call action with confirmed block, may be different than what we started with // clang-format off std::pair, bool> insert_impl (std::shared_ptr, bool const = false, std::function)> const & = [](std::shared_ptr) {}); @@ -208,6 +211,7 @@ private: friend class confirmation_height_prioritize_frontiers_Test; friend class confirmation_height_prioritize_frontiers_overwrite_Test; + friend std::unique_ptr collect_container_info (active_transactions &, const std::string &); }; std::unique_ptr collect_container_info (active_transactions & active_transactions, const std::string & name); diff --git a/nano/node/confirmation_height_bounded.cpp b/nano/node/confirmation_height_bounded.cpp index 4b543fd3..e03f3040 100644 --- a/nano/node/confirmation_height_bounded.cpp +++ b/nano/node/confirmation_height_bounded.cpp @@ -8,7 +8,7 @@ #include -nano::confirmation_height_bounded::confirmation_height_bounded (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a, std::atomic & stopped_a, nano::block_hash const & original_hash_a, std::function const &)> const & notify_observers_callback_a, std::function const & awaiting_processing_size_callback_a) : +nano::confirmation_height_bounded::confirmation_height_bounded (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a, std::atomic & stopped_a, nano::block_hash const & original_hash_a, std::function const &)> const & notify_observers_callback_a, std::function const & notify_block_already_cemented_observers_callback_a, std::function const & awaiting_processing_size_callback_a) : ledger (ledger_a), write_database_queue (write_database_queue_a), batch_separate_pending_min_time (batch_separate_pending_min_time_a), @@ -16,6 +16,7 @@ logger (logger_a), stopped (stopped_a), original_hash (original_hash_a), notify_observers_callback (notify_observers_callback_a), +notify_block_already_cemented_observers_callback (notify_block_already_cemented_observers_callback_a), awaiting_processing_size_callback (awaiting_processing_size_callback_a) { } @@ -57,6 +58,7 @@ void nano::confirmation_height_bounded::process () boost::circular_buffer_space_optimized checkpoints{ max_items }; boost::circular_buffer_space_optimized receive_source_pairs{ max_items }; nano::block_hash current; + bool first_iter = true; auto transaction (ledger.store.tx_begin_read ()); do { @@ -85,7 +87,13 @@ void nano::confirmation_height_bounded::process () else { auto error = ledger.store.confirmation_height_get (transaction, account, confirmation_height_info); + (void)error; debug_assert (!error); + // This block was added to the confirmation height processor but is already confirmed + if (first_iter && confirmation_height_info.height >= sideband.height && current == original_hash) + { + notify_block_already_cemented_observers_callback (original_hash); + } } auto block_height = sideband.height; @@ -181,6 +189,7 @@ void nano::confirmation_height_bounded::process () } } + first_iter = false; transaction.refresh (); } while ((!receive_source_pairs.empty () || current != original_hash) && !stopped); diff --git a/nano/node/confirmation_height_bounded.hpp b/nano/node/confirmation_height_bounded.hpp index b72300bd..8a444245 100644 --- a/nano/node/confirmation_height_bounded.hpp +++ b/nano/node/confirmation_height_bounded.hpp @@ -15,7 +15,7 @@ class write_database_queue; class confirmation_height_bounded final { public: - confirmation_height_bounded (nano::ledger &, nano::write_database_queue &, std::chrono::milliseconds, nano::logger_mt &, std::atomic &, nano::block_hash const &, std::function const &)> const &, std::function const &); + confirmation_height_bounded (nano::ledger &, nano::write_database_queue &, std::chrono::milliseconds, nano::logger_mt &, std::atomic &, nano::block_hash const &, std::function const &)> const &, std::function const &, std::function const &); bool pending_empty () const; void prepare_new (); void process (); @@ -115,6 +115,7 @@ private: std::atomic & stopped; nano::block_hash const & original_hash; std::function const &)> notify_observers_callback; + std::function notify_block_already_cemented_observers_callback; std::function awaiting_processing_size_callback; friend std::unique_ptr collect_container_info (confirmation_height_bounded &, const std::string & name_a); diff --git a/nano/node/confirmation_height_processor.cpp b/nano/node/confirmation_height_processor.cpp index e65698ed..42d6b85e 100644 --- a/nano/node/confirmation_height_processor.cpp +++ b/nano/node/confirmation_height_processor.cpp @@ -15,8 +15,8 @@ nano::confirmation_height_processor::confirmation_height_processor (nano::ledger ledger (ledger_a), write_database_queue (write_database_queue_a), // clang-format off -confirmation_height_unbounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this]() { return this->awaiting_processing_size (); }), -confirmation_height_bounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this]() { return this->awaiting_processing_size (); }), +confirmation_height_unbounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }), +confirmation_height_bounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }), // clang-format on thread ([this, &latch, mode_a]() { nano::thread_role::set (nano::thread_role::name::confirmation_height_processing); @@ -88,34 +88,39 @@ void nano::confirmation_height_processor::run (confirmation_height_mode mode_a) else { auto lock_and_cleanup = [&lk, this]() { - for (auto const & observer : cemented_process_finished_observers) - { - observer (); - } lk.lock (); original_hash.clear (); original_hashes_pending.clear (); }; - lk.unlock (); - // If there are blocks pending cementing, then make sure we flush out the remaining writes - if (!confirmation_height_bounded_processor.pending_empty ()) + if (!paused) { - debug_assert (confirmation_height_unbounded_processor.pending_empty ()); - auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height); - confirmation_height_bounded_processor.cement_blocks (); - lock_and_cleanup (); - } - else if (!confirmation_height_unbounded_processor.pending_empty ()) - { - debug_assert (confirmation_height_bounded_processor.pending_empty ()); - auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height); - confirmation_height_unbounded_processor.cement_blocks (); - lock_and_cleanup (); + lk.unlock (); + + // If there are blocks pending cementing, then make sure we flush out the remaining writes + if (!confirmation_height_bounded_processor.pending_empty ()) + { + debug_assert (confirmation_height_unbounded_processor.pending_empty ()); + auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height); + confirmation_height_bounded_processor.cement_blocks (); + lock_and_cleanup (); + } + else if (!confirmation_height_unbounded_processor.pending_empty ()) + { + debug_assert (confirmation_height_bounded_processor.pending_empty ()); + auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height); + confirmation_height_unbounded_processor.cement_blocks (); + lock_and_cleanup (); + } + else + { + lock_and_cleanup (); + condition.wait (lk); + } } else { - lock_and_cleanup (); + original_hash.clear (); condition.wait (lk); } } @@ -125,12 +130,16 @@ void nano::confirmation_height_processor::run (confirmation_height_mode mode_a) // Pausing only affects processing new blocks, not the current one being processed. Currently only used in tests void nano::confirmation_height_processor::pause () { + nano::lock_guard lk (mutex); paused = true; } void nano::confirmation_height_processor::unpause () { - paused = false; + { + nano::lock_guard lk (mutex); + paused = false; + } condition.notify_one (); } @@ -159,9 +168,9 @@ void nano::confirmation_height_processor::add_cemented_observer (std::function const & callback_a) +void nano::confirmation_height_processor::add_block_already_cemented_observer (std::function const & callback_a) { - cemented_process_finished_observers.push_back (callback_a); + block_already_cemented_observers.push_back (callback_a); } void nano::confirmation_height_processor::notify_observers (std::vector const & cemented_blocks) @@ -175,14 +184,22 @@ void nano::confirmation_height_processor::notify_observers (std::vector nano::collect_container_info (confirmation_height_processor & confirmation_height_processor_a, const std::string & name_a) { auto composite = std::make_unique (name_a); size_t cemented_observers_count = confirmation_height_processor_a.cemented_observers.size (); - size_t cemented_process_finished_observer_count = confirmation_height_processor_a.cemented_process_finished_observers.size (); + size_t block_already_cemented_observers_count = confirmation_height_processor_a.block_already_cemented_observers.size (); composite->add_component (std::make_unique (container_info{ "cemented_observers", cemented_observers_count, sizeof (decltype (confirmation_height_processor_a.cemented_observers)::value_type) })); - composite->add_component (std::make_unique (container_info{ "cemented_process_finished_observers", cemented_process_finished_observer_count, sizeof (decltype (confirmation_height_processor_a.cemented_process_finished_observers)::value_type) })); + composite->add_component (std::make_unique (container_info{ "block_already_cemented_observers", block_already_cemented_observers_count, sizeof (decltype (confirmation_height_processor_a.block_already_cemented_observers)::value_type) })); composite->add_component (std::make_unique (container_info{ "awaiting_processing", confirmation_height_processor_a.awaiting_processing_size (), sizeof (decltype (confirmation_height_processor_a.awaiting_processing)::value_type) })); composite->add_component (collect_container_info (confirmation_height_processor_a.confirmation_height_bounded_processor, "bounded_processor")); composite->add_component (collect_container_info (confirmation_height_processor_a.confirmation_height_unbounded_processor, "unbounded_processor")); diff --git a/nano/node/confirmation_height_processor.hpp b/nano/node/confirmation_height_processor.hpp index 5efe35a1..facc3e53 100644 --- a/nano/node/confirmation_height_processor.hpp +++ b/nano/node/confirmation_height_processor.hpp @@ -36,7 +36,7 @@ public: nano::block_hash current (); void add_cemented_observer (std::function const &); - void add_cemented_process_finished_observer (std::function const &); + void add_block_already_cemented_observer (std::function const &); private: std::mutex mutex; @@ -44,15 +44,15 @@ private: std::unordered_set awaiting_processing; // Hashes which have been added and processed, but have not been cemented std::unordered_set original_hashes_pending; + bool paused{ false }; /** This is the last block popped off the confirmation height pending collection */ nano::block_hash original_hash{ 0 }; nano::condition_variable condition; std::atomic stopped{ false }; - std::atomic paused{ false }; std::vector> cemented_observers; - std::vector> cemented_process_finished_observers; + std::vector> block_already_cemented_observers; nano::ledger & ledger; nano::write_database_queue & write_database_queue; @@ -61,7 +61,8 @@ private: std::thread thread; void set_next_hash (); - void notify_observers (std::vector const & cemented_blocks); + void notify_observers (std::vector const &); + void notify_observers (nano::block_hash const &); friend std::unique_ptr collect_container_info (confirmation_height_processor &, const std::string &); friend class confirmation_height_pending_observer_callbacks_Test; diff --git a/nano/node/confirmation_height_unbounded.cpp b/nano/node/confirmation_height_unbounded.cpp index a5327da9..bdca0bb2 100644 --- a/nano/node/confirmation_height_unbounded.cpp +++ b/nano/node/confirmation_height_unbounded.cpp @@ -5,7 +5,7 @@ #include -nano::confirmation_height_unbounded::confirmation_height_unbounded (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a, std::atomic & stopped_a, nano::block_hash const & original_hash_a, std::function const &)> const & notify_observers_callback_a, std::function const & awaiting_processing_size_callback_a) : +nano::confirmation_height_unbounded::confirmation_height_unbounded (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a, std::atomic & stopped_a, nano::block_hash const & original_hash_a, std::function const &)> const & notify_observers_callback_a, std::function const & notify_block_already_cemented_observers_callback_a, std::function const & awaiting_processing_size_callback_a) : ledger (ledger_a), write_database_queue (write_database_queue_a), batch_separate_pending_min_time (batch_separate_pending_min_time_a), @@ -13,6 +13,7 @@ logger (logger_a), stopped (stopped_a), original_hash (original_hash_a), notify_observers_callback (notify_observers_callback_a), +notify_block_already_cemented_observers_callback (notify_block_already_cemented_observers_callback_a), awaiting_processing_size_callback (awaiting_processing_size_callback_a) { } @@ -26,6 +27,7 @@ void nano::confirmation_height_unbounded::process () std::vector receive_source_pairs; release_assert (receive_source_pairs.empty ()); + bool first_iter = true; auto read_transaction (ledger.store.tx_begin_read ()); do @@ -68,6 +70,12 @@ void nano::confirmation_height_unbounded::process () nano::confirmation_height_info confirmation_height_info; release_assert (!ledger.store.confirmation_height_get (read_transaction, account, confirmation_height_info)); confirmation_height = confirmation_height_info.height; + + // This block was added to the confirmation height processor but is already confirmed + if (first_iter && confirmation_height >= block_height && current == original_hash) + { + notify_block_already_cemented_observers_callback (original_hash); + } } auto iterated_height = confirmation_height; if (account_it != confirmed_iterated_pairs.cend () && account_it->second.iterated_height > iterated_height) @@ -141,6 +149,7 @@ void nano::confirmation_height_unbounded::process () } } + first_iter = false; read_transaction.renew (); } while ((!receive_source_pairs.empty () || current != original_hash) && !stopped); } diff --git a/nano/node/confirmation_height_unbounded.hpp b/nano/node/confirmation_height_unbounded.hpp index 7b6620a7..6ce3eefc 100644 --- a/nano/node/confirmation_height_unbounded.hpp +++ b/nano/node/confirmation_height_unbounded.hpp @@ -16,7 +16,7 @@ class write_database_queue; class confirmation_height_unbounded final { public: - confirmation_height_unbounded (nano::ledger &, nano::write_database_queue &, std::chrono::milliseconds, nano::logger_mt &, std::atomic &, nano::block_hash const &, std::function const &)> const &, std::function const &); + confirmation_height_unbounded (nano::ledger &, nano::write_database_queue &, std::chrono::milliseconds, nano::logger_mt &, std::atomic &, nano::block_hash const &, std::function const &)> const &, std::function const &, std::function const &); bool pending_empty () const; void prepare_new (); void process (); @@ -91,6 +91,7 @@ private: std::atomic & stopped; nano::block_hash const & original_hash; std::function const &)> notify_observers_callback; + std::function notify_block_already_cemented_observers_callback; std::function awaiting_processing_size_callback; friend std::unique_ptr collect_container_info (confirmation_height_unbounded &, const std::string & name_a); diff --git a/nano/node/election.cpp b/nano/node/election.cpp index 488449f5..bda1bdb3 100644 --- a/nano/node/election.cpp +++ b/nano/node/election.cpp @@ -37,9 +37,15 @@ void nano::election::confirm_once (nano::election_status_type type_a) auto status_l (status); auto node_l (node.shared ()); auto confirmation_action_l (confirmation_action); - node.active.election_winner_details.emplace (status.winner->hash (), shared_from_this ()); - node.background ([node_l, status_l, confirmation_action_l]() { - node_l->process_confirmed (status_l); + auto this_l = shared_from_this (); + if (status_l.type == nano::election_status_type::active_confirmation_height) + { + // Need to add dependent election results here (and not in process_confirmed which is called asynchronously) so that + // election winner details can be cleared consistently sucessfully in active_transactions::block_cemented_callback + node.active.add_election_winner_details (status_l.winner->hash (), this_l); + } + node.background ([node_l, status_l, confirmation_action_l, this_l]() { + node_l->process_confirmed (status_l, this_l); confirmation_action_l (status_l.winner); }); clear_blocks (); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 71ca3517..675ff886 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -1243,26 +1243,31 @@ void nano::node::process_confirmed_data (nano::transaction const & transaction_a } } -void nano::node::process_confirmed (nano::election_status const & status_a, uint8_t iteration) +void nano::node::process_confirmed (nano::election_status const & status_a, std::shared_ptr const & election_a, uint8_t iteration_a) { if (status_a.type == nano::election_status_type::active_confirmed_quorum) { auto block_a (status_a.winner); auto hash (block_a->hash ()); - auto transaction (store.tx_begin_read ()); - if (store.block_get (transaction, hash) != nullptr) + if (ledger.block_exists (block_a->type (), hash)) { + // Pausing to prevent this block being processed before adding to election winner details. + confirmation_height_processor.pause (); confirmation_height_processor.add (hash); + { + active.add_election_winner_details (hash, election_a); + } + confirmation_height_processor.unpause (); } // Limit to 0.5 * 20 = 10 seconds (more than max block_processor::process_batch finish time) - else if (iteration < 20) + else if (iteration_a < 20) { - iteration++; + iteration_a++; std::weak_ptr node_w (shared ()); - alarm.add (std::chrono::steady_clock::now () + network_params.node.process_confirmed_interval, [node_w, status_a, iteration]() { + alarm.add (std::chrono::steady_clock::now () + network_params.node.process_confirmed_interval, [node_w, status_a, iteration_a, election_a]() { if (auto node_l = node_w.lock ()) { - node_l->process_confirmed (status_a, iteration); + node_l->process_confirmed (status_a, election_a, iteration_a); } }); } diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 0f232a19..820fdb9c 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -103,7 +103,7 @@ public: int store_version (); void receive_confirmed (nano::transaction const &, std::shared_ptr, nano::block_hash const &); void process_confirmed_data (nano::transaction const &, std::shared_ptr, nano::block_hash const &, nano::block_sideband const &, nano::account &, nano::uint128_t &, bool &, nano::account &); - void process_confirmed (nano::election_status const &, uint8_t = 0); + void process_confirmed (nano::election_status const &, std::shared_ptr const &, uint8_t = 0); void process_active (std::shared_ptr); nano::process_return process (nano::block const &); nano::process_return process_local (std::shared_ptr, bool const = false); diff --git a/nano/secure/ledger.cpp b/nano/secure/ledger.cpp index 18c083c9..5f49b160 100644 --- a/nano/secure/ledger.cpp +++ b/nano/secure/ledger.cpp @@ -780,16 +780,12 @@ nano::block_hash nano::ledger::representative_calculated (nano::transaction cons bool nano::ledger::block_exists (nano::block_hash const & hash_a) { - auto transaction (store.tx_begin_read ()); - auto result (store.block_exists (transaction, hash_a)); - return result; + return store.block_exists (store.tx_begin_read (), hash_a); } bool nano::ledger::block_exists (nano::block_type type, nano::block_hash const & hash_a) { - auto transaction (store.tx_begin_read ()); - auto result (store.block_exists (transaction, type, hash_a)); - return result; + return store.block_exists (store.tx_begin_read (), type, hash_a); } std::string nano::ledger::block_text (char const * hash_a) diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index 1f0202b9..08209b52 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -525,6 +525,7 @@ TEST (confirmation_height, many_accounts_single_confirmation) { ASSERT_NO_ERROR (system.poll ()); } + ASSERT_EQ (node->active.election_winner_details_size (), 0); } // Can take up to 10 minutes @@ -586,6 +587,8 @@ TEST (confirmation_height, many_accounts_many_confirmations) { ASSERT_NO_ERROR (system.poll ()); } + + ASSERT_EQ (node->active.election_winner_details_size (), 0); } TEST (confirmation_height, long_chains) @@ -686,6 +689,7 @@ TEST (confirmation_height, long_chains) { ASSERT_NO_ERROR (system.poll ()); } + ASSERT_EQ (node->active.election_winner_details_size (), 0); } TEST (confirmation_height, dynamic_algorithm) @@ -732,6 +736,7 @@ TEST (confirmation_height, dynamic_algorithm) ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in), num_blocks); ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed_bounded, nano::stat::dir::in), 1); ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed_unbounded, nano::stat::dir::in), num_blocks - 1); + ASSERT_EQ (node->active.election_winner_details_size (), 0); } namespace nano