Release write_guard lock when no longer required (#2716)
* Release write_guard lock when no longer required * Use move constructor for write_guard * Increase maximum values for various settings * Formatting * Remove unnecessary extra space in comment * Update slow_tests * Fix slow_test check * Dynamically set batch_write_size based on previous write performance. Change is gradual to account for random spikes/slowdowns. * Add a tolerance in case amount to cement is just above to save waiting on block processor for small amount of blocks * Reduce batch_write_size in slow_tests now that it's configurable so that it takes less time * Don't call release if there's no blocks which were cemented at the end * Typo in comment (thanks Gui) * Prevent yoyoing as much * (Unrelated) Fix prioritize_frontiers_overwrite test * Increase amount of time spent searching for frontiers when there is a low amount of active transactions * Have a force_write in unbounded to be consistent with bounded which is based on blocks * Typo in comment * Modify heuristics for updating active multiplier (Gui comment) * Give magic number a variable (gui) * Fix incorrect comparison (Gui) * Add public function to determine if write_guard is owned and use that (Gui)
This commit is contained in:
parent
95bfae4fae
commit
996d903a05
11 changed files with 196 additions and 81 deletions
|
@ -275,7 +275,9 @@ void nano::active_transactions::frontiers_confirmation (nano::unique_lock<std::m
|
|||
{
|
||||
// Spend some time prioritizing accounts with the most uncemented blocks to reduce voting traffic
|
||||
auto request_interval = std::chrono::milliseconds (node.network_params.network.request_interval_ms);
|
||||
auto time_spent_prioritizing_ledger_accounts = request_interval / 100;
|
||||
// Spend longer searching ledger accounts when there is a low amount of elections going on
|
||||
auto low_active = roots.size () < 1000;
|
||||
auto time_spent_prioritizing_ledger_accounts = request_interval / (low_active ? 20 : 100);
|
||||
auto time_spent_prioritizing_wallet_accounts = request_interval / 250;
|
||||
lock_a.unlock ();
|
||||
auto transaction = node.store.tx_begin_read ();
|
||||
|
@ -800,7 +802,7 @@ void nano::active_transactions::update_active_multiplier (nano::unique_lock<std:
|
|||
last_prioritized_multiplier.reset ();
|
||||
double multiplier (1.);
|
||||
// Heurestic to filter out non-saturated network and frontier confirmation
|
||||
if (roots.size () > prioritized_cutoff / 2 || (node.network_params.network.is_test_network () && !roots.empty ()))
|
||||
if (roots.size () >= prioritized_cutoff || (node.network_params.network.is_test_network () && !roots.empty ()))
|
||||
{
|
||||
auto & sorted_roots = roots.get<tag_difficulty> ();
|
||||
std::vector<double> prioritized;
|
||||
|
|
|
@ -8,13 +8,14 @@
|
|||
|
||||
#include <numeric>
|
||||
|
||||
nano::confirmation_height_bounded::confirmation_height_bounded (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a, std::atomic<bool> & stopped_a, nano::block_hash const & original_hash_a, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const & notify_observers_callback_a, std::function<void(nano::block_hash const &)> const & notify_block_already_cemented_observers_callback_a, std::function<uint64_t ()> const & awaiting_processing_size_callback_a) :
|
||||
nano::confirmation_height_bounded::confirmation_height_bounded (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a, std::atomic<bool> & stopped_a, nano::block_hash const & original_hash_a, uint64_t & batch_write_size_a, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const & notify_observers_callback_a, std::function<void(nano::block_hash const &)> const & notify_block_already_cemented_observers_callback_a, std::function<uint64_t ()> const & awaiting_processing_size_callback_a) :
|
||||
ledger (ledger_a),
|
||||
write_database_queue (write_database_queue_a),
|
||||
batch_separate_pending_min_time (batch_separate_pending_min_time_a),
|
||||
logger (logger_a),
|
||||
stopped (stopped_a),
|
||||
original_hash (original_hash_a),
|
||||
batch_write_size (batch_write_size_a),
|
||||
notify_observers_callback (notify_observers_callback_a),
|
||||
notify_block_already_cemented_observers_callback (notify_block_already_cemented_observers_callback_a),
|
||||
awaiting_processing_size_callback (awaiting_processing_size_callback_a)
|
||||
|
@ -156,7 +157,7 @@ void nano::confirmation_height_bounded::process ()
|
|||
return total += write_details_a.top_height - write_details_a.bottom_height + 1;
|
||||
});
|
||||
|
||||
auto max_batch_write_size_reached = (total_pending_write_block_count >= confirmation_height::batch_write_size);
|
||||
auto max_batch_write_size_reached = (total_pending_write_block_count >= batch_write_size);
|
||||
// When there are a lot of pending confirmation height blocks, it is more efficient to
|
||||
// bulk some of them up to enable better write performance which becomes the bottleneck.
|
||||
auto min_time_exceeded = (timer.since_start () >= batch_separate_pending_min_time);
|
||||
|
@ -165,19 +166,19 @@ void nano::confirmation_height_bounded::process ()
|
|||
auto should_output = finished_iterating && (non_awaiting_processing || min_time_exceeded);
|
||||
auto force_write = pending_writes.size () >= pending_writes_max_size || accounts_confirmed_info.size () >= pending_writes_max_size;
|
||||
|
||||
if (((max_batch_write_size_reached || should_output) && !pending_writes.empty ()) || force_write)
|
||||
if ((max_batch_write_size_reached || should_output || force_write) && !pending_writes.empty ())
|
||||
{
|
||||
bool error = false;
|
||||
// If nothing is currently using the database write lock then write the cemented pending blocks otherwise continue iterating
|
||||
if (write_database_queue.process (nano::writer::confirmation_height))
|
||||
{
|
||||
auto scoped_write_guard = write_database_queue.pop ();
|
||||
error = cement_blocks ();
|
||||
error = cement_blocks (scoped_write_guard);
|
||||
}
|
||||
else if (force_write)
|
||||
{
|
||||
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
|
||||
error = cement_blocks ();
|
||||
error = cement_blocks (scoped_write_guard);
|
||||
}
|
||||
// Don't set any more cemented blocks from the original hash if an inconsistency is found
|
||||
if (error)
|
||||
|
@ -332,7 +333,7 @@ void nano::confirmation_height_bounded::prepare_iterated_blocks_for_cementing (p
|
|||
}
|
||||
}
|
||||
|
||||
bool nano::confirmation_height_bounded::cement_blocks ()
|
||||
bool nano::confirmation_height_bounded::cement_blocks (nano::write_guard & scoped_write_guard_a)
|
||||
{
|
||||
// Will contain all blocks that have been cemented (bounded by batch_write_size)
|
||||
// and will get run through the cemented observer callback
|
||||
|
@ -340,7 +341,8 @@ bool nano::confirmation_height_bounded::cement_blocks ()
|
|||
{
|
||||
// This only writes to the confirmation_height table and is the only place to do so in a single process
|
||||
auto transaction (ledger.store.tx_begin_write ({}, { nano::tables::confirmation_height }));
|
||||
|
||||
nano::timer<> timer;
|
||||
timer.start ();
|
||||
// Cement all pending entries, each entry is specific to an account and contains the least amount
|
||||
// of blocks to retain consistent cementing across all account chains to genesis.
|
||||
while (!pending_writes.empty ())
|
||||
|
@ -407,22 +409,52 @@ bool nano::confirmation_height_bounded::cement_blocks ()
|
|||
return true;
|
||||
}
|
||||
|
||||
auto last_iteration = (num_blocks_confirmed - num_blocks_iterated) == 1;
|
||||
|
||||
cemented_blocks.emplace_back (block);
|
||||
|
||||
// We have likely hit a long chain, flush these callbacks and continue
|
||||
if (cemented_blocks.size () == confirmation_height::batch_write_size)
|
||||
// Flush these callbacks and continue as we write in batches (ideally maximum 250ms) to not hold write db transaction for too long.
|
||||
// Include a tolerance to save having to potentially wait on the block processor if the number of blocks to cement is only a bit higher than the max.
|
||||
if (cemented_blocks.size () > batch_write_size + (batch_write_size / 10))
|
||||
{
|
||||
auto num_blocks_cemented = num_blocks_iterated - total_blocks_cemented + 1;
|
||||
total_blocks_cemented += num_blocks_cemented;
|
||||
write_confirmation_height (num_blocks_cemented, start_height + total_blocks_cemented - 1, new_cemented_frontier);
|
||||
transaction.commit ();
|
||||
|
||||
// Update the maximum amount of blocks to write next time based on the time it took to cement this batch.
|
||||
if (!network_params.network.is_test_network ())
|
||||
{
|
||||
auto const amount_to_change = batch_write_size / 10; // 10%
|
||||
auto const maximum_batch_write_time = 250; // milliseconds
|
||||
auto const maximum_batch_write_time_increase_cutoff = maximum_batch_write_time - (maximum_batch_write_time / 5);
|
||||
if (timer.since_start ().count () > maximum_batch_write_time)
|
||||
{
|
||||
// Reduce (unless we have hit a floor)
|
||||
auto const minimum_batch_size = 16384u;
|
||||
batch_write_size = std::max<uint64_t> (minimum_batch_size, batch_write_size - amount_to_change);
|
||||
}
|
||||
else if (timer.since_start ().count () < maximum_batch_write_time_increase_cutoff)
|
||||
{
|
||||
// Increase amount of blocks written for next batch if the time for writing this one is sufficiently lower than the max time to warrant changing
|
||||
batch_write_size += amount_to_change;
|
||||
}
|
||||
}
|
||||
|
||||
scoped_write_guard_a.release ();
|
||||
notify_observers_callback (cemented_blocks);
|
||||
cemented_blocks.clear ();
|
||||
transaction.renew ();
|
||||
|
||||
// Only aquire transaction if there are any blocks left
|
||||
if (!(last_iteration && pending_writes.size () == 1))
|
||||
{
|
||||
scoped_write_guard_a = write_database_queue.wait (nano::writer::confirmation_height);
|
||||
transaction.renew ();
|
||||
timer.restart ();
|
||||
}
|
||||
}
|
||||
|
||||
// Get the next block in the chain until we have reached the final desired one
|
||||
auto last_iteration = (num_blocks_confirmed - num_blocks_iterated) == 1;
|
||||
if (!last_iteration)
|
||||
{
|
||||
new_cemented_frontier = block->sideband ().successor;
|
||||
|
@ -436,7 +468,10 @@ bool nano::confirmation_height_bounded::cement_blocks ()
|
|||
}
|
||||
|
||||
auto num_blocks_cemented = num_blocks_confirmed - total_blocks_cemented;
|
||||
write_confirmation_height (num_blocks_cemented, pending.top_height, new_cemented_frontier);
|
||||
if (num_blocks_cemented > 0)
|
||||
{
|
||||
write_confirmation_height (num_blocks_cemented, pending.top_height, new_cemented_frontier);
|
||||
}
|
||||
}
|
||||
|
||||
auto it = accounts_confirmed_info.find (pending.account);
|
||||
|
@ -450,7 +485,12 @@ bool nano::confirmation_height_bounded::cement_blocks ()
|
|||
}
|
||||
}
|
||||
|
||||
notify_observers_callback (cemented_blocks);
|
||||
// Scope guard could have been released earlier (0 cemented_blocks would indicate that)
|
||||
if (scoped_write_guard_a.is_owned () && !cemented_blocks.empty ())
|
||||
{
|
||||
scoped_write_guard_a.release ();
|
||||
notify_observers_callback (cemented_blocks);
|
||||
}
|
||||
|
||||
debug_assert (pending_writes.empty ());
|
||||
debug_assert (pending_writes_size == 0);
|
||||
|
|
|
@ -12,15 +12,16 @@ class ledger;
|
|||
class read_transaction;
|
||||
class logger_mt;
|
||||
class write_database_queue;
|
||||
class write_guard;
|
||||
|
||||
class confirmation_height_bounded final
|
||||
{
|
||||
public:
|
||||
confirmation_height_bounded (nano::ledger &, nano::write_database_queue &, std::chrono::milliseconds, nano::logger_mt &, std::atomic<bool> &, nano::block_hash const &, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const &, std::function<void(nano::block_hash const &)> const &, std::function<uint64_t ()> const &);
|
||||
confirmation_height_bounded (nano::ledger &, nano::write_database_queue &, std::chrono::milliseconds, nano::logger_mt &, std::atomic<bool> &, nano::block_hash const &, uint64_t &, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const &, std::function<void(nano::block_hash const &)> const &, std::function<uint64_t ()> const &);
|
||||
bool pending_empty () const;
|
||||
void prepare_new ();
|
||||
void process ();
|
||||
bool cement_blocks ();
|
||||
bool cement_blocks (nano::write_guard & scoped_write_guard_a);
|
||||
|
||||
private:
|
||||
class top_and_next_hash final
|
||||
|
@ -53,9 +54,10 @@ private:
|
|||
};
|
||||
|
||||
/** The maximum number of blocks to be read in while iterating over a long account chain */
|
||||
static uint64_t constexpr batch_read_size = 4096;
|
||||
uint64_t const batch_read_size = 65536;
|
||||
|
||||
static uint32_t constexpr max_items{ 65536 };
|
||||
/** The maximum number of various containers to keep the memory bounded */
|
||||
uint32_t const max_items{ 131072 };
|
||||
|
||||
// All of the atomic variables here just track the size for use in collect_container_info.
|
||||
// This is so that no mutexes are needed during the algorithm itself, which would otherwise be needed
|
||||
|
@ -64,7 +66,7 @@ private:
|
|||
// This allows the load and stores to use relaxed atomic memory ordering.
|
||||
std::deque<write_details> pending_writes;
|
||||
nano::relaxed_atomic_integral<uint64_t> pending_writes_size{ 0 };
|
||||
static uint32_t constexpr pending_writes_max_size{ max_items };
|
||||
uint32_t const pending_writes_max_size{ max_items };
|
||||
/* Holds confirmation height/cemented frontier in memory for accounts while iterating */
|
||||
std::unordered_map<account, confirmed_info> accounts_confirmed_info;
|
||||
nano::relaxed_atomic_integral<uint64_t> accounts_confirmed_info_size{ 0 };
|
||||
|
@ -120,9 +122,11 @@ private:
|
|||
nano::logger_mt & logger;
|
||||
std::atomic<bool> & stopped;
|
||||
nano::block_hash const & original_hash;
|
||||
uint64_t & batch_write_size;
|
||||
std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> notify_observers_callback;
|
||||
std::function<void(nano::block_hash const &)> notify_block_already_cemented_observers_callback;
|
||||
std::function<uint64_t ()> awaiting_processing_size_callback;
|
||||
nano::network_params network_params;
|
||||
|
||||
friend std::unique_ptr<nano::container_info_component> collect_container_info (confirmation_height_bounded &, const std::string & name_a);
|
||||
};
|
||||
|
|
|
@ -15,8 +15,8 @@ nano::confirmation_height_processor::confirmation_height_processor (nano::ledger
|
|||
ledger (ledger_a),
|
||||
write_database_queue (write_database_queue_a),
|
||||
// clang-format off
|
||||
confirmation_height_unbounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }),
|
||||
confirmation_height_bounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }),
|
||||
confirmation_height_unbounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, batch_write_size, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }),
|
||||
confirmation_height_bounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, batch_write_size, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }),
|
||||
// clang-format on
|
||||
thread ([this, &latch, mode_a]() {
|
||||
nano::thread_role::set (nano::thread_role::name::confirmation_height_processing);
|
||||
|
@ -106,15 +106,19 @@ void nano::confirmation_height_processor::run (confirmation_height_mode mode_a)
|
|||
if (!confirmation_height_bounded_processor.pending_empty ())
|
||||
{
|
||||
debug_assert (confirmation_height_unbounded_processor.pending_empty ());
|
||||
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
|
||||
confirmation_height_bounded_processor.cement_blocks ();
|
||||
{
|
||||
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
|
||||
confirmation_height_bounded_processor.cement_blocks (scoped_write_guard);
|
||||
}
|
||||
lock_and_cleanup ();
|
||||
}
|
||||
else if (!confirmation_height_unbounded_processor.pending_empty ())
|
||||
{
|
||||
debug_assert (confirmation_height_bounded_processor.pending_empty ());
|
||||
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
|
||||
confirmation_height_unbounded_processor.cement_blocks ();
|
||||
{
|
||||
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
|
||||
confirmation_height_unbounded_processor.cement_blocks (scoped_write_guard);
|
||||
}
|
||||
lock_and_cleanup ();
|
||||
}
|
||||
else
|
||||
|
|
|
@ -57,6 +57,9 @@ private:
|
|||
|
||||
nano::ledger & ledger;
|
||||
nano::write_database_queue & write_database_queue;
|
||||
/** The maximum amount of blocks to write at once. This is dynamically modified by the bounded processor based on previous write performance **/
|
||||
uint64_t batch_write_size{ 65536 };
|
||||
|
||||
confirmation_height_unbounded confirmation_height_unbounded_processor;
|
||||
confirmation_height_bounded confirmation_height_bounded_processor;
|
||||
std::thread thread;
|
||||
|
@ -70,6 +73,9 @@ private:
|
|||
friend class confirmation_height_dependent_election_Test;
|
||||
friend class confirmation_height_dependent_election_after_already_cemented_Test;
|
||||
friend class confirmation_height_dynamic_algorithm_no_transition_while_pending_Test;
|
||||
friend class confirmation_height_many_accounts_many_confirmations_Test;
|
||||
friend class confirmation_height_long_chains_Test;
|
||||
friend class confirmation_height_many_accounts_single_confirmation_Test;
|
||||
};
|
||||
|
||||
std::unique_ptr<container_info_component> collect_container_info (confirmation_height_processor &, const std::string &);
|
||||
|
|
|
@ -5,13 +5,14 @@
|
|||
|
||||
#include <numeric>
|
||||
|
||||
nano::confirmation_height_unbounded::confirmation_height_unbounded (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a, std::atomic<bool> & stopped_a, nano::block_hash const & original_hash_a, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const & notify_observers_callback_a, std::function<void(nano::block_hash const &)> const & notify_block_already_cemented_observers_callback_a, std::function<uint64_t ()> const & awaiting_processing_size_callback_a) :
|
||||
nano::confirmation_height_unbounded::confirmation_height_unbounded (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a, std::atomic<bool> & stopped_a, nano::block_hash const & original_hash_a, uint64_t & batch_write_size_a, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const & notify_observers_callback_a, std::function<void(nano::block_hash const &)> const & notify_block_already_cemented_observers_callback_a, std::function<uint64_t ()> const & awaiting_processing_size_callback_a) :
|
||||
ledger (ledger_a),
|
||||
write_database_queue (write_database_queue_a),
|
||||
batch_separate_pending_min_time (batch_separate_pending_min_time_a),
|
||||
logger (logger_a),
|
||||
stopped (stopped_a),
|
||||
original_hash (original_hash_a),
|
||||
batch_write_size (batch_write_size_a),
|
||||
notify_observers_callback (notify_observers_callback_a),
|
||||
notify_block_already_cemented_observers_callback (notify_block_already_cemented_observers_callback_a),
|
||||
awaiting_processing_size_callback (awaiting_processing_size_callback_a)
|
||||
|
@ -127,7 +128,7 @@ void nano::confirmation_height_unbounded::process ()
|
|||
}
|
||||
}
|
||||
|
||||
auto max_write_size_reached = (pending_writes.size () >= confirmation_height::batch_write_size);
|
||||
auto max_write_size_reached = (pending_writes.size () >= confirmation_height::unbounded_cutoff);
|
||||
// When there are a lot of pending confirmation height blocks, it is more efficient to
|
||||
// bulk some of them up to enable better write performance which becomes the bottleneck.
|
||||
auto min_time_exceeded = (timer.since_start () >= batch_separate_pending_min_time);
|
||||
|
@ -135,17 +136,30 @@ void nano::confirmation_height_unbounded::process ()
|
|||
auto no_pending = awaiting_processing_size_callback () == 0;
|
||||
auto should_output = finished_iterating && (no_pending || min_time_exceeded);
|
||||
|
||||
if ((max_write_size_reached || should_output) && !pending_writes.empty ())
|
||||
auto total_pending_write_block_count = std::accumulate (pending_writes.cbegin (), pending_writes.cend (), uint64_t (0), [](uint64_t total, conf_height_details const & receive_details_a) {
|
||||
return total += receive_details_a.num_blocks_confirmed;
|
||||
});
|
||||
auto force_write = total_pending_write_block_count > batch_write_size;
|
||||
|
||||
if ((max_write_size_reached || should_output || force_write) && !pending_writes.empty ())
|
||||
{
|
||||
bool error = false;
|
||||
if (write_database_queue.process (nano::writer::confirmation_height))
|
||||
{
|
||||
auto scoped_write_guard = write_database_queue.pop ();
|
||||
auto error = cement_blocks ();
|
||||
// Don't set any more blocks as confirmed from the original hash if an inconsistency is found
|
||||
if (error)
|
||||
{
|
||||
break;
|
||||
}
|
||||
error = cement_blocks (scoped_write_guard);
|
||||
}
|
||||
else if (force_write)
|
||||
{
|
||||
// Unbounded processor has grown too large, force a write
|
||||
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
|
||||
error = cement_blocks (scoped_write_guard);
|
||||
}
|
||||
|
||||
// Don't set any more cemented blocks from the original hash if an inconsistency is found
|
||||
if (error)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -315,12 +329,8 @@ void nano::confirmation_height_unbounded::prepare_iterated_blocks_for_cementing
|
|||
/*
|
||||
* Returns true if there was an error in finding one of the blocks to write a confirmation height for, false otherwise
|
||||
*/
|
||||
bool nano::confirmation_height_unbounded::cement_blocks ()
|
||||
bool nano::confirmation_height_unbounded::cement_blocks (nano::write_guard & scoped_write_guard_a)
|
||||
{
|
||||
auto total_pending_write_block_count = std::accumulate (pending_writes.cbegin (), pending_writes.cend (), uint64_t (0), [](uint64_t total, conf_height_details const & receive_details_a) {
|
||||
return total += receive_details_a.num_blocks_confirmed;
|
||||
});
|
||||
|
||||
std::vector<std::shared_ptr<nano::block>> cemented_blocks;
|
||||
{
|
||||
auto transaction (ledger.store.tx_begin_write ({}, { nano::tables::confirmation_height }));
|
||||
|
@ -364,15 +374,13 @@ bool nano::confirmation_height_unbounded::cement_blocks ()
|
|||
return block_cache.at (hash_a);
|
||||
});
|
||||
}
|
||||
total_pending_write_block_count -= pending.num_blocks_confirmed;
|
||||
pending_writes.erase (pending_writes.begin ());
|
||||
--pending_writes_size;
|
||||
}
|
||||
}
|
||||
|
||||
scoped_write_guard_a.release ();
|
||||
notify_observers_callback (cemented_blocks);
|
||||
|
||||
debug_assert (total_pending_write_block_count == 0);
|
||||
debug_assert (pending_writes.empty ());
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -13,15 +13,16 @@ class ledger;
|
|||
class read_transaction;
|
||||
class logger_mt;
|
||||
class write_database_queue;
|
||||
class write_guard;
|
||||
|
||||
class confirmation_height_unbounded final
|
||||
{
|
||||
public:
|
||||
confirmation_height_unbounded (nano::ledger &, nano::write_database_queue &, std::chrono::milliseconds, nano::logger_mt &, std::atomic<bool> &, nano::block_hash const &, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const &, std::function<void(nano::block_hash const &)> const &, std::function<uint64_t ()> const &);
|
||||
confirmation_height_unbounded (nano::ledger &, nano::write_database_queue &, std::chrono::milliseconds, nano::logger_mt &, std::atomic<bool> &, nano::block_hash const &, uint64_t &, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const &, std::function<void(nano::block_hash const &)> const &, std::function<uint64_t ()> const &);
|
||||
bool pending_empty () const;
|
||||
void prepare_new ();
|
||||
void process ();
|
||||
bool cement_blocks ();
|
||||
bool cement_blocks (nano::write_guard &);
|
||||
|
||||
private:
|
||||
class confirmed_iterated_pair
|
||||
|
@ -96,6 +97,8 @@ private:
|
|||
nano::logger_mt & logger;
|
||||
std::atomic<bool> & stopped;
|
||||
nano::block_hash const & original_hash;
|
||||
uint64_t & batch_write_size;
|
||||
|
||||
std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> notify_observers_callback;
|
||||
std::function<void(nano::block_hash const &)> notify_block_already_cemented_observers_callback;
|
||||
std::function<uint64_t ()> awaiting_processing_size_callback;
|
||||
|
|
|
@ -3,22 +3,59 @@
|
|||
|
||||
#include <algorithm>
|
||||
|
||||
nano::write_guard::write_guard (nano::condition_variable & cv_a, std::function<void()> guard_finish_callback_a) :
|
||||
cv (cv_a),
|
||||
nano::write_guard::write_guard (std::function<void()> guard_finish_callback_a) :
|
||||
guard_finish_callback (guard_finish_callback_a)
|
||||
{
|
||||
}
|
||||
|
||||
nano::write_guard::write_guard (nano::write_guard && write_guard_a) noexcept :
|
||||
owns (write_guard_a.owns),
|
||||
guard_finish_callback (std::move (write_guard_a.guard_finish_callback))
|
||||
{
|
||||
write_guard_a.owns = false;
|
||||
write_guard_a.guard_finish_callback = nullptr;
|
||||
}
|
||||
|
||||
nano::write_guard & nano::write_guard::operator= (nano::write_guard && write_guard_a) noexcept
|
||||
{
|
||||
owns = write_guard_a.owns;
|
||||
guard_finish_callback = std::move (write_guard_a.guard_finish_callback);
|
||||
|
||||
write_guard_a.owns = false;
|
||||
write_guard_a.guard_finish_callback = nullptr;
|
||||
return *this;
|
||||
}
|
||||
|
||||
nano::write_guard::~write_guard ()
|
||||
{
|
||||
guard_finish_callback ();
|
||||
cv.notify_all ();
|
||||
if (owns)
|
||||
{
|
||||
guard_finish_callback ();
|
||||
}
|
||||
}
|
||||
|
||||
bool nano::write_guard::is_owned () const
|
||||
{
|
||||
return owns;
|
||||
}
|
||||
|
||||
void nano::write_guard::release ()
|
||||
{
|
||||
debug_assert (owns);
|
||||
if (owns)
|
||||
{
|
||||
guard_finish_callback ();
|
||||
}
|
||||
owns = false;
|
||||
}
|
||||
|
||||
nano::write_database_queue::write_database_queue () :
|
||||
guard_finish_callback ([& queue = queue, &mutex = mutex]() {
|
||||
nano::lock_guard<std::mutex> guard (mutex);
|
||||
queue.pop_front ();
|
||||
guard_finish_callback ([& queue = queue, &mutex = mutex, &cv = cv]() {
|
||||
{
|
||||
nano::lock_guard<std::mutex> guard (mutex);
|
||||
queue.pop_front ();
|
||||
}
|
||||
cv.notify_all ();
|
||||
})
|
||||
{
|
||||
}
|
||||
|
@ -38,7 +75,7 @@ nano::write_guard nano::write_database_queue::wait (nano::writer writer)
|
|||
cv.wait (lk);
|
||||
}
|
||||
|
||||
return write_guard (cv, guard_finish_callback);
|
||||
return write_guard (guard_finish_callback);
|
||||
}
|
||||
|
||||
bool nano::write_database_queue::contains (nano::writer writer)
|
||||
|
@ -72,7 +109,7 @@ bool nano::write_database_queue::process (nano::writer writer)
|
|||
|
||||
nano::write_guard nano::write_database_queue::pop ()
|
||||
{
|
||||
return write_guard (cv, guard_finish_callback);
|
||||
return write_guard (guard_finish_callback);
|
||||
}
|
||||
|
||||
void nano::write_database_queue::stop ()
|
||||
|
|
|
@ -20,12 +20,18 @@ enum class writer
|
|||
class write_guard final
|
||||
{
|
||||
public:
|
||||
write_guard (nano::condition_variable & cv_a, std::function<void()> guard_finish_callback_a);
|
||||
write_guard (std::function<void()> guard_finish_callback_a);
|
||||
void release ();
|
||||
~write_guard ();
|
||||
write_guard (write_guard const &) = delete;
|
||||
write_guard & operator= (write_guard const &) = delete;
|
||||
write_guard (write_guard &&) noexcept;
|
||||
write_guard & operator= (write_guard &&) noexcept;
|
||||
bool is_owned () const;
|
||||
|
||||
private:
|
||||
nano::condition_variable & cv;
|
||||
std::function<void()> guard_finish_callback;
|
||||
bool owns{ true };
|
||||
};
|
||||
|
||||
class write_database_queue final
|
||||
|
|
|
@ -230,11 +230,10 @@ public:
|
|||
nano::block_hash frontier;
|
||||
};
|
||||
|
||||
/** The maximum amount of blocks to iterate over while writing */
|
||||
namespace confirmation_height
|
||||
{
|
||||
uint64_t const batch_write_size{ 4096 };
|
||||
uint64_t const unbounded_cutoff{ 4096 };
|
||||
/** When the uncemented count (block count - cemented count) is less than this use the unbounded processor */
|
||||
uint64_t const unbounded_cutoff{ 16384 };
|
||||
}
|
||||
|
||||
using vote_blocks_vec_iter = std::vector<boost::variant<std::shared_ptr<nano::block>, nano::block_hash>>::const_iterator;
|
||||
|
|
|
@ -456,6 +456,8 @@ TEST (node, mass_vote_by_hash)
|
|||
}
|
||||
}
|
||||
|
||||
namespace nano
|
||||
{
|
||||
TEST (confirmation_height, many_accounts_single_confirmation)
|
||||
{
|
||||
nano::system system;
|
||||
|
@ -465,8 +467,9 @@ TEST (confirmation_height, many_accounts_single_confirmation)
|
|||
auto node = system.add_node (node_config);
|
||||
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
|
||||
|
||||
// The number of frontiers should be more than the batch_write_size to test the amount of blocks confirmed is correct.
|
||||
auto num_accounts = nano::confirmation_height::batch_write_size * 2 + 50;
|
||||
// The number of frontiers should be more than the nano::confirmation_height::unbounded_cutoff to test the amount of blocks confirmed is correct.
|
||||
node->confirmation_height_processor.batch_write_size = 500;
|
||||
auto const num_accounts = nano::confirmation_height::unbounded_cutoff * 2 + 50;
|
||||
nano::keypair last_keypair = nano::test_genesis_key;
|
||||
auto last_open_hash = node->latest (nano::test_genesis_key.pub);
|
||||
{
|
||||
|
@ -492,7 +495,7 @@ TEST (confirmation_height, many_accounts_single_confirmation)
|
|||
node->block_confirm (block);
|
||||
}
|
||||
|
||||
system.deadline_set (60s);
|
||||
system.deadline_set (120s);
|
||||
auto transaction = node->store.tx_begin_read ();
|
||||
while (!node->ledger.block_confirmed (transaction, last_open_hash))
|
||||
{
|
||||
|
@ -523,7 +526,7 @@ TEST (confirmation_height, many_accounts_single_confirmation)
|
|||
ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed_bounded, nano::stat::dir::in), num_accounts * 2 - 2);
|
||||
ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed_unbounded, nano::stat::dir::in), 0);
|
||||
|
||||
system.deadline_set (20s);
|
||||
system.deadline_set (40s);
|
||||
while ((node->ledger.cache.cemented_count - 1) != node->stats.count (nano::stat::type::observer, nano::stat::detail::all, nano::stat::dir::out))
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
|
@ -531,7 +534,6 @@ TEST (confirmation_height, many_accounts_single_confirmation)
|
|||
ASSERT_EQ (node->active.election_winner_details_size (), 0);
|
||||
}
|
||||
|
||||
// Can take up to 10 minutes
|
||||
TEST (confirmation_height, many_accounts_many_confirmations)
|
||||
{
|
||||
nano::system system;
|
||||
|
@ -541,7 +543,8 @@ TEST (confirmation_height, many_accounts_many_confirmations)
|
|||
auto node = system.add_node (node_config);
|
||||
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
|
||||
|
||||
auto num_accounts = nano::confirmation_height::batch_write_size * 2 + 50;
|
||||
node->confirmation_height_processor.batch_write_size = 500;
|
||||
auto const num_accounts = nano::confirmation_height::unbounded_cutoff * 2 + 50;
|
||||
auto latest_genesis = node->latest (nano::test_genesis_key.pub);
|
||||
std::vector<std::shared_ptr<nano::open_block>> open_blocks;
|
||||
{
|
||||
|
@ -566,15 +569,22 @@ TEST (confirmation_height, many_accounts_many_confirmations)
|
|||
node->block_confirm (open_block);
|
||||
}
|
||||
|
||||
system.deadline_set (600s);
|
||||
while (node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in) != (num_accounts - 1) * 2)
|
||||
system.deadline_set (1500s);
|
||||
auto const num_blocks_to_confirm = (num_accounts - 1) * 2;
|
||||
while (node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in) != num_blocks_to_confirm)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
|
||||
auto num_confirmed_bounded = node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed_bounded, nano::stat::dir::in);
|
||||
ASSERT_GE (num_confirmed_bounded, nano::confirmation_height::batch_write_size);
|
||||
ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed_unbounded, nano::stat::dir::in), (num_accounts - 1) * 2 - num_confirmed_bounded);
|
||||
ASSERT_GE (num_confirmed_bounded, nano::confirmation_height::unbounded_cutoff);
|
||||
ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed_unbounded, nano::stat::dir::in), num_blocks_to_confirm - num_confirmed_bounded);
|
||||
|
||||
system.deadline_set (60s);
|
||||
while ((node->ledger.cache.cemented_count - 1) != node->stats.count (nano::stat::type::observer, nano::stat::detail::all, nano::stat::dir::out))
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
|
||||
auto transaction = node->store.tx_begin_read ();
|
||||
auto cemented_count = 0;
|
||||
|
@ -583,14 +593,8 @@ TEST (confirmation_height, many_accounts_many_confirmations)
|
|||
cemented_count += i->second.height;
|
||||
}
|
||||
|
||||
ASSERT_EQ (num_blocks_to_confirm + 1, cemented_count);
|
||||
ASSERT_EQ (cemented_count, node->ledger.cache.cemented_count);
|
||||
|
||||
system.deadline_set (20s);
|
||||
while ((node->ledger.cache.cemented_count - 1) != node->stats.count (nano::stat::type::observer, nano::stat::detail::all, nano::stat::dir::out))
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
|
||||
ASSERT_EQ (node->active.election_winner_details_size (), 0);
|
||||
}
|
||||
|
||||
|
@ -605,7 +609,8 @@ TEST (confirmation_height, long_chains)
|
|||
nano::block_hash latest (node->latest (nano::test_genesis_key.pub));
|
||||
system.wallet (0)->insert_adhoc (key1.prv);
|
||||
|
||||
constexpr auto num_blocks = nano::confirmation_height::batch_write_size * 2 + 50;
|
||||
node->confirmation_height_processor.batch_write_size = 500;
|
||||
auto const num_blocks = nano::confirmation_height::unbounded_cutoff * 2 + 50;
|
||||
|
||||
// First open the other account
|
||||
nano::send_block send (latest, key1.pub, nano::genesis_amount - nano::Gxrb_ratio + num_blocks + 1, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (latest));
|
||||
|
@ -651,7 +656,7 @@ TEST (confirmation_height, long_chains)
|
|||
// Call block confirm on the existing receive block on the genesis account which will confirm everything underneath on both accounts
|
||||
node->block_confirm (receive1);
|
||||
|
||||
system.deadline_set (10s);
|
||||
system.deadline_set (30s);
|
||||
while (true)
|
||||
{
|
||||
auto transaction = node->store.tx_begin_read ();
|
||||
|
@ -687,13 +692,14 @@ TEST (confirmation_height, long_chains)
|
|||
ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed_bounded, nano::stat::dir::in), num_blocks * 2 + 2);
|
||||
ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed_unbounded, nano::stat::dir::in), 0);
|
||||
|
||||
system.deadline_set (20s);
|
||||
system.deadline_set (40s);
|
||||
while ((node->ledger.cache.cemented_count - 1) != node->stats.count (nano::stat::type::observer, nano::stat::detail::all, nano::stat::dir::out))
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
ASSERT_EQ (node->active.election_winner_details_size (), 0);
|
||||
}
|
||||
}
|
||||
|
||||
TEST (confirmation_height, dynamic_algorithm)
|
||||
{
|
||||
|
@ -835,7 +841,7 @@ TEST (confirmation_height, dynamic_algorithm_no_transition_while_pending)
|
|||
}
|
||||
}
|
||||
|
||||
// Can take up to 1 hour
|
||||
// Can take up to 1 hour (recommend modifying test work difficulty base level to speed this up)
|
||||
TEST (confirmation_height, prioritize_frontiers_overwrite)
|
||||
{
|
||||
nano::system system;
|
||||
|
@ -844,7 +850,7 @@ TEST (confirmation_height, prioritize_frontiers_overwrite)
|
|||
auto node = system.add_node (node_config);
|
||||
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
|
||||
|
||||
auto num_accounts = node->active.max_priority_cementable_frontiers * 2 + 50;
|
||||
auto num_accounts = node->active.max_priority_cementable_frontiers * 2;
|
||||
nano::keypair last_keypair = nano::test_genesis_key;
|
||||
auto last_open_hash = node->latest (nano::test_genesis_key.pub);
|
||||
// Clear confirmation height so that the genesis account has the same amount of uncemented blocks as the other frontiers
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue