Store whole block in confirmation height processor queue (#2758)

* Store whole block in confirmation height processor queue

* Fix merge

* Fix merge error
This commit is contained in:
Wesley Shillingford 2021-02-18 16:56:47 +00:00 committed by GitHub
commit 7a6a7f4cfe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 159 additions and 92 deletions

View file

@ -732,17 +732,16 @@ TEST (confirmation_heightDeathTest, rollback_added_block)
store->initialize (transaction, genesis, ledger.cache);
}
auto block_hash_being_processed (send->hash ());
uint64_t batch_write_size = 2048;
std::atomic<bool> stopped{ false };
nano::confirmation_height_unbounded unbounded_processor (
ledger, write_database_queue, 10ms, logging, logger, stopped, block_hash_being_processed, batch_write_size, [](auto const &) {}, [](auto const &) {}, []() { return 0; });
ledger, write_database_queue, 10ms, logging, logger, stopped, send, batch_write_size, [](auto const &) {}, [](auto const &) {}, []() { return 0; });
// Processing a block which doesn't exist should bail
ASSERT_DEATH_IF_SUPPORTED (unbounded_processor.process (), "");
nano::confirmation_height_bounded bounded_processor (
ledger, write_database_queue, 10ms, logging, logger, stopped, block_hash_being_processed, batch_write_size, [](auto const &) {}, [](auto const &) {}, []() { return 0; });
ledger, write_database_queue, 10ms, logging, logger, stopped, send, batch_write_size, [](auto const &) {}, [](auto const &) {}, []() { return 0; });
// Processing a block which doesn't exist should bail
ASSERT_DEATH_IF_SUPPORTED (bounded_processor.process (), "");
}
@ -811,11 +810,10 @@ TEST (confirmation_heightDeathTest, modified_chain)
ASSERT_EQ (nano::process_result::progress, ledger.process (transaction, *send).code);
}
auto block_hash_being_processed (send->hash ());
uint64_t batch_write_size = 2048;
std::atomic<bool> stopped{ false };
nano::confirmation_height_bounded bounded_processor (
ledger, write_database_queue, 10ms, logging, logger, stopped, block_hash_being_processed, batch_write_size, [](auto const &) {}, [](auto const &) {}, []() { return 0; });
ledger, write_database_queue, 10ms, logging, logger, stopped, send, batch_write_size, [](auto const &) {}, [](auto const &) {}, []() { return 0; });
{
// This reads the blocks in the account, but prevents any writes from occuring yet
@ -834,7 +832,7 @@ TEST (confirmation_heightDeathTest, modified_chain)
store->confirmation_height_put (store->tx_begin_write (), nano::genesis_account, { 1, nano::genesis_hash });
nano::confirmation_height_unbounded unbounded_processor (
ledger, write_database_queue, 10ms, logging, logger, stopped, block_hash_being_processed, batch_write_size, [](auto const &) {}, [](auto const &) {}, []() { return 0; });
ledger, write_database_queue, 10ms, logging, logger, stopped, send, batch_write_size, [](auto const &) {}, [](auto const &) {}, []() { return 0; });
{
// This reads the blocks in the account, but prevents any writes from occuring yet
@ -885,11 +883,10 @@ TEST (confirmation_heightDeathTest, modified_chain_account_removed)
ASSERT_EQ (nano::process_result::progress, ledger.process (transaction, *open).code);
}
auto block_hash_being_processed (open->hash ());
uint64_t batch_write_size = 2048;
std::atomic<bool> stopped{ false };
nano::confirmation_height_unbounded unbounded_processor (
ledger, write_database_queue, 10ms, logging, logger, stopped, block_hash_being_processed, batch_write_size, [](auto const &) {}, [](auto const &) {}, []() { return 0; });
ledger, write_database_queue, 10ms, logging, logger, stopped, open, batch_write_size, [](auto const &) {}, [](auto const &) {}, []() { return 0; });
{
// This reads the blocks in the account, but prevents any writes from occuring yet
@ -909,7 +906,7 @@ TEST (confirmation_heightDeathTest, modified_chain_account_removed)
store->confirmation_height_put (store->tx_begin_write (), nano::genesis_account, { 1, nano::genesis_hash });
nano::confirmation_height_bounded bounded_processor (
ledger, write_database_queue, 10ms, logging, logger, stopped, block_hash_being_processed, batch_write_size, [](auto const &) {}, [](auto const &) {}, []() { return 0; });
ledger, write_database_queue, 10ms, logging, logger, stopped, open, batch_write_size, [](auto const &) {}, [](auto const &) {}, []() { return 0; });
{
// This reads the blocks in the account, but prevents any writes from occuring yet
@ -951,7 +948,7 @@ TEST (confirmation_height, pending_observer_callbacks)
add_callback_stats (*node);
node->confirmation_height_processor.add (send1->hash ());
node->confirmation_height_processor.add (send1);
// Confirm the callback is not called under this circumstance because there is no election information
ASSERT_TIMELY (10s, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) == 1 && node->ledger.stats.count (nano::stat::type::confirmation_observer, nano::stat::detail::all, nano::stat::dir::out) == 1);
@ -1321,7 +1318,7 @@ TEST (confirmation_height, election_winner_details_clearing)
// Add an already cemented block with fake election details. It should get removed
node->active.add_election_winner_details (send2->hash (), nullptr);
node->confirmation_height_processor.add (send2->hash ());
node->confirmation_height_processor.add (send2);
ASSERT_TIMELY (10s, node->active.election_winner_details_size () == 0);
@ -1388,7 +1385,7 @@ TEST (confirmation_height, unbounded_block_cache_iteration)
// Prevent conf height processor doing any writes, so that we can query is_processing_block correctly
auto write_guard = write_database_queue.wait (nano::writer::testing);
// Add the frontier block
confirmation_height_processor.add (send1->hash ());
confirmation_height_processor.add (send1);
// The most uncemented block (previous block) should be seen as processing by the unbounded processor
while (!confirmation_height_processor.is_processing_block (send->hash ()))
@ -1436,12 +1433,11 @@ TEST (confirmation_height, pruned_source)
ASSERT_EQ (nano::process_result::progress, ledger.process (transaction, *send3).code);
ASSERT_EQ (nano::process_result::progress, ledger.process (transaction, *open2).code);
}
auto block_hash_being_processed (open2->hash ());
uint64_t batch_write_size = 2;
std::atomic<bool> stopped{ false };
bool first_time{ true };
nano::confirmation_height_bounded bounded_processor (
ledger, write_database_queue, 10ms, logging, logger, stopped, block_hash_being_processed, batch_write_size, [&](auto const & cemented_blocks_a) {
ledger, write_database_queue, 10ms, logging, logger, stopped, open2, batch_write_size, [&](auto const & cemented_blocks_a) {
if (first_time)
{
// Prune the send

View file

@ -343,4 +343,26 @@ struct hash<::nano::qualified_root>
return hash<::nano::uint512_union> () (data_a);
}
};
template <>
struct equal_to<std::reference_wrapper<::nano::block_hash const>>
{
bool operator() (std::reference_wrapper<::nano::block_hash const> const & lhs, std::reference_wrapper<::nano::block_hash const> const & rhs) const
{
return lhs.get () == rhs.get ();
}
};
}
namespace boost
{
template <>
struct hash<std::reference_wrapper<::nano::block_hash const>>
{
size_t operator() (std::reference_wrapper<::nano::block_hash const> const & hash_a) const
{
std::hash<::nano::block_hash> hash;
return hash (hash_a);
}
};
}

View file

@ -1208,7 +1208,7 @@ int main (int argc, char * const * argv)
// Confirm blocks for node1
for (auto & block : blocks)
{
node1->confirmation_height_processor.add (block->hash ());
node1->confirmation_height_processor.add (block);
}
while (node1->ledger.cache.cemented_count != node1->ledger.cache.block_count)
{

View file

@ -10,14 +10,14 @@
#include <numeric>
nano::confirmation_height_bounded::confirmation_height_bounded (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logging const & logging_a, nano::logger_mt & logger_a, std::atomic<bool> & stopped_a, nano::block_hash const & original_hash_a, uint64_t & batch_write_size_a, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const & notify_observers_callback_a, std::function<void(nano::block_hash const &)> const & notify_block_already_cemented_observers_callback_a, std::function<uint64_t ()> const & awaiting_processing_size_callback_a) :
nano::confirmation_height_bounded::confirmation_height_bounded (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logging const & logging_a, nano::logger_mt & logger_a, std::atomic<bool> & stopped_a, std::shared_ptr<nano::block> const & original_block_a, uint64_t & batch_write_size_a, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const & notify_observers_callback_a, std::function<void(nano::block_hash const &)> const & notify_block_already_cemented_observers_callback_a, std::function<uint64_t ()> const & awaiting_processing_size_callback_a) :
ledger (ledger_a),
write_database_queue (write_database_queue_a),
batch_separate_pending_min_time (batch_separate_pending_min_time_a),
logging (logging_a),
logger (logger_a),
stopped (stopped_a),
original_hash (original_hash_a),
original_block (original_block_a),
batch_write_size (batch_write_size_a),
notify_observers_callback (notify_observers_callback_a),
notify_block_already_cemented_observers_callback (notify_block_already_cemented_observers_callback_a),
@ -50,7 +50,7 @@ nano::confirmation_height_bounded::top_and_next_hash nano::confirmation_height_b
}
else
{
next = { original_hash, boost::none, 0 };
next = { original_block->hash (), boost::none, 0 };
}
return next;
@ -77,7 +77,17 @@ void nano::confirmation_height_bounded::process ()
current = hash_to_process.top;
auto top_level_hash = current;
auto block = ledger.store.block_get (transaction, current);
std::shared_ptr<nano::block> block;
if (first_iter)
{
debug_assert (current == original_block->hash ());
block = original_block;
}
else
{
block = ledger.store.block_get (transaction, current);
}
if (!block)
{
if (ledger.pruning && ledger.store.pruned_exists (transaction, current))
@ -114,9 +124,9 @@ void nano::confirmation_height_bounded::process ()
{
ledger.store.confirmation_height_get (transaction, account, confirmation_height_info);
// This block was added to the confirmation height processor but is already confirmed
if (first_iter && confirmation_height_info.height >= block->sideband ().height && current == original_hash)
if (first_iter && confirmation_height_info.height >= block->sideband ().height && current == original_block->hash ())
{
notify_block_already_cemented_observers_callback (original_hash);
notify_block_already_cemented_observers_callback (original_block->hash ());
}
}
@ -185,7 +195,7 @@ void nano::confirmation_height_bounded::process ()
// When there are a lot of pending confirmation height blocks, it is more efficient to
// bulk some of them up to enable better write performance which becomes the bottleneck.
auto min_time_exceeded = (timer.since_start () >= batch_separate_pending_min_time);
auto finished_iterating = current == original_hash;
auto finished_iterating = current == original_block->hash ();
auto non_awaiting_processing = awaiting_processing_size_callback () == 0;
auto should_output = finished_iterating && (non_awaiting_processing || min_time_exceeded);
auto force_write = pending_writes.size () >= pending_writes_max_size || accounts_confirmed_info.size () >= pending_writes_max_size;
@ -208,7 +218,7 @@ void nano::confirmation_height_bounded::process ()
first_iter = false;
transaction.refresh ();
} while ((!receive_source_pairs.empty () || current != original_hash) && !stopped);
} while ((!receive_source_pairs.empty () || current != original_block->hash ()) && !stopped);
debug_assert (checkpoints.empty ());
}

View file

@ -19,7 +19,7 @@ class write_guard;
class confirmation_height_bounded final
{
public:
confirmation_height_bounded (nano::ledger &, nano::write_database_queue &, std::chrono::milliseconds, nano::logging const &, nano::logger_mt &, std::atomic<bool> &, nano::block_hash const &, uint64_t &, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const &, std::function<void(nano::block_hash const &)> const &, std::function<uint64_t ()> const &);
confirmation_height_bounded (nano::ledger &, nano::write_database_queue &, std::chrono::milliseconds, nano::logging const &, nano::logger_mt &, std::atomic<bool> &, std::shared_ptr<nano::block> const &, uint64_t &, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const &, std::function<void(nano::block_hash const &)> const &, std::function<uint64_t ()> const &);
bool pending_empty () const;
void clear_process_vars ();
void process ();
@ -124,7 +124,7 @@ private:
nano::logging const & logging;
nano::logger_mt & logger;
std::atomic<bool> & stopped;
nano::block_hash const & original_hash;
std::shared_ptr<nano::block> const & original_block;
uint64_t & batch_write_size;
std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> notify_observers_callback;
std::function<void(nano::block_hash const &)> notify_block_already_cemented_observers_callback;

View file

@ -15,8 +15,8 @@ nano::confirmation_height_processor::confirmation_height_processor (nano::ledger
ledger (ledger_a),
write_database_queue (write_database_queue_a),
// clang-format off
unbounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logging_a, logger_a, stopped, original_hash, batch_write_size, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }),
bounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logging_a, logger_a, stopped, original_hash, batch_write_size, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }),
unbounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logging_a, logger_a, stopped, original_block, batch_write_size, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }),
bounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logging_a, logger_a, stopped, original_block, batch_write_size, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }),
// clang-format on
thread ([this, &latch, mode_a]() {
nano::thread_role::set (nano::thread_role::name::confirmation_height_processing);
@ -86,7 +86,7 @@ void nano::confirmation_height_processor::run (confirmation_height_mode mode_a)
{
auto lock_and_cleanup = [&lk, this]() {
lk.lock ();
original_hash.clear ();
original_block = nullptr;
original_hashes_pending.clear ();
bounded_processor.clear_process_vars ();
unbounded_processor.clear_process_vars ();
@ -129,7 +129,7 @@ void nano::confirmation_height_processor::run (confirmation_height_mode mode_a)
{
// Pausing is only utilised in some tests to help prevent it processing added blocks until required.
debug_assert (network_params.network.is_dev_network ());
original_hash.clear ();
original_block = nullptr;
condition.wait (lk);
}
}
@ -152,11 +152,11 @@ void nano::confirmation_height_processor::unpause ()
condition.notify_one ();
}
void nano::confirmation_height_processor::add (nano::block_hash const & hash_a)
void nano::confirmation_height_processor::add (std::shared_ptr<nano::block> const & block_a)
{
{
nano::lock_guard<std::mutex> lk (mutex);
awaiting_processing.get<tag_sequence> ().emplace_back (hash_a);
awaiting_processing.get<tag_sequence> ().emplace_back (block_a);
}
condition.notify_one ();
}
@ -165,8 +165,8 @@ void nano::confirmation_height_processor::set_next_hash ()
{
nano::lock_guard<std::mutex> guard (mutex);
debug_assert (!awaiting_processing.empty ());
original_hash = awaiting_processing.get<tag_sequence> ().front ();
original_hashes_pending.insert (original_hash);
original_block = awaiting_processing.get<tag_sequence> ().front ().block;
original_hashes_pending.insert (original_block->hash ());
awaiting_processing.get<tag_sequence> ().pop_front ();
}
@ -235,5 +235,5 @@ bool nano::confirmation_height_processor::is_processing_block (nano::block_hash
nano::block_hash nano::confirmation_height_processor::current () const
{
nano::lock_guard<std::mutex> lk (mutex);
return original_hash;
return original_block ? original_block->hash () : 0;
}

View file

@ -8,6 +8,7 @@
#include <nano/secure/common.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/mem_fun.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <boost/multi_index_container.hpp>
@ -36,7 +37,7 @@ public:
void pause ();
void unpause ();
void stop ();
void add (nano::block_hash const & hash_a);
void add (std::shared_ptr<nano::block> const &);
void run (confirmation_height_mode);
size_t awaiting_processing_size () const;
bool is_processing_added_block (nano::block_hash const & hash_a) const;
@ -50,13 +51,27 @@ private:
mutable std::mutex mutex;
// Hashes which have been added to the confirmation height processor, but not yet processed
// clang-format off
struct block_wrapper
{
block_wrapper (std::shared_ptr<nano::block> const & block_a) :
block (block_a)
{
}
std::reference_wrapper<nano::block_hash const> hash () const
{
return block->hash ();
}
std::shared_ptr<nano::block> block;
};
class tag_sequence {};
class tag_hash {};
boost::multi_index_container<nano::block_hash,
boost::multi_index_container<block_wrapper,
mi::indexed_by<
mi::sequenced<mi::tag<tag_sequence>>,
mi::hashed_unique<mi::tag<tag_hash>,
mi::identity<nano::block_hash>>>> awaiting_processing;
mi::const_mem_fun<block_wrapper, std::reference_wrapper<nano::block_hash const>, &block_wrapper::hash>>>> awaiting_processing;
// clang-format on
// Hashes which have been added and processed, but have not been cemented
@ -64,7 +79,7 @@ private:
bool paused{ false };
/** This is the last block popped off the confirmation height pending collection */
nano::block_hash original_hash{ 0 };
std::shared_ptr<nano::block> original_block;
nano::condition_variable condition;
std::atomic<bool> stopped{ false };

View file

@ -8,14 +8,14 @@
#include <numeric>
nano::confirmation_height_unbounded::confirmation_height_unbounded (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logging const & logging_a, nano::logger_mt & logger_a, std::atomic<bool> & stopped_a, nano::block_hash const & original_hash_a, uint64_t & batch_write_size_a, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const & notify_observers_callback_a, std::function<void(nano::block_hash const &)> const & notify_block_already_cemented_observers_callback_a, std::function<uint64_t ()> const & awaiting_processing_size_callback_a) :
nano::confirmation_height_unbounded::confirmation_height_unbounded (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logging const & logging_a, nano::logger_mt & logger_a, std::atomic<bool> & stopped_a, std::shared_ptr<nano::block> const & original_block_a, uint64_t & batch_write_size_a, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const & notify_observers_callback_a, std::function<void(nano::block_hash const &)> const & notify_block_already_cemented_observers_callback_a, std::function<uint64_t ()> const & awaiting_processing_size_callback_a) :
ledger (ledger_a),
write_database_queue (write_database_queue_a),
batch_separate_pending_min_time (batch_separate_pending_min_time_a),
logging (logging_a),
logger (logger_a),
stopped (stopped_a),
original_hash (original_hash_a),
original_block (original_block_a),
batch_write_size (batch_write_size_a),
notify_observers_callback (notify_observers_callback_a),
notify_block_already_cemented_observers_callback (notify_block_already_cemented_observers_callback_a),
@ -31,7 +31,7 @@ void nano::confirmation_height_unbounded::process ()
timer.restart ();
}
std::shared_ptr<conf_height_details> receive_details;
auto current = original_hash;
auto current = original_block->hash ();
std::vector<nano::block_hash> orig_block_callback_data;
std::vector<receive_source_pair> receive_source_pairs;
@ -54,12 +54,23 @@ void nano::confirmation_height_unbounded::process ()
// (if the original block is not already a receive)
if (receive_details)
{
current = original_hash;
current = original_block->hash ();
receive_details = nullptr;
}
}
auto block (get_block_and_sideband (current, read_transaction));
std::shared_ptr<nano::block> block;
if (first_iter)
{
debug_assert (current == original_block->hash ());
// This is the original block passed so can use it directly
block = original_block;
block_cache[original_block->hash ()] = original_block;
}
else
{
block = get_block_and_sideband (current, read_transaction);
}
if (!block)
{
auto error_str = (boost::format ("Ledger mismatch trying to set confirmation height for block %1% (unbounded processor)") % current.to_string ()).str ();
@ -88,9 +99,10 @@ void nano::confirmation_height_unbounded::process ()
confirmation_height = confirmation_height_info.height;
// This block was added to the confirmation height processor but is already confirmed
if (first_iter && confirmation_height >= block_height && current == original_hash)
if (first_iter && confirmation_height >= block_height)
{
notify_block_already_cemented_observers_callback (original_hash);
debug_assert (current == original_block->hash ());
notify_block_already_cemented_observers_callback (original_block->hash ());
}
}
auto iterated_height = confirmation_height;
@ -104,7 +116,7 @@ void nano::confirmation_height_unbounded::process ()
auto already_traversed = iterated_height >= block_height;
if (!already_traversed)
{
collect_unconfirmed_receive_and_sources_for_account (block_height, iterated_height, current, account, read_transaction, receive_source_pairs, block_callback_datas_required, orig_block_callback_data);
collect_unconfirmed_receive_and_sources_for_account (block_height, iterated_height, block, current, account, read_transaction, receive_source_pairs, block_callback_datas_required, orig_block_callback_data);
}
// Exit early when the processor has been stopped, otherwise this function may take a
@ -174,20 +186,33 @@ void nano::confirmation_height_unbounded::process ()
first_iter = false;
read_transaction.renew ();
} while ((!receive_source_pairs.empty () || current != original_hash) && !stopped);
} while ((!receive_source_pairs.empty () || current != original_block->hash ()) && !stopped);
}
void nano::confirmation_height_unbounded::collect_unconfirmed_receive_and_sources_for_account (uint64_t block_height_a, uint64_t confirmation_height_a, nano::block_hash const & hash_a, nano::account const & account_a, nano::read_transaction const & transaction_a, std::vector<receive_source_pair> & receive_source_pairs_a, std::vector<nano::block_hash> & block_callback_data_a, std::vector<nano::block_hash> & orig_block_callback_data_a)
void nano::confirmation_height_unbounded::collect_unconfirmed_receive_and_sources_for_account (uint64_t block_height_a, uint64_t confirmation_height_a, std::shared_ptr<nano::block> const & block_a, nano::block_hash const & hash_a, nano::account const & account_a, nano::read_transaction const & transaction_a, std::vector<receive_source_pair> & receive_source_pairs_a, std::vector<nano::block_hash> & block_callback_data_a, std::vector<nano::block_hash> & orig_block_callback_data_a)
{
debug_assert (block_a->hash () == hash_a);
auto hash (hash_a);
auto num_to_confirm = block_height_a - confirmation_height_a;
// Handle any sends above a receive
auto is_original_block = (hash == original_hash);
bool hit_receive = false;
auto is_original_block = (hash == original_block->hash ());
auto hit_receive = false;
auto first_iter = true;
while ((num_to_confirm > 0) && !hash.is_zero () && !stopped)
{
auto block (get_block_and_sideband (hash, transaction_a));
std::shared_ptr<nano::block> block;
if (first_iter)
{
debug_assert (hash == hash_a);
block = block_a;
block_cache[hash] = block_a;
}
else
{
block = get_block_and_sideband (hash, transaction_a);
}
if (block)
{
auto source (block->source ());
@ -240,6 +265,7 @@ void nano::confirmation_height_unbounded::collect_unconfirmed_receive_and_source
}
--num_to_confirm;
first_iter = false;
}
}

View file

@ -20,7 +20,7 @@ class write_guard;
class confirmation_height_unbounded final
{
public:
confirmation_height_unbounded (nano::ledger &, nano::write_database_queue &, std::chrono::milliseconds, nano::logging const &, nano::logger_mt &, std::atomic<bool> &, nano::block_hash const &, uint64_t &, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const &, std::function<void(nano::block_hash const &)> const &, std::function<uint64_t ()> const &);
confirmation_height_unbounded (nano::ledger &, nano::write_database_queue &, std::chrono::milliseconds, nano::logging const &, nano::logger_mt &, std::atomic<bool> &, std::shared_ptr<nano::block> const & original_block_a, uint64_t &, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const &, std::function<void(nano::block_hash const &)> const &, std::function<uint64_t ()> const &);
bool pending_empty () const;
void clear_process_vars ();
void process ();
@ -92,7 +92,7 @@ private:
std::vector<nano::block_hash> const & orig_block_callback_data;
};
void collect_unconfirmed_receive_and_sources_for_account (uint64_t, uint64_t, nano::block_hash const &, nano::account const &, nano::read_transaction const &, std::vector<receive_source_pair> &, std::vector<nano::block_hash> &, std::vector<nano::block_hash> &);
void collect_unconfirmed_receive_and_sources_for_account (uint64_t, uint64_t, std::shared_ptr<nano::block> const &, nano::block_hash const &, nano::account const &, nano::read_transaction const &, std::vector<receive_source_pair> &, std::vector<nano::block_hash> &, std::vector<nano::block_hash> &);
void prepare_iterated_blocks_for_cementing (preparation_data &);
nano::network_params network_params;
@ -101,7 +101,7 @@ private:
std::chrono::milliseconds batch_separate_pending_min_time;
nano::logger_mt & logger;
std::atomic<bool> & stopped;
nano::block_hash const & original_hash;
std::shared_ptr<nano::block> const & original_block;
uint64_t & batch_write_size;
nano::logging const & logging;

View file

@ -1356,12 +1356,11 @@ void nano::node::process_confirmed_data (nano::transaction const & transaction_a
void nano::node::process_confirmed (nano::election_status const & status_a, uint64_t iteration_a)
{
auto block_a (status_a.winner);
auto hash (block_a->hash ());
const size_t num_iters = (config.block_processor_batch_max_time / network_params.node.process_confirmed_interval) * 4;
if (ledger.block_exists (hash))
auto hash (status_a.winner->hash ());
const auto num_iters = (config.block_processor_batch_max_time / network_params.node.process_confirmed_interval) * 4;
if (auto block_l = ledger.store.block_get (ledger.store.tx_begin_read (), hash))
{
confirmation_height_processor.add (hash);
confirmation_height_processor.add (block_l);
}
else if (iteration_a < num_iters)
{

View file

@ -582,7 +582,7 @@ TEST (confirmation_height, many_accounts_many_confirmations)
node->confirmation_height_processor.batch_write_size = 500;
auto const num_accounts = nano::confirmation_height::unbounded_cutoff * 2 + 50;
auto ladev_genesis = node->latest (nano::dev_genesis_key.pub);
auto latest_genesis = node->latest (nano::dev_genesis_key.pub);
std::vector<std::shared_ptr<nano::open_block>> open_blocks;
{
auto transaction = node->store.tx_begin_write ();
@ -591,12 +591,12 @@ TEST (confirmation_height, many_accounts_many_confirmations)
nano::keypair key;
system.wallet (0)->insert_adhoc (key.prv);
nano::send_block send (ladev_genesis, key.pub, node->online_reps.delta (), nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (ladev_genesis));
nano::send_block send (latest_genesis, key.pub, node->online_reps.delta (), nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (latest_genesis));
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send).code);
auto open = std::make_shared<nano::open_block> (send.hash (), nano::dev_genesis_key.pub, key.pub, key.prv, key.pub, *system.work.generate (key.pub));
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *open).code);
open_blocks.push_back (std::move (open));
ladev_genesis = send.hash ();
latest_genesis = send.hash ();
}
}
@ -732,16 +732,15 @@ TEST (confirmation_height, dynamic_algorithm)
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto node = system.add_node (node_config);
nano::genesis genesis;
nano::keypair key;
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
auto const num_blocks = nano::confirmation_height::unbounded_cutoff;
auto ladev_genesis = node->latest (nano::dev_genesis_key.pub);
auto latest_genesis = nano::genesis ().open;
std::vector<std::shared_ptr<nano::state_block>> state_blocks;
for (auto i = 0; i < num_blocks; ++i)
{
auto send (std::make_shared<nano::state_block> (nano::dev_genesis_key.pub, ladev_genesis, nano::dev_genesis_key.pub, nano::genesis_amount - i - 1, key.pub, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (ladev_genesis)));
ladev_genesis = send->hash ();
auto send (std::make_shared<nano::state_block> (nano::dev_genesis_key.pub, latest_genesis->hash (), nano::dev_genesis_key.pub, nano::genesis_amount - i - 1, key.pub, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (latest_genesis->hash ())));
latest_genesis = send;
state_blocks.push_back (send);
}
{
@ -752,10 +751,10 @@ TEST (confirmation_height, dynamic_algorithm)
}
}
node->confirmation_height_processor.add (state_blocks.front ()->hash ());
node->confirmation_height_processor.add (state_blocks.front ());
ASSERT_TIMELY (20s, node->ledger.cache.cemented_count == 2);
node->confirmation_height_processor.add (ladev_genesis);
node->confirmation_height_processor.add (latest_genesis);
ASSERT_TIMELY (20s, node->ledger.cache.cemented_count == num_blocks + 1);
@ -790,14 +789,14 @@ TEST (confirmation_height, dynamic_algorithm_no_transition_while_pending)
nano::keypair key;
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
auto ladev_genesis = node->latest (nano::dev_genesis_key.pub);
auto latest_genesis = node->latest (nano::dev_genesis_key.pub);
std::vector<std::shared_ptr<nano::state_block>> state_blocks;
auto const num_blocks = nano::confirmation_height::unbounded_cutoff - 2;
auto add_block_to_genesis_chain = [&](nano::write_transaction & transaction) {
static int num = 0;
auto send (std::make_shared<nano::state_block> (nano::dev_genesis_key.pub, ladev_genesis, nano::dev_genesis_key.pub, nano::genesis_amount - num - 1, key.pub, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (ladev_genesis)));
ladev_genesis = send->hash ();
auto send (std::make_shared<nano::state_block> (nano::dev_genesis_key.pub, latest_genesis, nano::dev_genesis_key.pub, nano::genesis_amount - num - 1, key.pub, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (latest_genesis)));
latest_genesis = send->hash ();
state_blocks.push_back (send);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *send).code);
++num;
@ -812,7 +811,7 @@ TEST (confirmation_height, dynamic_algorithm_no_transition_while_pending)
{
auto write_guard = node->write_database_queue.wait (nano::writer::testing);
// To limit any data races we are not calling node.block_confirm
node->confirmation_height_processor.add (state_blocks.back ()->hash ());
node->confirmation_height_processor.add (state_blocks.back ());
nano::timer<> timer;
timer.start ();
@ -834,7 +833,7 @@ TEST (confirmation_height, dynamic_algorithm_no_transition_while_pending)
add_block_to_genesis_chain (transaction);
}
// Make sure this is at a height lower than the block in the add () call above
node->confirmation_height_processor.add (state_blocks.front ()->hash ());
node->confirmation_height_processor.add (state_blocks.front ());
node->confirmation_height_processor.unpause ();
}
@ -865,7 +864,7 @@ TEST (confirmation_height, many_accounts_send_receive_self)
auto const num_accounts = 100000;
#endif
auto ladev_genesis = node->latest (nano::dev_genesis_key.pub);
auto latest_genesis = node->latest (nano::dev_genesis_key.pub);
std::vector<nano::keypair> keys;
std::vector<std::shared_ptr<nano::open_block>> open_blocks;
{
@ -875,12 +874,12 @@ TEST (confirmation_height, many_accounts_send_receive_self)
nano::keypair key;
keys.emplace_back (key);
nano::send_block send (ladev_genesis, key.pub, nano::genesis_amount - 1 - i, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (ladev_genesis));
nano::send_block send (latest_genesis, key.pub, nano::genesis_amount - 1 - i, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (latest_genesis));
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send).code);
auto open = std::make_shared<nano::open_block> (send.hash (), nano::dev_genesis_key.pub, key.pub, key.prv, key.pub, *system.work.generate (key.pub));
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *open).code);
open_blocks.push_back (std::move (open));
ladev_genesis = send.hash ();
latest_genesis = send.hash ();
}
}
@ -981,7 +980,7 @@ TEST (confirmation_height, many_accounts_send_receive_self_no_elections)
auto const num_accounts = 100000;
auto ladev_genesis = nano::genesis_hash;
auto latest_genesis = nano::genesis_hash;
std::vector<nano::keypair> keys;
std::vector<std::shared_ptr<nano::open_block>> open_blocks;
@ -996,18 +995,18 @@ TEST (confirmation_height, many_accounts_send_receive_self_no_elections)
{
nano::keypair key;
keys.emplace_back (key);
nano::send_block send (ladev_genesis, key.pub, nano::genesis_amount - 1 - i, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *pool.generate (ladev_genesis));
nano::send_block send (latest_genesis, key.pub, nano::genesis_amount - 1 - i, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *pool.generate (latest_genesis));
ASSERT_EQ (nano::process_result::progress, ledger.process (transaction, send).code);
auto open = std::make_shared<nano::open_block> (send.hash (), nano::dev_genesis_key.pub, key.pub, key.prv, key.pub, *pool.generate (key.pub));
ASSERT_EQ (nano::process_result::progress, ledger.process (transaction, *open).code);
open_blocks.push_back (std::move (open));
ladev_genesis = send.hash ();
latest_genesis = send.hash ();
}
}
for (auto & open_block : open_blocks)
{
confirmation_height_processor.add (open_block->hash ());
confirmation_height_processor.add (open_block);
}
system.deadline_set (1000s);
@ -1045,8 +1044,8 @@ TEST (confirmation_height, many_accounts_send_receive_self_no_elections)
// Now send and receive to self
for (int i = 0; i < open_blocks.size (); ++i)
{
confirmation_height_processor.add (send_blocks[i]->hash ());
confirmation_height_processor.add (receive_blocks[i]->hash ());
confirmation_height_processor.add (send_blocks[i]);
confirmation_height_processor.add (receive_blocks[i]);
}
system.deadline_set (1000s);
@ -1121,8 +1120,8 @@ TEST (confirmation_height, prioritize_frontiers_overwrite)
// Add a new frontier with 1 block, it should not be added to the frontier container because it is not higher than any already in the maxed out container
nano::keypair key;
auto ladev_genesis = node->latest (nano::dev_genesis_key.pub);
nano::send_block send (ladev_genesis, key.pub, nano::Gxrb_ratio - 1, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (ladev_genesis));
auto latest_genesis = node->latest (nano::dev_genesis_key.pub);
nano::send_block send (latest_genesis, key.pub, nano::Gxrb_ratio - 1, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (latest_genesis));
nano::open_block open (send.hash (), nano::dev_genesis_key.pub, key.pub, key.prv, key.pub, *system.work.generate (key.pub));
{
auto transaction = node->store.tx_begin_write ();
@ -1312,11 +1311,11 @@ TEST (telemetry, under_load)
nano::keypair key1;
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
system.wallet (0)->insert_adhoc (key.prv);
auto ladev_genesis = node->latest (nano::dev_genesis_key.pub);
auto latest_genesis = node->latest (nano::dev_genesis_key.pub);
auto num_blocks = 150000;
auto send (std::make_shared<nano::state_block> (nano::dev_genesis_key.pub, ladev_genesis, nano::dev_genesis_key.pub, nano::genesis_amount - num_blocks, key.pub, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (ladev_genesis)));
auto send (std::make_shared<nano::state_block> (nano::dev_genesis_key.pub, latest_genesis, nano::dev_genesis_key.pub, nano::genesis_amount - num_blocks, key.pub, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (latest_genesis)));
node->process_active (send);
ladev_genesis = send->hash ();
latest_genesis = send->hash ();
auto open (std::make_shared<nano::state_block> (key.pub, 0, key.pub, num_blocks, send->hash (), key.prv, key.pub, *system.work.generate (key.pub)));
node->process_active (open);
auto latest_key = open->hash ();
@ -1331,7 +1330,7 @@ TEST (telemetry, under_load)
}
};
std::thread thread1 (thread_func, nano::dev_genesis_key, ladev_genesis, nano::genesis_amount - num_blocks);
std::thread thread1 (thread_func, nano::dev_genesis_key, latest_genesis, nano::genesis_amount - num_blocks);
std::thread thread2 (thread_func, key, latest_key, num_blocks);
ASSERT_TIMELY (200s, node1->ledger.cache.block_count == num_blocks * 2 + 3);
@ -1732,19 +1731,19 @@ TEST (node, mass_block_new)
nano::state_block_builder builder;
std::vector<std::shared_ptr<nano::state_block>> send_blocks;
auto send_threshold (nano::work_threshold (nano::work_version::work_1, nano::block_details (nano::epoch::epoch_2, true, false, false)));
auto ladev_genesis = node.latest (nano::dev_genesis_key.pub);
auto latest_genesis = node.latest (nano::dev_genesis_key.pub);
for (auto i = 0; i < num_blocks; ++i)
{
auto send = builder.make_block ()
.account (nano::dev_genesis_key.pub)
.previous (ladev_genesis)
.previous (latest_genesis)
.balance (nano::genesis_amount - i - 1)
.representative (nano::dev_genesis_key.pub)
.link (keys[i].pub)
.sign (nano::dev_genesis_key.prv, nano::dev_genesis_key.pub)
.work (*system.work.generate (nano::work_version::work_1, ladev_genesis, send_threshold))
.work (*system.work.generate (nano::work_version::work_1, latest_genesis, send_threshold))
.build ();
ladev_genesis = send->hash ();
latest_genesis = send->hash ();
send_blocks.push_back (std::move (send));
}
std::cout << "Send blocks built, start processing" << std::endl;