diff --git a/nano/node/confirmation_height_processor.cpp b/nano/node/confirmation_height_processor.cpp index 4b96410a..bf6e4cf2 100644 --- a/nano/node/confirmation_height_processor.cpp +++ b/nano/node/confirmation_height_processor.cpp @@ -15,8 +15,8 @@ nano::confirmation_height_processor::confirmation_height_processor (nano::ledger ledger (ledger_a), write_database_queue (write_database_queue_a), // clang-format off -confirmation_height_unbounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, batch_write_size, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }), -confirmation_height_bounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, batch_write_size, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }), +unbounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, batch_write_size, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }), +bounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, batch_write_size, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }), // clang-format on thread ([this, &latch, mode_a]() { nano::thread_role::set (nano::thread_role::name::confirmation_height_processing); @@ -53,7 +53,7 @@ void nano::confirmation_height_processor::run (confirmation_height_mode mode_a) if (!paused && !awaiting_processing.empty ()) { lk.unlock (); - if (confirmation_height_bounded_processor.pending_empty () && confirmation_height_unbounded_processor.pending_empty ()) + if (bounded_processor.pending_empty () && unbounded_processor.pending_empty ()) { lk.lock (); original_hashes_pending.clear (); @@ -66,26 +66,26 @@ void nano::confirmation_height_processor::run (confirmation_height_mode mode_a) auto blocks_within_automatic_unbounded_selection = (ledger.cache.block_count < num_blocks_to_use_unbounded || ledger.cache.block_count - num_blocks_to_use_unbounded < ledger.cache.cemented_count); // Don't want to mix up pending writes across different processors - auto valid_unbounded = (mode_a == confirmation_height_mode::automatic && blocks_within_automatic_unbounded_selection && confirmation_height_bounded_processor.pending_empty ()); - auto force_unbounded = (!confirmation_height_unbounded_processor.pending_empty () || mode_a == confirmation_height_mode::unbounded); + auto valid_unbounded = (mode_a == confirmation_height_mode::automatic && blocks_within_automatic_unbounded_selection && bounded_processor.pending_empty ()); + auto force_unbounded = (!unbounded_processor.pending_empty () || mode_a == confirmation_height_mode::unbounded); if (force_unbounded || valid_unbounded) { - debug_assert (confirmation_height_bounded_processor.pending_empty ()); - if (confirmation_height_unbounded_processor.pending_empty ()) + debug_assert (bounded_processor.pending_empty ()); + if (unbounded_processor.pending_empty ()) { - confirmation_height_unbounded_processor.reset (); + unbounded_processor.reset (); } - confirmation_height_unbounded_processor.process (); + unbounded_processor.process (); } else { debug_assert (mode_a == confirmation_height_mode::bounded || mode_a == confirmation_height_mode::automatic); - debug_assert (confirmation_height_unbounded_processor.pending_empty ()); - if (confirmation_height_bounded_processor.pending_empty ()) + debug_assert (unbounded_processor.pending_empty ()); + if (bounded_processor.pending_empty ()) { - confirmation_height_bounded_processor.reset (); + bounded_processor.reset (); } - confirmation_height_bounded_processor.process (); + bounded_processor.process (); } lk.lock (); @@ -103,25 +103,25 @@ void nano::confirmation_height_processor::run (confirmation_height_mode mode_a) lk.unlock (); // If there are blocks pending cementing, then make sure we flush out the remaining writes - if (!confirmation_height_bounded_processor.pending_empty ()) + if (!bounded_processor.pending_empty ()) { - debug_assert (confirmation_height_unbounded_processor.pending_empty ()); + debug_assert (unbounded_processor.pending_empty ()); { auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height); - confirmation_height_bounded_processor.cement_blocks (scoped_write_guard); + bounded_processor.cement_blocks (scoped_write_guard); } lock_and_cleanup (); - confirmation_height_bounded_processor.reset (); + bounded_processor.reset (); } - else if (!confirmation_height_unbounded_processor.pending_empty ()) + else if (!unbounded_processor.pending_empty ()) { - debug_assert (confirmation_height_bounded_processor.pending_empty ()); + debug_assert (bounded_processor.pending_empty ()); { auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height); - confirmation_height_unbounded_processor.cement_blocks (scoped_write_guard); + unbounded_processor.cement_blocks (scoped_write_guard); } lock_and_cleanup (); - confirmation_height_unbounded_processor.reset (); + unbounded_processor.reset (); } else { @@ -158,7 +158,7 @@ void nano::confirmation_height_processor::add (nano::block_hash const & hash_a) { { nano::lock_guard lk (mutex); - awaiting_processing.insert (hash_a); + awaiting_processing.get ().emplace_back (hash_a); } condition.notify_one (); } @@ -167,9 +167,9 @@ void nano::confirmation_height_processor::set_next_hash () { nano::lock_guard guard (mutex); debug_assert (!awaiting_processing.empty ()); - original_hash = *awaiting_processing.begin (); + original_hash = awaiting_processing.get ().front (); original_hashes_pending.insert (original_hash); - awaiting_processing.erase (original_hash); + awaiting_processing.get ().pop_front (); } // Not thread-safe, only call before this processor has begun cementing @@ -212,8 +212,8 @@ std::unique_ptr nano::collect_container_info (co composite->add_component (std::make_unique (container_info{ "cemented_observers", cemented_observers_count, sizeof (decltype (confirmation_height_processor_a.cemented_observers)::value_type) })); composite->add_component (std::make_unique (container_info{ "block_already_cemented_observers", block_already_cemented_observers_count, sizeof (decltype (confirmation_height_processor_a.block_already_cemented_observers)::value_type) })); composite->add_component (std::make_unique (container_info{ "awaiting_processing", confirmation_height_processor_a.awaiting_processing_size (), sizeof (decltype (confirmation_height_processor_a.awaiting_processing)::value_type) })); - composite->add_component (collect_container_info (confirmation_height_processor_a.confirmation_height_bounded_processor, "bounded_processor")); - composite->add_component (collect_container_info (confirmation_height_processor_a.confirmation_height_unbounded_processor, "unbounded_processor")); + composite->add_component (collect_container_info (confirmation_height_processor_a.bounded_processor, "bounded_processor")); + composite->add_component (collect_container_info (confirmation_height_processor_a.unbounded_processor, "unbounded_processor")); return composite; } @@ -226,7 +226,7 @@ size_t nano::confirmation_height_processor::awaiting_processing_size () bool nano::confirmation_height_processor::is_processing_block (nano::block_hash const & hash_a) { nano::lock_guard guard (mutex); - return original_hashes_pending.find (hash_a) != original_hashes_pending.cend () || awaiting_processing.find (hash_a) != awaiting_processing.cend (); + return original_hashes_pending.count (hash_a) > 0 || awaiting_processing.get ().count (hash_a) > 0; } nano::block_hash nano::confirmation_height_processor::current () diff --git a/nano/node/confirmation_height_processor.hpp b/nano/node/confirmation_height_processor.hpp index d7e217e8..3d2a1dd6 100644 --- a/nano/node/confirmation_height_processor.hpp +++ b/nano/node/confirmation_height_processor.hpp @@ -6,11 +6,17 @@ #include #include +#include +#include +#include +#include + #include #include #include #include +namespace mi = boost::multi_index; namespace boost { class latch; @@ -41,7 +47,16 @@ public: private: std::mutex mutex; // Hashes which have been added to the confirmation height processor, but not yet processed - std::unordered_set awaiting_processing; + // clang-format off + class tag_sequence {}; + class tag_hash {}; + boost::multi_index_container>, + mi::hashed_unique, + mi::identity>>> awaiting_processing; + // clang-format on + // Hashes which have been added and processed, but have not been cemented std::unordered_set original_hashes_pending; bool paused{ false }; @@ -58,10 +73,10 @@ private: nano::ledger & ledger; nano::write_database_queue & write_database_queue; /** The maximum amount of blocks to write at once. This is dynamically modified by the bounded processor based on previous write performance **/ - uint64_t batch_write_size{ 65536 }; + uint64_t batch_write_size{ 32768 }; - confirmation_height_unbounded confirmation_height_unbounded_processor; - confirmation_height_bounded confirmation_height_bounded_processor; + confirmation_height_unbounded unbounded_processor; + confirmation_height_bounded bounded_processor; std::thread thread; void set_next_hash (); diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index 00d3bdfd..4c69f300 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -828,7 +828,7 @@ TEST (confirmation_height, dynamic_algorithm_no_transition_while_pending) node->confirmation_height_processor.pause (); timer.restart (); - while (node->confirmation_height_processor.confirmation_height_unbounded_processor.pending_writes_size == 0) + while (node->confirmation_height_processor.unbounded_processor.pending_writes_size == 0) { ASSERT_NO_ERROR (system.poll ()); }