Remove atomic enum election::state_m and instead require locking election::mutex. (#4318)

* Remove atomic enum election::state_m and instead require locking election::mutex.

* Fix AEC insert race condition

* Move evaluation of election->confirmed () within debug_assert so it doesn't need to be evaluated when debugging is off.

* Removing erasing recently_confirmed

* Fix unit test rep_crawler.recently_confirmed

* Remove active_transactions::insert_impl and remove the passing of mutex

A mutex was needlessly being passed from function to function and a
function wrapper existed needlessly.

* Return early, if stopped, to remove a large level of indentation.

* Break up long complex statement into 2 statements

* Refactor function to remove unnecessary lock/unlocks and make it readable

The critical section is now obvious and easy to see

---------

Co-authored-by: Piotr Wójcik <3044353+pwojcikdev@users.noreply.github.com>
Co-authored-by: Dimitrios Siganos <dimitris@siganos.org>
This commit is contained in:
clemahieu 2024-01-16 17:35:28 +00:00 committed by GitHub
commit 978020e0f8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 120 additions and 106 deletions

View file

@ -4088,6 +4088,10 @@ TEST (node, deferred_dependent_elections)
}
}
// This test checks that if a block is in the recently_confirmed list then the repcrawler will not send a request for it.
// The behaviour of this test previously was the opposite, that the repcrawler eventually send out such a block and deleted the block
// from the recently confirmed list to try to make ammends for sending it, which is bad behaviour.
// In the long term, we should have a better way to check for reps and this test should become redundant
TEST (rep_crawler, recently_confirmed)
{
nano::test::system system (1);
@ -4099,8 +4103,8 @@ TEST (rep_crawler, recently_confirmed)
system.wallet (1)->insert_adhoc (nano::dev::genesis_key.prv);
auto channel = node1.network.find_node_id (node2.get_node_id ());
ASSERT_NE (nullptr, channel);
node1.rep_crawler.query (channel);
ASSERT_TIMELY (3s, node1.rep_crawler.representative_count () == 1);
node1.rep_crawler.query (channel); // this query should be dropped due to the recently_confirmed entry
ASSERT_ALWAYS_EQ (0.5s, node1.rep_crawler.representative_count (), 0);
}
namespace nano

View file

@ -311,6 +311,7 @@ void nano::active_transactions::cleanup_election (nano::unique_lock<nano::mutex>
{
debug_assert (!mutex.try_lock ());
debug_assert (lock_a.owns_lock ());
debug_assert (!election->confirmed () || recently_confirmed.exists (election->qualified_root));
node.stats.inc (completion_type (*election), nano::to_stat_detail (election->behavior ()));
// Keep track of election count by election type
@ -404,16 +405,6 @@ void nano::active_transactions::request_loop ()
}
}
nano::election_insertion_result nano::active_transactions::insert (const std::shared_ptr<nano::block> & block, nano::election_behavior behavior)
{
debug_assert (block != nullptr);
nano::unique_lock<nano::mutex> lock{ mutex };
auto result = insert_impl (lock, block, behavior);
return result;
}
void nano::active_transactions::trim ()
{
/*
@ -428,61 +419,65 @@ void nano::active_transactions::trim ()
}
}
nano::election_insertion_result nano::active_transactions::insert_impl (nano::unique_lock<nano::mutex> & lock_a, std::shared_ptr<nano::block> const & block_a, nano::election_behavior election_behavior_a, std::function<void (std::shared_ptr<nano::block> const &)> const & confirmation_action_a)
nano::election_insertion_result nano::active_transactions::insert (std::shared_ptr<nano::block> const & block_a, nano::election_behavior election_behavior_a)
{
debug_assert (!mutex.try_lock ());
debug_assert (lock_a.owns_lock ());
nano::unique_lock<nano::mutex> lock{ mutex };
debug_assert (block_a);
debug_assert (block_a->has_sideband ());
nano::election_insertion_result result;
if (!stopped)
{
auto root (block_a->qualified_root ());
auto existing (roots.get<tag_root> ().find (root));
if (existing == roots.get<tag_root> ().end ())
{
if (!recently_confirmed.exists (root))
{
result.inserted = true;
auto hash (block_a->hash ());
result.election = nano::make_shared<nano::election> (
node, block_a, confirmation_action_a, [&node = node] (auto const & rep_a) {
// Representative is defined as online if replying to live votes or rep_crawler queries
node.online_reps.observe (rep_a);
},
election_behavior_a);
roots.get<tag_root> ().emplace (nano::active_transactions::conflict_info{ root, result.election });
blocks.emplace (hash, result.election);
// Keep track of election count by election type
debug_assert (count_by_behavior[result.election->behavior ()] >= 0);
count_by_behavior[result.election->behavior ()]++;
lock_a.unlock ();
if (auto const cache = node.vote_cache.find (hash); cache)
{
cache->fill (result.election);
}
node.stats.inc (nano::stat::type::active_started, nano::to_stat_detail (election_behavior_a));
node.observers.active_started.notify (hash);
vacancy_update ();
}
if (stopped)
return result;
auto const root = block_a->qualified_root ();
auto const hash = block_a->hash ();
auto const existing = roots.get<tag_root> ().find (root);
if (existing == roots.get<tag_root> ().end ())
{
if (!recently_confirmed.exists (root))
{
result.inserted = true;
auto observe_rep_cb = [&node = node] (auto const & rep_a) {
// Representative is defined as online if replying to live votes or rep_crawler queries
node.online_reps.observe (rep_a);
};
result.election = nano::make_shared<nano::election> (node, block_a, nullptr, observe_rep_cb, election_behavior_a);
roots.get<tag_root> ().emplace (nano::active_transactions::conflict_info{ root, result.election });
blocks.emplace (hash, result.election);
// Keep track of election count by election type
debug_assert (count_by_behavior[result.election->behavior ()] >= 0);
count_by_behavior[result.election->behavior ()]++;
}
else
{
result.election = existing->election;
// result is not set
}
if (lock_a.owns_lock ())
{
lock_a.unlock ();
}
// Votes are generated for inserted or ongoing elections
if (result.election)
{
result.election->broadcast_vote ();
}
trim ();
}
else
{
result.election = existing->election;
}
lock.unlock (); // end of critical section
if (result.inserted)
{
if (auto const cache = node.vote_cache.find (hash); cache)
{
cache->fill (result.election);
}
node.stats.inc (nano::stat::type::active_started, nano::to_stat_detail (election_behavior_a));
node.observers.active_started.notify (hash);
vacancy_update ();
}
// Votes are generated for inserted or ongoing elections
if (result.election)
{
result.election->broadcast_vote ();
}
trim ();
return result;
}

View file

@ -139,7 +139,7 @@ public:
/**
* Starts new election with a specified behavior type
*/
nano::election_insertion_result insert (std::shared_ptr<nano::block> const & block, nano::election_behavior behavior = nano::election_behavior::normal);
nano::election_insertion_result insert (std::shared_ptr<nano::block> const &, nano::election_behavior = nano::election_behavior::normal);
// Distinguishes replay votes, cannot be determined if the block is not in any election
nano::vote_code vote (std::shared_ptr<nano::vote> const &);
// Is the root of this block in the roots container
@ -181,8 +181,6 @@ public:
private:
// Erase elections if we're over capacity
void trim ();
// Call action with confirmed block, may be different than what we started with
nano::election_insertion_result insert_impl (nano::unique_lock<nano::mutex> &, std::shared_ptr<nano::block> const &, nano::election_behavior = nano::election_behavior::normal, std::function<void (std::shared_ptr<nano::block> const &)> const & = nullptr);
void request_loop ();
void request_confirm (nano::unique_lock<nano::mutex> &);
void erase (nano::qualified_root const &);

View file

@ -35,9 +35,12 @@ nano::election::election (nano::node & node_a, std::shared_ptr<nano::block> cons
void nano::election::confirm_once (nano::unique_lock<nano::mutex> & lock_a, nano::election_status_type type_a)
{
debug_assert (lock_a.owns_lock ());
// This must be kept above the setting of election state, as dependent confirmed elections require up to date changes to election_winner_details
nano::unique_lock<nano::mutex> election_winners_lk{ node.active.election_winner_details_mutex };
if (state_m.exchange (nano::election::state_t::confirmed) != nano::election::state_t::confirmed && (node.active.election_winner_details.count (status.winner->hash ()) == 0))
auto just_confirmed = state_m != nano::election::state_t::confirmed;
state_m = nano::election::state_t::confirmed;
if (just_confirmed && (node.active.election_winner_details.count (status.winner->hash ()) == 0))
{
node.active.election_winner_details.emplace (status.winner->hash (), shared_from_this ());
election_winners_lk.unlock ();
@ -48,6 +51,9 @@ void nano::election::confirm_once (nano::unique_lock<nano::mutex> & lock_a, nano
status.voter_count = nano::narrow_cast<decltype (status.voter_count)> (last_votes.size ());
status.type = type_a;
auto const status_l = status;
node.active.recently_confirmed.put (qualified_root, status_l.winner->hash ());
lock_a.unlock ();
node.background ([node_l = node.shared (), status_l, confirmation_action_l = confirmation_action] () {
@ -115,8 +121,9 @@ bool nano::election::state_change (nano::election::state_t expected_a, nano::ele
bool result = true;
if (valid_change (expected_a, desired_a))
{
if (state_m.compare_exchange_strong (expected_a, desired_a))
if (state_m == expected_a)
{
state_m = desired_a;
state_start = std::chrono::steady_clock::now ().time_since_epoch ();
result = false;
}
@ -142,7 +149,6 @@ void nano::election::send_confirm_req (nano::confirmation_solicitor & solicitor_
{
if (confirm_req_time () < (std::chrono::steady_clock::now () - last_req))
{
nano::lock_guard<nano::mutex> guard{ mutex };
if (!solicitor_a.add (*this))
{
last_req = std::chrono::steady_clock::now ();
@ -153,16 +159,25 @@ void nano::election::send_confirm_req (nano::confirmation_solicitor & solicitor_
void nano::election::transition_active ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
state_change (nano::election::state_t::passive, nano::election::state_t::active);
}
bool nano::election::confirmed_locked (nano::unique_lock<nano::mutex> & lock) const
{
debug_assert (lock.owns_lock ());
return state_m == nano::election::state_t::confirmed || state_m == nano::election::state_t::expired_confirmed;
}
bool nano::election::confirmed () const
{
return state_m == nano::election::state_t::confirmed || state_m == nano::election::state_t::expired_confirmed;
nano::unique_lock<nano::mutex> lock{ mutex };
return confirmed_locked (lock);
}
bool nano::election::failed () const
{
nano::unique_lock<nano::mutex> lock{ mutex };
return state_m == nano::election::state_t::expired_unconfirmed;
}
@ -170,7 +185,6 @@ void nano::election::broadcast_block (nano::confirmation_solicitor & solicitor_a
{
if (base_latency () * 15 < std::chrono::steady_clock::now () - last_block)
{
nano::lock_guard<nano::mutex> guard{ mutex };
if (!solicitor_a.broadcast (*this))
{
last_block = std::chrono::steady_clock::now ();
@ -181,11 +195,7 @@ void nano::election::broadcast_block (nano::confirmation_solicitor & solicitor_a
void nano::election::broadcast_vote ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
if (last_vote + std::chrono::milliseconds (node.config.network_params.network.vote_broadcast_interval) < std::chrono::steady_clock::now ())
{
broadcast_vote_impl ();
last_vote = std::chrono::steady_clock::now ();
}
broadcast_vote_locked (lock);
}
nano::vote_info nano::election::get_last_vote (nano::account const & account)
@ -208,17 +218,18 @@ nano::election_status nano::election::get_status () const
bool nano::election::transition_time (nano::confirmation_solicitor & solicitor_a)
{
nano::unique_lock<nano::mutex> lock{ mutex };
bool result = false;
switch (state_m)
{
case nano::election::state_t::passive:
if (base_latency () * passive_duration_factor < std::chrono::steady_clock::now ().time_since_epoch () - state_start.load ())
if (base_latency () * passive_duration_factor < std::chrono::steady_clock::now ().time_since_epoch () - state_start)
{
state_change (nano::election::state_t::passive, nano::election::state_t::active);
}
break;
case nano::election::state_t::active:
broadcast_vote ();
broadcast_vote_locked (lock);
broadcast_block (solicitor_a);
send_confirm_req (solicitor_a);
break;
@ -232,12 +243,11 @@ bool nano::election::transition_time (nano::confirmation_solicitor & solicitor_a
break;
}
if (!confirmed () && time_to_live () < std::chrono::steady_clock::now () - election_start)
if (!confirmed_locked (lock) && time_to_live () < std::chrono::steady_clock::now () - election_start)
{
nano::lock_guard<nano::mutex> guard{ mutex };
// It is possible the election confirmed while acquiring the mutex
// state_change returning true would indicate it
if (!state_change (state_m.load (), nano::election::state_t::expired_unconfirmed))
if (!state_change (state_m, nano::election::state_t::expired_unconfirmed))
{
result = true; // Return true to indicate this election should be cleaned up
if (node.config.logging.election_expiration_tally_logging ())
@ -382,7 +392,7 @@ boost::optional<nano::election_status_type> nano::election::try_confirm (nano::b
if (winner && winner->hash () == hash)
{
// Determine if the block was confirmed explicitly via election confirmation or implicitly via confirmation height
if (!confirmed ())
if (!confirmed_locked (election_lock))
{
confirm_once (election_lock, nano::election_status_type::active_confirmation_height);
status_type = nano::election_status_type::active_confirmation_height;
@ -484,7 +494,7 @@ nano::election_vote_result nano::election::vote (nano::account const & rep, uint
node.stats.inc (nano::stat::type::election, vote_source_a == vote_source::live ? nano::stat::detail::vote_new : nano::stat::detail::vote_cached);
if (!confirmed ())
if (!confirmed_locked (lock))
{
confirm_if_quorum (lock);
}
@ -496,7 +506,7 @@ bool nano::election::publish (std::shared_ptr<nano::block> const & block_a)
nano::unique_lock<nano::mutex> lock{ mutex };
// Do not insert new blocks if already confirmed
auto result (confirmed ());
auto result (confirmed_locked (lock));
if (!result && last_blocks.size () >= max_blocks && last_blocks.find (block_a->hash ()) == last_blocks.end ())
{
if (!replace_by_weight (lock, block_a->hash ()))
@ -549,15 +559,20 @@ std::shared_ptr<nano::block> nano::election::winner () const
return status.winner;
}
void nano::election::broadcast_vote_impl ()
void nano::election::broadcast_vote_locked (nano::unique_lock<nano::mutex> & lock)
{
debug_assert (!mutex.try_lock ());
debug_assert (lock.owns_lock ());
if (std::chrono::steady_clock::now () < last_vote + std::chrono::milliseconds (node.config.network_params.network.vote_broadcast_interval))
{
return;
}
last_vote = std::chrono::steady_clock::now ();
if (node.config.enable_voting && node.wallets.reps ().voting > 0)
{
node.stats.inc (nano::stat::type::election, nano::stat::detail::generate_vote);
if (confirmed () || have_quorum (tally_impl ()))
if (confirmed_locked (lock) || have_quorum (tally_impl ()))
{
node.stats.inc (nano::stat::type::election, nano::stat::detail::generate_vote_final);
node.final_generator.add (root, status.winner->hash ()); // Broadcasts vote to the network

View file

@ -96,10 +96,9 @@ private: // State management
static unsigned constexpr passive_duration_factor = 5;
static unsigned constexpr active_request_count_min = 2;
std::atomic<nano::election::state_t> state_m = { state_t::passive };
nano::election::state_t state_m = { state_t::passive };
static_assert (std::is_trivial<std::chrono::steady_clock::duration> ());
std::atomic<std::chrono::steady_clock::duration> state_start{ std::chrono::steady_clock::now ().time_since_epoch () };
std::chrono::steady_clock::duration state_start{ std::chrono::steady_clock::now ().time_since_epoch () };
// These are modified while not holding the mutex from transition_time only
std::chrono::steady_clock::time_point last_block = { std::chrono::steady_clock::now () };
@ -164,6 +163,7 @@ public: // Information
private:
nano::tally_t tally_impl () const;
bool confirmed_locked (nano::unique_lock<nano::mutex> & lock) const;
// lock_a does not own the mutex on return
void confirm_once (nano::unique_lock<nano::mutex> & lock_a, nano::election_status_type = nano::election_status_type::active_confirmed_quorum);
void broadcast_block (nano::confirmation_solicitor &);
@ -172,7 +172,7 @@ private:
* Broadcast vote for current election winner. Generates final vote if reached quorum or already confirmed
* Requires mutex lock
*/
void broadcast_vote_impl ();
void broadcast_vote_locked (nano::unique_lock<nano::mutex> & lock);
void remove_votes (nano::block_hash const &);
void remove_block (nano::block_hash const &);
bool replace_by_weight (nano::unique_lock<nano::mutex> & lock_a, nano::block_hash const &);

View file

@ -323,17 +323,13 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons
});
observers.vote.add ([this] (std::shared_ptr<nano::vote> vote_a, std::shared_ptr<nano::transport::channel> const & channel_a, nano::vote_code code_a) {
debug_assert (code_a != nano::vote_code::invalid);
// The vote_code::vote is handled inside the election
if (code_a == nano::vote_code::indeterminate)
auto active_in_rep_crawler (!this->rep_crawler.response (channel_a, vote_a));
if (active_in_rep_crawler)
{
auto active_in_rep_crawler (!this->rep_crawler.response (channel_a, vote_a));
if (active_in_rep_crawler)
{
// Representative is defined as online if replying to live votes or rep_crawler queries
this->online_reps.observe (vote_a->account);
}
this->gap_cache.vote (vote_a);
// Representative is defined as online if replying to live votes or rep_crawler queries
this->online_reps.observe (vote_a->account);
}
this->gap_cache.vote (vote_a);
});
// Cancelling local work generation
@ -1386,7 +1382,6 @@ void nano::node::process_confirmed (nano::election_status const & status_a, uint
decltype (iteration_a) const num_iters = (config.block_processor_batch_max_time / network_params.node.process_confirmed_interval) * 4;
if (auto block_l = ledger.store.block.get (ledger.store.tx_begin_read (), hash))
{
active.recently_confirmed.put (block_l->qualified_root (), hash);
confirmation_height_processor.add (block_l);
}
else if (iteration_a < num_iters)

View file

@ -155,34 +155,41 @@ std::vector<std::shared_ptr<nano::transport::channel>> nano::rep_crawler::get_cr
void nano::rep_crawler::query (std::vector<std::shared_ptr<nano::transport::channel>> const & channels_a)
{
auto transaction (node.store.tx_begin_read ());
auto hash_root (node.ledger.hash_root_random (transaction));
std::optional<std::pair<nano::block_hash, nano::block_hash>> hash_root;
for (auto i = 0; i < 4 && !hash_root; ++i)
{
hash_root = node.ledger.hash_root_random (transaction);
if (node.active.recently_confirmed.exists (hash_root->first))
{
hash_root = std::nullopt;
}
}
if (!hash_root)
{
return;
}
{
nano::lock_guard<nano::mutex> lock{ active_mutex };
// Don't send same block multiple times in tests
if (node.network_params.network.is_dev_network ())
{
for (auto i (0); active.count (hash_root.first) != 0 && i < 4; ++i)
for (auto i (0); active.count (hash_root->first) != 0 && i < 4; ++i)
{
hash_root = node.ledger.hash_root_random (transaction);
}
}
active.insert (hash_root.first);
}
if (!channels_a.empty ())
{
// In case our random block is a recently confirmed one, we remove an entry otherwise votes will be marked as replay and not forwarded to repcrawler
node.active.recently_confirmed.erase (hash_root.first);
active.insert (hash_root->first);
}
for (auto i (channels_a.begin ()), n (channels_a.end ()); i != n; ++i)
{
debug_assert (*i != nullptr);
on_rep_request (*i);
node.network.send_confirm_req (*i, hash_root);
node.network.send_confirm_req (*i, *hash_root);
}
// A representative must respond with a vote within the deadline
std::weak_ptr<nano::node> node_w (node.shared ());
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [node_w, hash = hash_root.first] () {
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [node_w, hash = hash_root->first] () {
if (auto node_l = node_w.lock ())
{
auto target_finished_processed (node_l->vote_processor.total_processed + node_l->vote_processor.size ());