Use a multi-index container to allow fifo queue for pending confirmations (#2730)

* Use a multi-index container to allow fifo queue for pending confirmations

* Use mi::identity (thanks Gui!)

* Missed variable update

* Can use count instead of iterators
This commit is contained in:
Wesley Shillingford 2020-04-24 10:20:21 +01:00 committed by GitHub
commit cd6c61b787
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 47 additions and 32 deletions

View file

@ -15,8 +15,8 @@ nano::confirmation_height_processor::confirmation_height_processor (nano::ledger
ledger (ledger_a),
write_database_queue (write_database_queue_a),
// clang-format off
confirmation_height_unbounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, batch_write_size, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }),
confirmation_height_bounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, batch_write_size, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }),
unbounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, batch_write_size, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }),
bounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, batch_write_size, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }),
// clang-format on
thread ([this, &latch, mode_a]() {
nano::thread_role::set (nano::thread_role::name::confirmation_height_processing);
@ -53,7 +53,7 @@ void nano::confirmation_height_processor::run (confirmation_height_mode mode_a)
if (!paused && !awaiting_processing.empty ())
{
lk.unlock ();
if (confirmation_height_bounded_processor.pending_empty () && confirmation_height_unbounded_processor.pending_empty ())
if (bounded_processor.pending_empty () && unbounded_processor.pending_empty ())
{
lk.lock ();
original_hashes_pending.clear ();
@ -66,26 +66,26 @@ void nano::confirmation_height_processor::run (confirmation_height_mode mode_a)
auto blocks_within_automatic_unbounded_selection = (ledger.cache.block_count < num_blocks_to_use_unbounded || ledger.cache.block_count - num_blocks_to_use_unbounded < ledger.cache.cemented_count);
// Don't want to mix up pending writes across different processors
auto valid_unbounded = (mode_a == confirmation_height_mode::automatic && blocks_within_automatic_unbounded_selection && confirmation_height_bounded_processor.pending_empty ());
auto force_unbounded = (!confirmation_height_unbounded_processor.pending_empty () || mode_a == confirmation_height_mode::unbounded);
auto valid_unbounded = (mode_a == confirmation_height_mode::automatic && blocks_within_automatic_unbounded_selection && bounded_processor.pending_empty ());
auto force_unbounded = (!unbounded_processor.pending_empty () || mode_a == confirmation_height_mode::unbounded);
if (force_unbounded || valid_unbounded)
{
debug_assert (confirmation_height_bounded_processor.pending_empty ());
if (confirmation_height_unbounded_processor.pending_empty ())
debug_assert (bounded_processor.pending_empty ());
if (unbounded_processor.pending_empty ())
{
confirmation_height_unbounded_processor.reset ();
unbounded_processor.reset ();
}
confirmation_height_unbounded_processor.process ();
unbounded_processor.process ();
}
else
{
debug_assert (mode_a == confirmation_height_mode::bounded || mode_a == confirmation_height_mode::automatic);
debug_assert (confirmation_height_unbounded_processor.pending_empty ());
if (confirmation_height_bounded_processor.pending_empty ())
debug_assert (unbounded_processor.pending_empty ());
if (bounded_processor.pending_empty ())
{
confirmation_height_bounded_processor.reset ();
bounded_processor.reset ();
}
confirmation_height_bounded_processor.process ();
bounded_processor.process ();
}
lk.lock ();
@ -103,25 +103,25 @@ void nano::confirmation_height_processor::run (confirmation_height_mode mode_a)
lk.unlock ();
// If there are blocks pending cementing, then make sure we flush out the remaining writes
if (!confirmation_height_bounded_processor.pending_empty ())
if (!bounded_processor.pending_empty ())
{
debug_assert (confirmation_height_unbounded_processor.pending_empty ());
debug_assert (unbounded_processor.pending_empty ());
{
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
confirmation_height_bounded_processor.cement_blocks (scoped_write_guard);
bounded_processor.cement_blocks (scoped_write_guard);
}
lock_and_cleanup ();
confirmation_height_bounded_processor.reset ();
bounded_processor.reset ();
}
else if (!confirmation_height_unbounded_processor.pending_empty ())
else if (!unbounded_processor.pending_empty ())
{
debug_assert (confirmation_height_bounded_processor.pending_empty ());
debug_assert (bounded_processor.pending_empty ());
{
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
confirmation_height_unbounded_processor.cement_blocks (scoped_write_guard);
unbounded_processor.cement_blocks (scoped_write_guard);
}
lock_and_cleanup ();
confirmation_height_unbounded_processor.reset ();
unbounded_processor.reset ();
}
else
{
@ -158,7 +158,7 @@ void nano::confirmation_height_processor::add (nano::block_hash const & hash_a)
{
{
nano::lock_guard<std::mutex> lk (mutex);
awaiting_processing.insert (hash_a);
awaiting_processing.get<tag_sequence> ().emplace_back (hash_a);
}
condition.notify_one ();
}
@ -167,9 +167,9 @@ void nano::confirmation_height_processor::set_next_hash ()
{
nano::lock_guard<std::mutex> guard (mutex);
debug_assert (!awaiting_processing.empty ());
original_hash = *awaiting_processing.begin ();
original_hash = awaiting_processing.get<tag_sequence> ().front ();
original_hashes_pending.insert (original_hash);
awaiting_processing.erase (original_hash);
awaiting_processing.get<tag_sequence> ().pop_front ();
}
// Not thread-safe, only call before this processor has begun cementing
@ -212,8 +212,8 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (co
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "cemented_observers", cemented_observers_count, sizeof (decltype (confirmation_height_processor_a.cemented_observers)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "block_already_cemented_observers", block_already_cemented_observers_count, sizeof (decltype (confirmation_height_processor_a.block_already_cemented_observers)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "awaiting_processing", confirmation_height_processor_a.awaiting_processing_size (), sizeof (decltype (confirmation_height_processor_a.awaiting_processing)::value_type) }));
composite->add_component (collect_container_info (confirmation_height_processor_a.confirmation_height_bounded_processor, "bounded_processor"));
composite->add_component (collect_container_info (confirmation_height_processor_a.confirmation_height_unbounded_processor, "unbounded_processor"));
composite->add_component (collect_container_info (confirmation_height_processor_a.bounded_processor, "bounded_processor"));
composite->add_component (collect_container_info (confirmation_height_processor_a.unbounded_processor, "unbounded_processor"));
return composite;
}
@ -226,7 +226,7 @@ size_t nano::confirmation_height_processor::awaiting_processing_size ()
bool nano::confirmation_height_processor::is_processing_block (nano::block_hash const & hash_a)
{
nano::lock_guard<std::mutex> guard (mutex);
return original_hashes_pending.find (hash_a) != original_hashes_pending.cend () || awaiting_processing.find (hash_a) != awaiting_processing.cend ();
return original_hashes_pending.count (hash_a) > 0 || awaiting_processing.get<tag_hash> ().count (hash_a) > 0;
}
nano::block_hash nano::confirmation_height_processor::current ()

View file

@ -6,11 +6,17 @@
#include <nano/secure/blockstore.hpp>
#include <nano/secure/common.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <boost/multi_index_container.hpp>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <unordered_set>
namespace mi = boost::multi_index;
namespace boost
{
class latch;
@ -41,7 +47,16 @@ public:
private:
std::mutex mutex;
// Hashes which have been added to the confirmation height processor, but not yet processed
std::unordered_set<nano::block_hash> awaiting_processing;
// clang-format off
class tag_sequence {};
class tag_hash {};
boost::multi_index_container<nano::block_hash,
mi::indexed_by<
mi::sequenced<mi::tag<tag_sequence>>,
mi::hashed_unique<mi::tag<tag_hash>,
mi::identity<nano::block_hash>>>> awaiting_processing;
// clang-format on
// Hashes which have been added and processed, but have not been cemented
std::unordered_set<nano::block_hash> original_hashes_pending;
bool paused{ false };
@ -58,10 +73,10 @@ private:
nano::ledger & ledger;
nano::write_database_queue & write_database_queue;
/** The maximum amount of blocks to write at once. This is dynamically modified by the bounded processor based on previous write performance **/
uint64_t batch_write_size{ 65536 };
uint64_t batch_write_size{ 32768 };
confirmation_height_unbounded confirmation_height_unbounded_processor;
confirmation_height_bounded confirmation_height_bounded_processor;
confirmation_height_unbounded unbounded_processor;
confirmation_height_bounded bounded_processor;
std::thread thread;
void set_next_hash ();

View file

@ -828,7 +828,7 @@ TEST (confirmation_height, dynamic_algorithm_no_transition_while_pending)
node->confirmation_height_processor.pause ();
timer.restart ();
while (node->confirmation_height_processor.confirmation_height_unbounded_processor.pending_writes_size == 0)
while (node->confirmation_height_processor.unbounded_processor.pending_writes_size == 0)
{
ASSERT_NO_ERROR (system.poll ());
}