Use block cache from unbounded processor when checking if block is processing (#2756)
parent aa10e79825
commit 788aafdb69
6 changed files with 88 additions and 15 deletions
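The gist of the change, condensed from the hunks below: the unbounded confirmation height processor now guards its block cache with a mutex and exposes has_iterated_over_block () plus a locked block_cache_size () accessor (replacing the relaxed-atomic counter), while confirmation_height_processor::is_processing_block () now reports a block as processing if it was either explicitly added or has already been iterated over by the unbounded processor; the old behaviour survives as is_processing_added_block (). A minimal stand-alone sketch of that pattern, using simplified stand-in types rather than the real nano headers:

// A minimal, self-contained sketch of the pattern introduced by this commit.
// The names mirror the diff below, but every type here is a simplified
// stand-in (plain integers and standard containers), not the real nano headers.
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <unordered_set>

using block_hash = std::uint64_t; // stand-in for nano::block_hash
struct block {};                  // stand-in for nano::block

class unbounded_processor_sketch
{
public:
	// Record that the processor has iterated over (and cached) a block
	void cache_block (block_hash hash_a)
	{
		std::lock_guard<std::mutex> guard (block_cache_mutex);
		block_cache.emplace (hash_a, std::make_shared<block> ());
	}

	bool has_iterated_over_block (block_hash hash_a) const
	{
		std::lock_guard<std::mutex> guard (block_cache_mutex);
		return block_cache.count (hash_a) == 1;
	}

	std::uint64_t block_cache_size () const
	{
		std::lock_guard<std::mutex> guard (block_cache_mutex);
		return block_cache.size ();
	}

private:
	// mutable so the const query functions above can still lock it
	mutable std::mutex block_cache_mutex;
	std::unordered_map<block_hash, std::shared_ptr<block>> block_cache;
};

class processor_sketch
{
public:
	void add (block_hash hash_a)
	{
		std::lock_guard<std::mutex> guard (mutex);
		awaiting_processing.insert (hash_a);
	}

	// Only hashes that were explicitly queued via add ()
	bool is_processing_added_block (block_hash hash_a) const
	{
		std::lock_guard<std::mutex> guard (mutex);
		return awaiting_processing.count (hash_a) > 0;
	}

	// Also true for blocks the unbounded processor has already iterated over
	bool is_processing_block (block_hash hash_a) const
	{
		return is_processing_added_block (hash_a) || unbounded_processor.has_iterated_over_block (hash_a);
	}

	unbounded_processor_sketch unbounded_processor;

private:
	mutable std::mutex mutex;
	std::unordered_set<block_hash> awaiting_processing;
};

int main ()
{
	processor_sketch processor;
	processor.add (2);                             // frontier block queued for cementing
	processor.unbounded_processor.cache_block (1); // previous block reached while iterating
	std::cout << processor.is_processing_added_block (1) << '\n'; // 0
	std::cout << processor.is_processing_block (1) << '\n';       // 1
}

The new test in the first hunk exercises exactly this: after the frontier block is added, its predecessor is reported as processing once the unbounded processor has pulled it into the block cache.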
@@ -1574,3 +1574,52 @@ TEST (confirmation_height, election_winner_details_clearing_node_process_confirm
 	node->process_confirmed (election, 1000000);
 	ASSERT_EQ (0, node->active.election_winner_details_size ());
 }
+
+TEST (confirmation_height, unbounded_block_cache_iteration)
+{
+	nano::logger_mt logger;
+	auto path (nano::unique_path ());
+	nano::mdb_store store (logger, path);
+	ASSERT_TRUE (!store.init_error ());
+	nano::genesis genesis;
+	nano::stat stats;
+	nano::ledger ledger (store, stats);
+	nano::write_database_queue write_database_queue;
+	boost::latch initialized_latch{ 0 };
+	nano::work_pool pool (std::numeric_limits<unsigned>::max ());
+	nano::keypair key1;
+	auto send = std::make_shared<nano::send_block> (genesis.hash (), key1.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *pool.generate (genesis.hash ()));
+	auto send1 = std::make_shared<nano::send_block> (send->hash (), key1.pub, nano::genesis_amount - nano::Gxrb_ratio * 2, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *pool.generate (send->hash ()));
+	{
+		auto transaction (store.tx_begin_write ());
+		store.initialize (transaction, genesis, ledger.cache);
+		ASSERT_EQ (nano::process_result::progress, ledger.process (transaction, *send).code);
+		ASSERT_EQ (nano::process_result::progress, ledger.process (transaction, *send1).code);
+	}
+
+	nano::confirmation_height_processor confirmation_height_processor (ledger, write_database_queue, 10ms, logger, initialized_latch, nano::confirmation_height_mode::unbounded);
+	nano::timer<> timer;
+	timer.start ();
+	{
+		// Prevent the conf height processor from doing any writes, so that we can query is_processing_block reliably
+		auto write_guard = write_database_queue.wait (nano::writer::testing);
+		// Add the frontier block
+		confirmation_height_processor.add (send1->hash ());
+
+		// The most recently uncemented block (the previous block) should be seen as processing by the unbounded processor
+		while (!confirmation_height_processor.is_processing_block (send->hash ()))
+		{
+			ASSERT_LT (timer.since_start (), 10s);
+		}
+	}
+
+	// Wait until the current block is finished processing
+	while (!confirmation_height_processor.current ().is_zero ())
+	{
+		ASSERT_LT (timer.since_start (), 10s);
+	}
+
+	ASSERT_EQ (2, stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
+	ASSERT_EQ (2, stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed_unbounded, nano::stat::dir::in));
+	ASSERT_EQ (3, ledger.cache.cemented_count);
+}
@@ -124,7 +124,7 @@ void nano::active_transactions::block_cemented_callback (std::shared_ptr<nano::b
 	auto transaction = node.store.tx_begin_read ();
 
 	boost::optional<nano::election_status_type> election_status_type;
-	if (!confirmation_height_processor.is_processing_block (block_a->hash ()))
+	if (!confirmation_height_processor.is_processing_added_block (block_a->hash ()))
 	{
 		election_status_type = confirm_block (transaction, block_a);
 	}
@@ -209,19 +209,24 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (co
 	return composite;
 }
 
-size_t nano::confirmation_height_processor::awaiting_processing_size ()
+size_t nano::confirmation_height_processor::awaiting_processing_size () const
 {
 	nano::lock_guard<std::mutex> guard (mutex);
 	return awaiting_processing.size ();
 }
 
-bool nano::confirmation_height_processor::is_processing_block (nano::block_hash const & hash_a)
+bool nano::confirmation_height_processor::is_processing_added_block (nano::block_hash const & hash_a) const
 {
 	nano::lock_guard<std::mutex> guard (mutex);
 	return original_hashes_pending.count (hash_a) > 0 || awaiting_processing.get<tag_hash> ().count (hash_a) > 0;
 }
 
-nano::block_hash nano::confirmation_height_processor::current ()
+bool nano::confirmation_height_processor::is_processing_block (nano::block_hash const & hash_a) const
+{
+	return is_processing_added_block (hash_a) || unbounded_processor.has_iterated_over_block (hash_a);
+}
+
+nano::block_hash nano::confirmation_height_processor::current () const
 {
 	nano::lock_guard<std::mutex> lk (mutex);
 	return original_hash;
@@ -37,15 +37,16 @@ public:
 	void stop ();
 	void add (nano::block_hash const & hash_a);
 	void run (confirmation_height_mode);
-	size_t awaiting_processing_size ();
-	bool is_processing_block (nano::block_hash const &);
-	nano::block_hash current ();
+	size_t awaiting_processing_size () const;
+	bool is_processing_added_block (nano::block_hash const & hash_a) const;
+	bool is_processing_block (nano::block_hash const &) const;
+	nano::block_hash current () const;
 
 	void add_cemented_observer (std::function<void(std::shared_ptr<nano::block>)> const &);
 	void add_block_already_cemented_observer (std::function<void(nano::block_hash const &)> const &);
 
 private:
-	std::mutex mutex;
+	mutable std::mutex mutex;
 	// Hashes which have been added to the confirmation height processor, but not yet processed
 	// clang-format off
 	class tag_sequence {};
@@ -379,8 +379,9 @@ void nano::confirmation_height_unbounded::cement_blocks (nano::write_guard & sco
 		// Reverse it so that the callbacks start from the lowest newly cemented block and move upwards
 		std::reverse (pending.block_callback_data.begin (), pending.block_callback_data.end ());
 
+		nano::lock_guard<std::mutex> guard (block_cache_mutex);
 		std::transform (pending.block_callback_data.begin (), pending.block_callback_data.end (), std::back_inserter (cemented_blocks), [& block_cache = block_cache](auto const & hash_a) {
-			debug_assert (block_cache.find (hash_a) != block_cache.end ());
+			debug_assert (block_cache.count (hash_a) == 1);
 			return block_cache.at (hash_a);
 		});
 	}
@@ -413,6 +414,7 @@ void nano::confirmation_height_unbounded::cement_blocks (nano::write_guard & sco
 
 std::shared_ptr<nano::block> nano::confirmation_height_unbounded::get_block_and_sideband (nano::block_hash const & hash_a, nano::transaction const & transaction_a)
 {
+	nano::lock_guard<std::mutex> guard (block_cache_mutex);
 	auto block_cache_it = block_cache.find (hash_a);
 	if (block_cache_it != block_cache.cend ())
 	{
@@ -422,7 +424,6 @@ std::shared_ptr<nano::block> nano::confirmation_height_unbounded::get_block_and_
 	{
 		auto block (ledger.store.block_get (transaction_a, hash_a));
 		block_cache.emplace (hash_a, block);
-		++block_cache_size;
 		return block;
 	}
 }
@@ -440,8 +441,22 @@ void nano::confirmation_height_unbounded::clear_process_vars ()
 	confirmed_iterated_pairs_size = 0;
 	implicit_receive_cemented_mapping.clear ();
 	implicit_receive_cemented_mapping_size = 0;
-	block_cache.clear ();
-	block_cache_size = 0;
+	{
+		nano::lock_guard<std::mutex> guard (block_cache_mutex);
+		block_cache.clear ();
+	}
 }
 
+bool nano::confirmation_height_unbounded::has_iterated_over_block (nano::block_hash const & hash_a) const
+{
+	nano::lock_guard<std::mutex> guard (block_cache_mutex);
+	return block_cache.count (hash_a) == 1;
+}
+
+uint64_t nano::confirmation_height_unbounded::block_cache_size () const
+{
+	nano::lock_guard<std::mutex> guard (block_cache_mutex);
+	return block_cache.size ();
+}
+
 nano::confirmation_height_unbounded::conf_height_details::conf_height_details (nano::account const & account_a, nano::block_hash const & hash_a, uint64_t height_a, uint64_t num_blocks_confirmed_a, std::vector<nano::block_hash> const & block_callback_data_a) :
@@ -471,6 +486,6 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (co
 	composite->add_component (std::make_unique<container_info_leaf> (container_info{ "confirmed_iterated_pairs", confirmation_height_unbounded.confirmed_iterated_pairs_size, sizeof (decltype (confirmation_height_unbounded.confirmed_iterated_pairs)::value_type) }));
 	composite->add_component (std::make_unique<container_info_leaf> (container_info{ "pending_writes", confirmation_height_unbounded.pending_writes_size, sizeof (decltype (confirmation_height_unbounded.pending_writes)::value_type) }));
 	composite->add_component (std::make_unique<container_info_leaf> (container_info{ "implicit_receive_cemented_mapping", confirmation_height_unbounded.implicit_receive_cemented_mapping_size, sizeof (decltype (confirmation_height_unbounded.implicit_receive_cemented_mapping)::value_type) }));
-	composite->add_component (std::make_unique<container_info_leaf> (container_info{ "block_cache", confirmation_height_unbounded.block_cache_size, sizeof (decltype (confirmation_height_unbounded.block_cache)::value_type) }));
+	composite->add_component (std::make_unique<container_info_leaf> (container_info{ "block_cache", confirmation_height_unbounded.block_cache_size (), sizeof (decltype (confirmation_height_unbounded.block_cache)::value_type) }));
 	return composite;
 }
@@ -23,6 +23,7 @@ public:
 	void clear_process_vars ();
 	void process ();
 	void cement_blocks (nano::write_guard &);
+	bool has_iterated_over_block (nano::block_hash const &) const;
 
 private:
 	class confirmed_iterated_pair
@@ -62,14 +63,16 @@ private:
 	// This allows the load and stores to use relaxed atomic memory ordering.
 	std::unordered_map<account, confirmed_iterated_pair> confirmed_iterated_pairs;
 	nano::relaxed_atomic_integral<uint64_t> confirmed_iterated_pairs_size{ 0 };
-	std::unordered_map<nano::block_hash, std::shared_ptr<nano::block>> block_cache;
-	nano::relaxed_atomic_integral<uint64_t> block_cache_size{ 0 };
 	std::shared_ptr<nano::block> get_block_and_sideband (nano::block_hash const &, nano::transaction const &);
 	std::deque<conf_height_details> pending_writes;
 	nano::relaxed_atomic_integral<uint64_t> pending_writes_size{ 0 };
 	std::unordered_map<nano::block_hash, std::weak_ptr<conf_height_details>> implicit_receive_cemented_mapping;
 	nano::relaxed_atomic_integral<uint64_t> implicit_receive_cemented_mapping_size{ 0 };
 
+	mutable std::mutex block_cache_mutex;
+	std::unordered_map<nano::block_hash, std::shared_ptr<nano::block>> block_cache;
+	uint64_t block_cache_size () const;
+
 	nano::timer<std::chrono::milliseconds> timer;
 
 	class preparation_data final