Reduce active mutex locking with election winner details (#2600)
* Reduce active mutex locking with election winner details * Make sure root is removed before accessing election * Serg comment on existing code
This commit is contained in:
parent
a77c0f1b6e
commit
da9671e0ec
14 changed files with 162 additions and 83 deletions
|
@ -72,6 +72,8 @@ TEST (confirmation_height, single)
|
|||
ASSERT_EQ (1, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
|
||||
ASSERT_EQ (1, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in));
|
||||
ASSERT_EQ (1, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));
|
||||
|
||||
ASSERT_EQ (0, node->active.election_winner_details_size ());
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -213,6 +215,8 @@ TEST (confirmation_height, multiple_accounts)
|
|||
ASSERT_EQ (10, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
|
||||
ASSERT_EQ (10, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in));
|
||||
ASSERT_EQ (10, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));
|
||||
|
||||
ASSERT_EQ (0, node->active.election_winner_details_size ());
|
||||
};
|
||||
|
||||
test_mode (nano::confirmation_height_mode::bounded);
|
||||
|
@ -288,6 +292,8 @@ TEST (confirmation_height, gap_bootstrap)
|
|||
ASSERT_EQ (0, node1.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
|
||||
ASSERT_EQ (0, node1.stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in));
|
||||
ASSERT_EQ (0, node1.stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));
|
||||
|
||||
ASSERT_EQ (0, node1.active.election_winner_details_size ());
|
||||
};
|
||||
|
||||
test_mode (nano::confirmation_height_mode::bounded);
|
||||
|
@ -374,6 +380,8 @@ TEST (confirmation_height, gap_live)
|
|||
ASSERT_EQ (6, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
|
||||
ASSERT_EQ (6, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in));
|
||||
ASSERT_EQ (6, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));
|
||||
|
||||
ASSERT_EQ (0, node->active.election_winner_details_size ());
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -460,6 +468,8 @@ TEST (confirmation_height, send_receive_between_2_accounts)
|
|||
ASSERT_EQ (10, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in));
|
||||
ASSERT_EQ (10, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));
|
||||
ASSERT_EQ (11, node->ledger.cache.cemented_count);
|
||||
|
||||
ASSERT_EQ (0, node->active.election_winner_details_size ());
|
||||
};
|
||||
|
||||
test_mode (nano::confirmation_height_mode::bounded);
|
||||
|
@ -524,6 +534,7 @@ TEST (confirmation_height, send_receive_self)
|
|||
ASSERT_EQ (6, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in));
|
||||
ASSERT_EQ (6, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));
|
||||
ASSERT_EQ (confirmation_height_info.height, node->ledger.cache.cemented_count);
|
||||
ASSERT_EQ (0, node->active.election_winner_details_size ());
|
||||
};
|
||||
|
||||
test_mode (nano::confirmation_height_mode::bounded);
|
||||
|
@ -635,6 +646,8 @@ TEST (confirmation_height, all_block_types)
|
|||
ASSERT_EQ (15, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in));
|
||||
ASSERT_EQ (15, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));
|
||||
ASSERT_EQ (16, node->ledger.cache.cemented_count);
|
||||
|
||||
ASSERT_EQ (0, node->active.election_winner_details_size ());
|
||||
};
|
||||
|
||||
test_mode (nano::confirmation_height_mode::bounded);
|
||||
|
@ -743,6 +756,7 @@ TEST (confirmation_height, observers)
|
|||
ASSERT_EQ (1, node1->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
|
||||
ASSERT_EQ (1, node1->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in));
|
||||
ASSERT_EQ (1, node1->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));
|
||||
ASSERT_EQ (0, node1->active.election_winner_details_size ());
|
||||
};
|
||||
|
||||
test_mode (nano::confirmation_height_mode::bounded);
|
||||
|
@ -792,6 +806,7 @@ TEST (confirmation_height, modified_chain)
|
|||
}
|
||||
|
||||
ASSERT_EQ (1, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::invalid_block, nano::stat::dir::in));
|
||||
ASSERT_EQ (0, node->active.election_winner_details_size ());
|
||||
};
|
||||
|
||||
test_mode (nano::confirmation_height_mode::bounded);
|
||||
|
@ -836,6 +851,7 @@ TEST (confirmation_height, pending_observer_callbacks)
|
|||
|
||||
ASSERT_EQ (2, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
|
||||
ASSERT_EQ (2, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in));
|
||||
ASSERT_EQ (0, node->active.election_winner_details_size ());
|
||||
};
|
||||
|
||||
test_mode (nano::confirmation_height_mode::bounded);
|
||||
|
@ -1181,6 +1197,7 @@ TEST (confirmation_height, dependent_election)
|
|||
|
||||
{
|
||||
// The write guard prevents the confirmation height processor doing any writes.
|
||||
// Note: This test could still fail intermittently due to thread scheduling between active and confirmation height.
|
||||
system.deadline_set (10s);
|
||||
auto write_guard = node->write_database_queue.wait (nano::writer::testing);
|
||||
while (!node->write_database_queue.contains (nano::writer::confirmation_height))
|
||||
|
@ -1283,6 +1300,7 @@ TEST (confirmation_height, cemented_gap_below_receive)
|
|||
ASSERT_EQ (9, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out));
|
||||
ASSERT_EQ (10, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
|
||||
ASSERT_EQ (10, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in));
|
||||
ASSERT_EQ (0, node->active.election_winner_details_size ());
|
||||
|
||||
// Check that the order of callbacks is correct
|
||||
std::vector<nano::block_hash> expected_order = { send.hash (), open.hash (), send1.hash (), receive1.hash (), send2.hash (), dummy_send.hash (), receive2.hash (), dummy_send1.hash (), send3.hash (), open1->hash () };
|
||||
|
@ -1368,6 +1386,7 @@ TEST (confirmation_height, cemented_gap_below_no_cache)
|
|||
|
||||
auto transaction = node->store.tx_begin_read ();
|
||||
ASSERT_TRUE (node->ledger.block_confirmed (transaction, open1->hash ()));
|
||||
ASSERT_EQ (node->active.election_winner_details_size (), 0);
|
||||
ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out));
|
||||
ASSERT_EQ (0, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_conf_height, nano::stat::dir::out));
|
||||
ASSERT_EQ (5, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out));
|
||||
|
@ -1395,13 +1414,13 @@ TEST (confirmation_height, election_winner_details_clearing)
|
|||
nano::keypair key1;
|
||||
auto send = std::make_shared<nano::send_block> (latest, key1.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (latest));
|
||||
auto send1 = std::make_shared<nano::send_block> (send->hash (), key1.pub, nano::genesis_amount - nano::Gxrb_ratio * 2, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send->hash ()));
|
||||
nano::send_block send2 (send1->hash (), key1.pub, nano::genesis_amount - nano::Gxrb_ratio * 3, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send1->hash ()));
|
||||
auto send2 = std::make_shared<nano::send_block> (send1->hash (), key1.pub, nano::genesis_amount - nano::Gxrb_ratio * 3, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send1->hash ()));
|
||||
|
||||
{
|
||||
auto transaction = node->store.tx_begin_write ();
|
||||
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *send).code);
|
||||
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *send1).code);
|
||||
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send2).code);
|
||||
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *send2).code);
|
||||
}
|
||||
|
||||
add_callback_stats (*node);
|
||||
|
@ -1442,8 +1461,16 @@ TEST (confirmation_height, election_winner_details_clearing)
|
|||
|
||||
ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out));
|
||||
|
||||
// election_winner_details should get cleared during another batch of cementing, so add another block
|
||||
node->confirmation_height_processor.add (send2.hash ());
|
||||
node->block_confirm (send2);
|
||||
system.deadline_set (10s);
|
||||
while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 3)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
|
||||
// Add an already cemented block with fake election details. It should get removed
|
||||
node->active.add_election_winner_details (send2->hash (), nullptr);
|
||||
node->confirmation_height_processor.add (send2->hash ());
|
||||
|
||||
system.deadline_set (10s);
|
||||
while (node->active.election_winner_details_size () > 0)
|
||||
|
@ -1452,8 +1479,8 @@ TEST (confirmation_height, election_winner_details_clearing)
|
|||
}
|
||||
|
||||
ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out));
|
||||
ASSERT_EQ (2, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));
|
||||
ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out));
|
||||
ASSERT_EQ (3, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));
|
||||
ASSERT_EQ (2, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out));
|
||||
ASSERT_EQ (3, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
|
||||
ASSERT_EQ (3, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in));
|
||||
};
|
||||
|
|
|
@ -29,13 +29,13 @@ thread ([this]() {
|
|||
})
|
||||
{
|
||||
// Register a callback which will get called after a block is cemented
|
||||
confirmation_height_processor.add_cemented_observer ([this](nano::block_w_sideband const & callback_data) {
|
||||
this->block_cemented_callback (callback_data.block, callback_data.sideband);
|
||||
confirmation_height_processor.add_cemented_observer ([this](nano::block_w_sideband const & callback_data_a) {
|
||||
this->block_cemented_callback (callback_data_a.block, callback_data_a.sideband);
|
||||
});
|
||||
|
||||
// Register a callback which will get called after a batch of blocks is written and observer calls finished
|
||||
confirmation_height_processor.add_cemented_process_finished_observer ([this]() {
|
||||
this->cemented_batch_finished_callback ();
|
||||
confirmation_height_processor.add_block_already_cemented_observer ([this](nano::block_hash const & hash_a) {
|
||||
this->block_already_cemented_callback (hash_a);
|
||||
});
|
||||
|
||||
debug_assert (min_time_between_requests > std::chrono::milliseconds (node.network_params.network.request_interval_ms));
|
||||
|
@ -153,15 +153,20 @@ void nano::active_transactions::block_cemented_callback (std::shared_ptr<nano::b
|
|||
else
|
||||
{
|
||||
auto hash (block_a->hash ());
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
nano::unique_lock<std::mutex> election_winners_lk (election_winner_details_mutex);
|
||||
auto existing (election_winner_details.find (hash));
|
||||
if (existing != election_winner_details.end ())
|
||||
{
|
||||
auto election = existing->second;
|
||||
election_winner_details.erase (hash);
|
||||
election_winners_lk.unlock ();
|
||||
// Make sure mutex is held before election usage so we know that confirm_once has
|
||||
// finished removing the root from active to avoid any data race.
|
||||
nano::unique_lock<std::mutex> lk (mutex);
|
||||
if (election->confirmed () && !election->stopped && election->status.winner->hash () == hash)
|
||||
{
|
||||
add_confirmed (existing->second->status, block_a->qualified_root ());
|
||||
|
||||
add_confirmed (election->status, block_a->qualified_root ());
|
||||
lk.unlock ();
|
||||
node.receive_confirmed (transaction, block_a, hash);
|
||||
nano::account account (0);
|
||||
nano::uint128_t amount (0);
|
||||
|
@ -180,32 +185,25 @@ void nano::active_transactions::block_cemented_callback (std::shared_ptr<nano::b
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
election_winner_details.erase (hash);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void nano::active_transactions::cemented_batch_finished_callback ()
|
||||
void nano::active_transactions::add_election_winner_details (nano::block_hash const & hash_a, std::shared_ptr<nano::election> const & election_a)
|
||||
{
|
||||
nano::lock_guard<std::mutex> guard (election_winner_details_mutex);
|
||||
election_winner_details.emplace (hash_a, election_a);
|
||||
}
|
||||
|
||||
void nano::active_transactions::block_already_cemented_callback (nano::block_hash const & hash_a)
|
||||
{
|
||||
// Depending on timing there is a situation where the election_winner_details is not reset.
|
||||
// This can happen when a block wins an election, and the block is confirmed + observer
|
||||
// called before the block hash gets added to election_winner_details. If the block is confirmed
|
||||
// callbacks have already been done, so we can safely just remove it.
|
||||
auto transaction = node.store.tx_begin_read ();
|
||||
nano::lock_guard<std::mutex> guard (mutex);
|
||||
for (auto it = election_winner_details.begin (); it != election_winner_details.end ();)
|
||||
{
|
||||
if (node.ledger.block_confirmed (transaction, it->first))
|
||||
{
|
||||
it = election_winner_details.erase (it);
|
||||
}
|
||||
else
|
||||
{
|
||||
++it;
|
||||
}
|
||||
}
|
||||
nano::lock_guard<std::mutex> guard (election_winner_details_mutex);
|
||||
election_winner_details.erase (hash_a);
|
||||
}
|
||||
|
||||
void nano::active_transactions::election_escalate (std::shared_ptr<nano::election> & election_l, nano::transaction const & transaction_l, size_t const & roots_size_l)
|
||||
|
@ -1076,7 +1074,7 @@ bool nano::active_transactions::inactive_votes_bootstrap_check (std::vector<nano
|
|||
|
||||
size_t nano::active_transactions::election_winner_details_size ()
|
||||
{
|
||||
nano::lock_guard<std::mutex> guard (mutex);
|
||||
nano::lock_guard<std::mutex> guard (election_winner_details_mutex);
|
||||
return election_winner_details.size ();
|
||||
}
|
||||
|
||||
|
|
|
@ -110,7 +110,7 @@ public:
|
|||
bool publish (std::shared_ptr<nano::block> block_a);
|
||||
boost::optional<nano::election_status_type> confirm_block (nano::transaction const &, std::shared_ptr<nano::block>);
|
||||
void block_cemented_callback (std::shared_ptr<nano::block> const & block_a, nano::block_sideband const & sideband_a);
|
||||
void cemented_batch_finished_callback ();
|
||||
void block_already_cemented_callback (nano::block_hash const &);
|
||||
// clang-format off
|
||||
boost::multi_index_container<nano::conflict_info,
|
||||
mi::indexed_by<
|
||||
|
@ -137,11 +137,14 @@ public:
|
|||
size_t priority_wallet_cementable_frontiers_size ();
|
||||
boost::circular_buffer<double> difficulty_trend ();
|
||||
size_t inactive_votes_cache_size ();
|
||||
std::unordered_map<nano::block_hash, std::shared_ptr<nano::election>> election_winner_details;
|
||||
size_t election_winner_details_size ();
|
||||
void add_election_winner_details (nano::block_hash const &, std::shared_ptr<nano::election> const &);
|
||||
nano::confirmation_solicitor solicitor;
|
||||
|
||||
private:
|
||||
std::mutex election_winner_details_mutex;
|
||||
std::unordered_map<nano::block_hash, std::shared_ptr<nano::election>> election_winner_details;
|
||||
|
||||
// Call action with confirmed block, may be different than what we started with
|
||||
// clang-format off
|
||||
std::pair<std::shared_ptr<nano::election>, bool> insert_impl (std::shared_ptr<nano::block>, bool const = false, std::function<void(std::shared_ptr<nano::block>)> const & = [](std::shared_ptr<nano::block>) {});
|
||||
|
@ -208,6 +211,7 @@ private:
|
|||
|
||||
friend class confirmation_height_prioritize_frontiers_Test;
|
||||
friend class confirmation_height_prioritize_frontiers_overwrite_Test;
|
||||
friend std::unique_ptr<container_info_component> collect_container_info (active_transactions &, const std::string &);
|
||||
};
|
||||
|
||||
std::unique_ptr<container_info_component> collect_container_info (active_transactions & active_transactions, const std::string & name);
|
||||
|
|
|
@ -8,7 +8,7 @@
|
|||
|
||||
#include <numeric>
|
||||
|
||||
nano::confirmation_height_bounded::confirmation_height_bounded (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a, std::atomic<bool> & stopped_a, nano::block_hash const & original_hash_a, std::function<void(std::vector<nano::block_w_sideband> const &)> const & notify_observers_callback_a, std::function<uint64_t ()> const & awaiting_processing_size_callback_a) :
|
||||
nano::confirmation_height_bounded::confirmation_height_bounded (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a, std::atomic<bool> & stopped_a, nano::block_hash const & original_hash_a, std::function<void(std::vector<nano::block_w_sideband> const &)> const & notify_observers_callback_a, std::function<void(nano::block_hash const &)> const & notify_block_already_cemented_observers_callback_a, std::function<uint64_t ()> const & awaiting_processing_size_callback_a) :
|
||||
ledger (ledger_a),
|
||||
write_database_queue (write_database_queue_a),
|
||||
batch_separate_pending_min_time (batch_separate_pending_min_time_a),
|
||||
|
@ -16,6 +16,7 @@ logger (logger_a),
|
|||
stopped (stopped_a),
|
||||
original_hash (original_hash_a),
|
||||
notify_observers_callback (notify_observers_callback_a),
|
||||
notify_block_already_cemented_observers_callback (notify_block_already_cemented_observers_callback_a),
|
||||
awaiting_processing_size_callback (awaiting_processing_size_callback_a)
|
||||
{
|
||||
}
|
||||
|
@ -57,6 +58,7 @@ void nano::confirmation_height_bounded::process ()
|
|||
boost::circular_buffer_space_optimized<nano::block_hash> checkpoints{ max_items };
|
||||
boost::circular_buffer_space_optimized<receive_source_pair> receive_source_pairs{ max_items };
|
||||
nano::block_hash current;
|
||||
bool first_iter = true;
|
||||
auto transaction (ledger.store.tx_begin_read ());
|
||||
do
|
||||
{
|
||||
|
@ -85,7 +87,13 @@ void nano::confirmation_height_bounded::process ()
|
|||
else
|
||||
{
|
||||
auto error = ledger.store.confirmation_height_get (transaction, account, confirmation_height_info);
|
||||
(void)error;
|
||||
debug_assert (!error);
|
||||
// This block was added to the confirmation height processor but is already confirmed
|
||||
if (first_iter && confirmation_height_info.height >= sideband.height && current == original_hash)
|
||||
{
|
||||
notify_block_already_cemented_observers_callback (original_hash);
|
||||
}
|
||||
}
|
||||
|
||||
auto block_height = sideband.height;
|
||||
|
@ -181,6 +189,7 @@ void nano::confirmation_height_bounded::process ()
|
|||
}
|
||||
}
|
||||
|
||||
first_iter = false;
|
||||
transaction.refresh ();
|
||||
} while ((!receive_source_pairs.empty () || current != original_hash) && !stopped);
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@ class write_database_queue;
|
|||
class confirmation_height_bounded final
|
||||
{
|
||||
public:
|
||||
confirmation_height_bounded (nano::ledger &, nano::write_database_queue &, std::chrono::milliseconds, nano::logger_mt &, std::atomic<bool> &, nano::block_hash const &, std::function<void(std::vector<nano::block_w_sideband> const &)> const &, std::function<uint64_t ()> const &);
|
||||
confirmation_height_bounded (nano::ledger &, nano::write_database_queue &, std::chrono::milliseconds, nano::logger_mt &, std::atomic<bool> &, nano::block_hash const &, std::function<void(std::vector<nano::block_w_sideband> const &)> const &, std::function<void(nano::block_hash const &)> const &, std::function<uint64_t ()> const &);
|
||||
bool pending_empty () const;
|
||||
void prepare_new ();
|
||||
void process ();
|
||||
|
@ -115,6 +115,7 @@ private:
|
|||
std::atomic<bool> & stopped;
|
||||
nano::block_hash const & original_hash;
|
||||
std::function<void(std::vector<block_w_sideband> const &)> notify_observers_callback;
|
||||
std::function<void(nano::block_hash const &)> notify_block_already_cemented_observers_callback;
|
||||
std::function<uint64_t ()> awaiting_processing_size_callback;
|
||||
|
||||
friend std::unique_ptr<nano::container_info_component> collect_container_info (confirmation_height_bounded &, const std::string & name_a);
|
||||
|
|
|
@ -15,8 +15,8 @@ nano::confirmation_height_processor::confirmation_height_processor (nano::ledger
|
|||
ledger (ledger_a),
|
||||
write_database_queue (write_database_queue_a),
|
||||
// clang-format off
|
||||
confirmation_height_unbounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this]() { return this->awaiting_processing_size (); }),
|
||||
confirmation_height_bounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this]() { return this->awaiting_processing_size (); }),
|
||||
confirmation_height_unbounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }),
|
||||
confirmation_height_bounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }),
|
||||
// clang-format on
|
||||
thread ([this, &latch, mode_a]() {
|
||||
nano::thread_role::set (nano::thread_role::name::confirmation_height_processing);
|
||||
|
@ -88,34 +88,39 @@ void nano::confirmation_height_processor::run (confirmation_height_mode mode_a)
|
|||
else
|
||||
{
|
||||
auto lock_and_cleanup = [&lk, this]() {
|
||||
for (auto const & observer : cemented_process_finished_observers)
|
||||
{
|
||||
observer ();
|
||||
}
|
||||
lk.lock ();
|
||||
original_hash.clear ();
|
||||
original_hashes_pending.clear ();
|
||||
};
|
||||
|
||||
lk.unlock ();
|
||||
// If there are blocks pending cementing, then make sure we flush out the remaining writes
|
||||
if (!confirmation_height_bounded_processor.pending_empty ())
|
||||
if (!paused)
|
||||
{
|
||||
debug_assert (confirmation_height_unbounded_processor.pending_empty ());
|
||||
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
|
||||
confirmation_height_bounded_processor.cement_blocks ();
|
||||
lock_and_cleanup ();
|
||||
}
|
||||
else if (!confirmation_height_unbounded_processor.pending_empty ())
|
||||
{
|
||||
debug_assert (confirmation_height_bounded_processor.pending_empty ());
|
||||
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
|
||||
confirmation_height_unbounded_processor.cement_blocks ();
|
||||
lock_and_cleanup ();
|
||||
lk.unlock ();
|
||||
|
||||
// If there are blocks pending cementing, then make sure we flush out the remaining writes
|
||||
if (!confirmation_height_bounded_processor.pending_empty ())
|
||||
{
|
||||
debug_assert (confirmation_height_unbounded_processor.pending_empty ());
|
||||
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
|
||||
confirmation_height_bounded_processor.cement_blocks ();
|
||||
lock_and_cleanup ();
|
||||
}
|
||||
else if (!confirmation_height_unbounded_processor.pending_empty ())
|
||||
{
|
||||
debug_assert (confirmation_height_bounded_processor.pending_empty ());
|
||||
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
|
||||
confirmation_height_unbounded_processor.cement_blocks ();
|
||||
lock_and_cleanup ();
|
||||
}
|
||||
else
|
||||
{
|
||||
lock_and_cleanup ();
|
||||
condition.wait (lk);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
lock_and_cleanup ();
|
||||
original_hash.clear ();
|
||||
condition.wait (lk);
|
||||
}
|
||||
}
|
||||
|
@ -125,12 +130,16 @@ void nano::confirmation_height_processor::run (confirmation_height_mode mode_a)
|
|||
// Pausing only affects processing new blocks, not the current one being processed. Currently only used in tests
|
||||
void nano::confirmation_height_processor::pause ()
|
||||
{
|
||||
nano::lock_guard<std::mutex> lk (mutex);
|
||||
paused = true;
|
||||
}
|
||||
|
||||
void nano::confirmation_height_processor::unpause ()
|
||||
{
|
||||
paused = false;
|
||||
{
|
||||
nano::lock_guard<std::mutex> lk (mutex);
|
||||
paused = false;
|
||||
}
|
||||
condition.notify_one ();
|
||||
}
|
||||
|
||||
|
@ -159,9 +168,9 @@ void nano::confirmation_height_processor::add_cemented_observer (std::function<v
|
|||
}
|
||||
|
||||
// Not thread-safe, only call before this processor has begun cementing
|
||||
void nano::confirmation_height_processor::add_cemented_process_finished_observer (std::function<void()> const & callback_a)
|
||||
void nano::confirmation_height_processor::add_block_already_cemented_observer (std::function<void(nano::block_hash const &)> const & callback_a)
|
||||
{
|
||||
cemented_process_finished_observers.push_back (callback_a);
|
||||
block_already_cemented_observers.push_back (callback_a);
|
||||
}
|
||||
|
||||
void nano::confirmation_height_processor::notify_observers (std::vector<nano::block_w_sideband> const & cemented_blocks)
|
||||
|
@ -175,14 +184,22 @@ void nano::confirmation_height_processor::notify_observers (std::vector<nano::bl
|
|||
}
|
||||
}
|
||||
|
||||
void nano::confirmation_height_processor::notify_observers (nano::block_hash const & hash_already_cemented_a)
|
||||
{
|
||||
for (auto const & observer : block_already_cemented_observers)
|
||||
{
|
||||
observer (hash_already_cemented_a);
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<nano::container_info_component> nano::collect_container_info (confirmation_height_processor & confirmation_height_processor_a, const std::string & name_a)
|
||||
{
|
||||
auto composite = std::make_unique<container_info_composite> (name_a);
|
||||
|
||||
size_t cemented_observers_count = confirmation_height_processor_a.cemented_observers.size ();
|
||||
size_t cemented_process_finished_observer_count = confirmation_height_processor_a.cemented_process_finished_observers.size ();
|
||||
size_t block_already_cemented_observers_count = confirmation_height_processor_a.block_already_cemented_observers.size ();
|
||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "cemented_observers", cemented_observers_count, sizeof (decltype (confirmation_height_processor_a.cemented_observers)::value_type) }));
|
||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "cemented_process_finished_observers", cemented_process_finished_observer_count, sizeof (decltype (confirmation_height_processor_a.cemented_process_finished_observers)::value_type) }));
|
||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "block_already_cemented_observers", block_already_cemented_observers_count, sizeof (decltype (confirmation_height_processor_a.block_already_cemented_observers)::value_type) }));
|
||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "awaiting_processing", confirmation_height_processor_a.awaiting_processing_size (), sizeof (decltype (confirmation_height_processor_a.awaiting_processing)::value_type) }));
|
||||
composite->add_component (collect_container_info (confirmation_height_processor_a.confirmation_height_bounded_processor, "bounded_processor"));
|
||||
composite->add_component (collect_container_info (confirmation_height_processor_a.confirmation_height_unbounded_processor, "unbounded_processor"));
|
||||
|
|
|
@ -36,7 +36,7 @@ public:
|
|||
nano::block_hash current ();
|
||||
|
||||
void add_cemented_observer (std::function<void(block_w_sideband)> const &);
|
||||
void add_cemented_process_finished_observer (std::function<void()> const &);
|
||||
void add_block_already_cemented_observer (std::function<void(nano::block_hash const &)> const &);
|
||||
|
||||
private:
|
||||
std::mutex mutex;
|
||||
|
@ -44,15 +44,15 @@ private:
|
|||
std::unordered_set<nano::block_hash> awaiting_processing;
|
||||
// Hashes which have been added and processed, but have not been cemented
|
||||
std::unordered_set<nano::block_hash> original_hashes_pending;
|
||||
bool paused{ false };
|
||||
|
||||
/** This is the last block popped off the confirmation height pending collection */
|
||||
nano::block_hash original_hash{ 0 };
|
||||
|
||||
nano::condition_variable condition;
|
||||
std::atomic<bool> stopped{ false };
|
||||
std::atomic<bool> paused{ false };
|
||||
std::vector<std::function<void(nano::block_w_sideband)>> cemented_observers;
|
||||
std::vector<std::function<void()>> cemented_process_finished_observers;
|
||||
std::vector<std::function<void(nano::block_hash const &)>> block_already_cemented_observers;
|
||||
|
||||
nano::ledger & ledger;
|
||||
nano::write_database_queue & write_database_queue;
|
||||
|
@ -61,7 +61,8 @@ private:
|
|||
std::thread thread;
|
||||
|
||||
void set_next_hash ();
|
||||
void notify_observers (std::vector<nano::block_w_sideband> const & cemented_blocks);
|
||||
void notify_observers (std::vector<nano::block_w_sideband> const &);
|
||||
void notify_observers (nano::block_hash const &);
|
||||
|
||||
friend std::unique_ptr<container_info_component> collect_container_info (confirmation_height_processor &, const std::string &);
|
||||
friend class confirmation_height_pending_observer_callbacks_Test;
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
|
||||
#include <numeric>
|
||||
|
||||
nano::confirmation_height_unbounded::confirmation_height_unbounded (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a, std::atomic<bool> & stopped_a, nano::block_hash const & original_hash_a, std::function<void(std::vector<nano::block_w_sideband> const &)> const & notify_observers_callback_a, std::function<uint64_t ()> const & awaiting_processing_size_callback_a) :
|
||||
nano::confirmation_height_unbounded::confirmation_height_unbounded (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a, std::atomic<bool> & stopped_a, nano::block_hash const & original_hash_a, std::function<void(std::vector<nano::block_w_sideband> const &)> const & notify_observers_callback_a, std::function<void(nano::block_hash const &)> const & notify_block_already_cemented_observers_callback_a, std::function<uint64_t ()> const & awaiting_processing_size_callback_a) :
|
||||
ledger (ledger_a),
|
||||
write_database_queue (write_database_queue_a),
|
||||
batch_separate_pending_min_time (batch_separate_pending_min_time_a),
|
||||
|
@ -13,6 +13,7 @@ logger (logger_a),
|
|||
stopped (stopped_a),
|
||||
original_hash (original_hash_a),
|
||||
notify_observers_callback (notify_observers_callback_a),
|
||||
notify_block_already_cemented_observers_callback (notify_block_already_cemented_observers_callback_a),
|
||||
awaiting_processing_size_callback (awaiting_processing_size_callback_a)
|
||||
{
|
||||
}
|
||||
|
@ -26,6 +27,7 @@ void nano::confirmation_height_unbounded::process ()
|
|||
std::vector<receive_source_pair> receive_source_pairs;
|
||||
release_assert (receive_source_pairs.empty ());
|
||||
|
||||
bool first_iter = true;
|
||||
auto read_transaction (ledger.store.tx_begin_read ());
|
||||
|
||||
do
|
||||
|
@ -68,6 +70,12 @@ void nano::confirmation_height_unbounded::process ()
|
|||
nano::confirmation_height_info confirmation_height_info;
|
||||
release_assert (!ledger.store.confirmation_height_get (read_transaction, account, confirmation_height_info));
|
||||
confirmation_height = confirmation_height_info.height;
|
||||
|
||||
// This block was added to the confirmation height processor but is already confirmed
|
||||
if (first_iter && confirmation_height >= block_height && current == original_hash)
|
||||
{
|
||||
notify_block_already_cemented_observers_callback (original_hash);
|
||||
}
|
||||
}
|
||||
auto iterated_height = confirmation_height;
|
||||
if (account_it != confirmed_iterated_pairs.cend () && account_it->second.iterated_height > iterated_height)
|
||||
|
@ -141,6 +149,7 @@ void nano::confirmation_height_unbounded::process ()
|
|||
}
|
||||
}
|
||||
|
||||
first_iter = false;
|
||||
read_transaction.renew ();
|
||||
} while ((!receive_source_pairs.empty () || current != original_hash) && !stopped);
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@ class write_database_queue;
|
|||
class confirmation_height_unbounded final
|
||||
{
|
||||
public:
|
||||
confirmation_height_unbounded (nano::ledger &, nano::write_database_queue &, std::chrono::milliseconds, nano::logger_mt &, std::atomic<bool> &, nano::block_hash const &, std::function<void(std::vector<nano::block_w_sideband> const &)> const &, std::function<uint64_t ()> const &);
|
||||
confirmation_height_unbounded (nano::ledger &, nano::write_database_queue &, std::chrono::milliseconds, nano::logger_mt &, std::atomic<bool> &, nano::block_hash const &, std::function<void(std::vector<nano::block_w_sideband> const &)> const &, std::function<void(nano::block_hash const &)> const &, std::function<uint64_t ()> const &);
|
||||
bool pending_empty () const;
|
||||
void prepare_new ();
|
||||
void process ();
|
||||
|
@ -91,6 +91,7 @@ private:
|
|||
std::atomic<bool> & stopped;
|
||||
nano::block_hash const & original_hash;
|
||||
std::function<void(std::vector<nano::block_w_sideband> const &)> notify_observers_callback;
|
||||
std::function<void(nano::block_hash const &)> notify_block_already_cemented_observers_callback;
|
||||
std::function<uint64_t ()> awaiting_processing_size_callback;
|
||||
|
||||
friend std::unique_ptr<nano::container_info_component> collect_container_info (confirmation_height_unbounded &, const std::string & name_a);
|
||||
|
|
|
@ -37,9 +37,15 @@ void nano::election::confirm_once (nano::election_status_type type_a)
|
|||
auto status_l (status);
|
||||
auto node_l (node.shared ());
|
||||
auto confirmation_action_l (confirmation_action);
|
||||
node.active.election_winner_details.emplace (status.winner->hash (), shared_from_this ());
|
||||
node.background ([node_l, status_l, confirmation_action_l]() {
|
||||
node_l->process_confirmed (status_l);
|
||||
auto this_l = shared_from_this ();
|
||||
if (status_l.type == nano::election_status_type::active_confirmation_height)
|
||||
{
|
||||
// Need to add dependent election results here (and not in process_confirmed which is called asynchronously) so that
|
||||
// election winner details can be cleared consistently sucessfully in active_transactions::block_cemented_callback
|
||||
node.active.add_election_winner_details (status_l.winner->hash (), this_l);
|
||||
}
|
||||
node.background ([node_l, status_l, confirmation_action_l, this_l]() {
|
||||
node_l->process_confirmed (status_l, this_l);
|
||||
confirmation_action_l (status_l.winner);
|
||||
});
|
||||
clear_blocks ();
|
||||
|
|
|
@ -1243,26 +1243,31 @@ void nano::node::process_confirmed_data (nano::transaction const & transaction_a
|
|||
}
|
||||
}
|
||||
|
||||
void nano::node::process_confirmed (nano::election_status const & status_a, uint8_t iteration)
|
||||
void nano::node::process_confirmed (nano::election_status const & status_a, std::shared_ptr<nano::election> const & election_a, uint8_t iteration_a)
|
||||
{
|
||||
if (status_a.type == nano::election_status_type::active_confirmed_quorum)
|
||||
{
|
||||
auto block_a (status_a.winner);
|
||||
auto hash (block_a->hash ());
|
||||
auto transaction (store.tx_begin_read ());
|
||||
if (store.block_get (transaction, hash) != nullptr)
|
||||
if (ledger.block_exists (block_a->type (), hash))
|
||||
{
|
||||
// Pausing to prevent this block being processed before adding to election winner details.
|
||||
confirmation_height_processor.pause ();
|
||||
confirmation_height_processor.add (hash);
|
||||
{
|
||||
active.add_election_winner_details (hash, election_a);
|
||||
}
|
||||
confirmation_height_processor.unpause ();
|
||||
}
|
||||
// Limit to 0.5 * 20 = 10 seconds (more than max block_processor::process_batch finish time)
|
||||
else if (iteration < 20)
|
||||
else if (iteration_a < 20)
|
||||
{
|
||||
iteration++;
|
||||
iteration_a++;
|
||||
std::weak_ptr<nano::node> node_w (shared ());
|
||||
alarm.add (std::chrono::steady_clock::now () + network_params.node.process_confirmed_interval, [node_w, status_a, iteration]() {
|
||||
alarm.add (std::chrono::steady_clock::now () + network_params.node.process_confirmed_interval, [node_w, status_a, iteration_a, election_a]() {
|
||||
if (auto node_l = node_w.lock ())
|
||||
{
|
||||
node_l->process_confirmed (status_a, iteration);
|
||||
node_l->process_confirmed (status_a, election_a, iteration_a);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -103,7 +103,7 @@ public:
|
|||
int store_version ();
|
||||
void receive_confirmed (nano::transaction const &, std::shared_ptr<nano::block>, nano::block_hash const &);
|
||||
void process_confirmed_data (nano::transaction const &, std::shared_ptr<nano::block>, nano::block_hash const &, nano::block_sideband const &, nano::account &, nano::uint128_t &, bool &, nano::account &);
|
||||
void process_confirmed (nano::election_status const &, uint8_t = 0);
|
||||
void process_confirmed (nano::election_status const &, std::shared_ptr<nano::election> const &, uint8_t = 0);
|
||||
void process_active (std::shared_ptr<nano::block>);
|
||||
nano::process_return process (nano::block const &);
|
||||
nano::process_return process_local (std::shared_ptr<nano::block>, bool const = false);
|
||||
|
|
|
@ -780,16 +780,12 @@ nano::block_hash nano::ledger::representative_calculated (nano::transaction cons
|
|||
|
||||
bool nano::ledger::block_exists (nano::block_hash const & hash_a)
|
||||
{
|
||||
auto transaction (store.tx_begin_read ());
|
||||
auto result (store.block_exists (transaction, hash_a));
|
||||
return result;
|
||||
return store.block_exists (store.tx_begin_read (), hash_a);
|
||||
}
|
||||
|
||||
bool nano::ledger::block_exists (nano::block_type type, nano::block_hash const & hash_a)
|
||||
{
|
||||
auto transaction (store.tx_begin_read ());
|
||||
auto result (store.block_exists (transaction, type, hash_a));
|
||||
return result;
|
||||
return store.block_exists (store.tx_begin_read (), type, hash_a);
|
||||
}
|
||||
|
||||
std::string nano::ledger::block_text (char const * hash_a)
|
||||
|
|
|
@ -525,6 +525,7 @@ TEST (confirmation_height, many_accounts_single_confirmation)
|
|||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
ASSERT_EQ (node->active.election_winner_details_size (), 0);
|
||||
}
|
||||
|
||||
// Can take up to 10 minutes
|
||||
|
@ -586,6 +587,8 @@ TEST (confirmation_height, many_accounts_many_confirmations)
|
|||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
|
||||
ASSERT_EQ (node->active.election_winner_details_size (), 0);
|
||||
}
|
||||
|
||||
TEST (confirmation_height, long_chains)
|
||||
|
@ -686,6 +689,7 @@ TEST (confirmation_height, long_chains)
|
|||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
ASSERT_EQ (node->active.election_winner_details_size (), 0);
|
||||
}
|
||||
|
||||
TEST (confirmation_height, dynamic_algorithm)
|
||||
|
@ -732,6 +736,7 @@ TEST (confirmation_height, dynamic_algorithm)
|
|||
ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in), num_blocks);
|
||||
ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed_bounded, nano::stat::dir::in), 1);
|
||||
ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed_unbounded, nano::stat::dir::in), num_blocks - 1);
|
||||
ASSERT_EQ (node->active.election_winner_details_size (), 0);
|
||||
}
|
||||
|
||||
namespace nano
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue