Vote processed event

This commit is contained in:
Piotr Wójcik 2024-03-21 11:49:25 +01:00
commit 7d2fbd8026
8 changed files with 78 additions and 41 deletions

View file

@ -365,7 +365,7 @@ TEST (inactive_votes_cache, existing_vote)
ASSERT_EQ (send->hash (), last_vote1.hash);
ASSERT_EQ (nano::vote::timestamp_min * 1, last_vote1.timestamp);
// Attempt to change vote with inactive_votes_cache
node.vote_cache.vote (vote1);
node.vote_cache.insert (vote1);
auto cached = node.vote_cache.find (send->hash ());
ASSERT_EQ (1, cached.size ());
for (auto const & cached_vote : cached)
@ -1535,7 +1535,7 @@ TEST (active_transactions, allow_limited_overflow)
{
// Non-final vote, so it stays in the AEC without getting confirmed
auto vote = nano::test::make_vote (nano::dev::genesis_key, { block });
node.vote_cache.vote (vote);
node.vote_cache.insert (vote);
}
// Ensure active elections overfill AEC only up to normal + hinted limit
@ -1573,7 +1573,7 @@ TEST (active_transactions, allow_limited_overflow_adapt)
{
// Non-final vote, so it stays in the AEC without getting confirmed
auto vote = nano::test::make_vote (nano::dev::genesis_key, { block });
node.vote_cache.vote (vote);
node.vote_cache.insert (vote);
}
// Ensure hinted election amount is bounded by hinted limit

View file

@ -57,7 +57,7 @@ TEST (vote_cache, insert_one_hash)
auto rep1 = create_rep (7);
auto hash1 = nano::test::random_hash ();
auto vote1 = nano::test::make_vote (rep1, { hash1 }, 1024 * 1024);
vote_cache.vote (vote1);
vote_cache.insert (vote1);
ASSERT_EQ (1, vote_cache.size ());
auto peek1 = vote_cache.find (hash1);
@ -88,9 +88,9 @@ TEST (vote_cache, insert_one_hash_many_votes)
auto vote1 = nano::test::make_vote (rep1, { hash1 }, 1 * 1024 * 1024);
auto vote2 = nano::test::make_vote (rep2, { hash1 }, 2 * 1024 * 1024);
auto vote3 = nano::test::make_vote (rep3, { hash1 }, 3 * 1024 * 1024);
vote_cache.vote (vote1);
vote_cache.vote (vote2);
vote_cache.vote (vote3);
vote_cache.insert (vote1);
vote_cache.insert (vote2);
vote_cache.insert (vote3);
ASSERT_EQ (1, vote_cache.size ());
auto peek1 = vote_cache.find (hash1);
@ -132,9 +132,9 @@ TEST (vote_cache, insert_many_hashes_many_votes)
auto vote3 = nano::test::make_vote (rep3, { hash3 }, 1024 * 1024);
auto vote4 = nano::test::make_vote (rep4, { hash1 }, 1024 * 1024);
// Insert first 3 votes in cache
vote_cache.vote (vote1);
vote_cache.vote (vote2);
vote_cache.vote (vote3);
vote_cache.insert (vote1);
vote_cache.insert (vote2);
vote_cache.insert (vote3);
// Ensure all of those are properly inserted
ASSERT_EQ (3, vote_cache.size ());
ASSERT_EQ (1, vote_cache.find (hash1).size ());
@ -152,7 +152,7 @@ TEST (vote_cache, insert_many_hashes_many_votes)
ASSERT_EQ (peek1.front (), vote3);
// Now add a vote from rep4 with the highest voting weight
vote_cache.vote (vote4);
vote_cache.insert (vote4);
// Ensure that the first entry in queue is now the one for hash1 (rep1 + rep4 tally weight)
auto tops2 = vote_cache.top (0);
@ -195,8 +195,8 @@ TEST (vote_cache, insert_duplicate)
auto rep1 = create_rep (9);
auto vote1 = nano::test::make_vote (rep1, { hash1 }, 1 * 1024 * 1024);
auto vote2 = nano::test::make_vote (rep1, { hash1 }, 1 * 1024 * 1024);
vote_cache.vote (vote1);
vote_cache.vote (vote2);
vote_cache.insert (vote1);
vote_cache.insert (vote2);
ASSERT_EQ (1, vote_cache.size ());
}
@ -212,12 +212,12 @@ TEST (vote_cache, insert_newer)
auto hash1 = nano::test::random_hash ();
auto rep1 = create_rep (9);
auto vote1 = nano::test::make_vote (rep1, { hash1 }, 1 * 1024 * 1024);
vote_cache.vote (vote1);
vote_cache.insert (vote1);
auto peek1 = vote_cache.find (hash1);
ASSERT_EQ (peek1.size (), 1);
ASSERT_EQ (peek1.front (), vote1);
auto vote2 = nano::test::make_final_vote (rep1, { hash1 });
vote_cache.vote (vote2);
vote_cache.insert (vote2);
auto peek2 = vote_cache.find (hash1);
ASSERT_EQ (peek2.size (), 1);
ASSERT_EQ (peek2.front (), vote2); // vote2 should replace vote1 as it has a higher timestamp
@ -235,12 +235,12 @@ TEST (vote_cache, insert_older)
auto hash1 = nano::test::random_hash ();
auto rep1 = create_rep (9);
auto vote1 = nano::test::make_vote (rep1, { hash1 }, 2 * 1024 * 1024);
vote_cache.vote (vote1);
vote_cache.insert (vote1);
auto peek1 = vote_cache.find (hash1);
ASSERT_EQ (peek1.size (), 1);
ASSERT_EQ (peek1.front (), vote1);
auto vote2 = nano::test::make_vote (rep1, { hash1 }, 1 * 1024 * 1024);
vote_cache.vote (vote2);
vote_cache.insert (vote2);
auto peek2 = vote_cache.find (hash1);
ASSERT_EQ (peek2.size (), 1);
ASSERT_EQ (peek2.front (), vote1); // vote1 should still be in cache as it has a higher timestamp
@ -265,9 +265,9 @@ TEST (vote_cache, erase)
auto vote1 = nano::test::make_vote (rep1, { hash1 }, 1024 * 1024);
auto vote2 = nano::test::make_vote (rep2, { hash2 }, 1024 * 1024);
auto vote3 = nano::test::make_vote (rep3, { hash3 }, 1024 * 1024);
vote_cache.vote (vote1);
vote_cache.vote (vote2);
vote_cache.vote (vote3);
vote_cache.insert (vote1);
vote_cache.insert (vote2);
vote_cache.insert (vote3);
ASSERT_EQ (3, vote_cache.size ());
ASSERT_FALSE (vote_cache.empty ());
ASSERT_FALSE (vote_cache.find (hash1).empty ());
@ -304,7 +304,7 @@ TEST (vote_cache, overfill)
auto rep1 = create_rep (count - n);
auto hash1 = nano::test::random_hash ();
auto vote1 = nano::test::make_vote (rep1, { hash1 }, 1024 * 1024);
vote_cache.vote (vote1);
vote_cache.insert (vote1);
}
ASSERT_LT (vote_cache.size (), count);
// Check that oldest votes are dropped first
@ -328,7 +328,7 @@ TEST (vote_cache, overfill_entry)
{
auto rep1 = create_rep (9);
auto vote1 = nano::test::make_vote (rep1, { hash1 }, 1024 * 1024);
vote_cache.vote (vote1);
vote_cache.insert (vote1);
}
ASSERT_EQ (1, vote_cache.size ());
}
@ -344,7 +344,7 @@ TEST (vote_cache, age_cutoff)
auto hash1 = nano::test::random_hash ();
auto rep1 = create_rep (9);
auto vote1 = nano::test::make_vote (rep1, { hash1 }, 3);
vote_cache.vote (vote1);
vote_cache.insert (vote1);
ASSERT_EQ (1, vote_cache.size ());
ASSERT_FALSE (vote_cache.find (hash1).empty ());

View file

@ -419,13 +419,9 @@ nano::election_insertion_result nano::active_transactions::insert (std::shared_p
if (result.inserted)
{
release_assert (result.election);
debug_assert (result.election);
auto cached = node.vote_cache.find (hash);
for (auto const & cached_vote : cached)
{
vote (cached_vote);
}
trigger_vote_cache (hash);
node.observers.active_started.notify (hash);
vacancy_update ();
@ -442,8 +438,18 @@ nano::election_insertion_result nano::active_transactions::insert (std::shared_p
return result;
}
bool nano::active_transactions::trigger_vote_cache (nano::block_hash hash)
{
auto cached = node.vote_cache.find (hash);
for (auto const & cached_vote : cached)
{
vote (cached_vote, nano::vote_source::cache);
}
return !cached.empty ();
}
// Validate a vote and apply it to the current election if one exists
std::unordered_map<nano::block_hash, nano::vote_code> nano::active_transactions::vote (std::shared_ptr<nano::vote> const & vote)
std::unordered_map<nano::block_hash, nano::vote_code> nano::active_transactions::vote (std::shared_ptr<nano::vote> const & vote, nano::vote_source source)
{
std::unordered_map<nano::block_hash, nano::vote_code> results;
std::unordered_map<nano::block_hash, std::shared_ptr<nano::election>> process;
@ -480,7 +486,7 @@ std::unordered_map<nano::block_hash, nano::vote_code> nano::active_transactions:
for (auto const & [block_hash, election] : process)
{
auto const vote_result = election->vote (vote->account, vote->timestamp (), block_hash);
auto const vote_result = election->vote (vote->account, vote->timestamp (), block_hash, source);
results[block_hash] = vote_result;
processed |= (vote_result == nano::vote_code::vote);
@ -502,6 +508,8 @@ std::unordered_map<nano::block_hash, nano::vote_code> nano::active_transactions:
return results.find (hash) != results.end ();
}));
vote_processed.notify (vote, source, results);
return results;
}
@ -612,11 +620,7 @@ bool nano::active_transactions::publish (std::shared_ptr<nano::block> const & bl
blocks.emplace (block_a->hash (), election);
lock.unlock ();
auto cached = node.vote_cache.find (block_a->hash ());
for (auto const & cached_vote : cached)
{
vote (cached_vote);
}
trigger_vote_cache (block_a->hash ());
node.stats.inc (nano::stat::type::active, nano::stat::detail::election_block_conflict);
}

View file

@ -152,7 +152,7 @@ public:
*/
nano::election_insertion_result insert (std::shared_ptr<nano::block> const &, nano::election_behavior = nano::election_behavior::normal);
// Distinguishes replay votes, cannot be determined if the block is not in any election
std::unordered_map<nano::block_hash, nano::vote_code> vote (std::shared_ptr<nano::vote> const &);
std::unordered_map<nano::block_hash, nano::vote_code> vote (std::shared_ptr<nano::vote> const &, nano::vote_source = nano::vote_source::live);
// Is the root of this block in the roots container
bool active (nano::block const &) const;
bool active (nano::qualified_root const &) const;
@ -189,6 +189,10 @@ public:
void add_election_winner_details (nano::block_hash const &, std::shared_ptr<nano::election> const &);
std::shared_ptr<nano::election> remove_election_winner_details (nano::block_hash const &);
public: // Events
using vote_processed_event_t = nano::observer_set<std::shared_ptr<nano::vote> const &, nano::vote_source, std::unordered_map<nano::block_hash, nano::vote_code> const &>;
vote_processed_event_t vote_processed;
private:
// Erase elections if we're over capacity
void trim ();
@ -201,6 +205,7 @@ private:
std::vector<std::shared_ptr<nano::election>> list_active_impl (std::size_t) const;
void activate_successors (nano::store::read_transaction const & transaction, std::shared_ptr<nano::block> const & block);
void notify_observers (nano::store::read_transaction const & transaction, nano::election_status const & status, std::vector<nano::vote_with_weight_info> const & votes);
bool trigger_vote_cache (nano::block_hash);
private: // Dependencies
nano::node & node;

View file

@ -217,6 +217,10 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
scheduler.optimistic.activate (account, account_info, conf_info);
});
active.vote_processed.add ([this] (std::shared_ptr<nano::vote> const & vote, nano::vote_source source, std::unordered_map<nano::block_hash, nano::vote_code> const & results) {
vote_cache.observe (vote, source, results);
});
if (!init_error ())
{
// Notify election schedulers when AEC frees election slot

View file

@ -128,7 +128,25 @@ nano::vote_cache::vote_cache (vote_cache_config const & config_a, nano::stats &
{
}
void nano::vote_cache::vote (std::shared_ptr<nano::vote> const & vote, std::function<bool (nano::block_hash const &)> const & filter)
void nano::vote_cache::observe (const std::shared_ptr<nano::vote> & vote, nano::vote_source source, std::unordered_map<nano::block_hash, nano::vote_code> results)
{
if (source == nano::vote_source::live)
{
insert (vote, [&results] (nano::block_hash const & hash) {
// This filters which hashes should be included in the vote cache
if (auto it = results.find (hash); it != results.end ())
{
auto result = it->second;
// Cache votes with a corresponding active election (indicated by `vote_code::vote`) in case that election gets dropped
return result == nano::vote_code::vote || result == nano::vote_code::indeterminate;
}
debug_assert (false);
return false;
});
}
}
void nano::vote_cache::insert (std::shared_ptr<nano::vote> const & vote, std::function<bool (nano::block_hash const &)> filter)
{
auto const representative = vote->account;
auto const timestamp = vote->timestamp ();
@ -138,6 +156,7 @@ void nano::vote_cache::vote (std::shared_ptr<nano::vote> const & vote, std::func
for (auto const & hash : vote->hashes)
{
// Using filter callback here to avoid unnecessary relocking when processing large votes
if (!filter (hash))
{
continue;

View file

@ -106,9 +106,14 @@ public:
/**
* Adds a new vote to cache
*/
void vote (
void insert (
std::shared_ptr<nano::vote> const & vote,
std::function<bool (nano::block_hash const &)> const & filter = [] (nano::block_hash const &) { return true; });
std::function<bool (nano::block_hash const &)> filter = [] (nano::block_hash const &) { return true; });
/**
* Should be called for every processed vote, filters which votes should be added to cache
*/
void observe (std::shared_ptr<nano::vote> const & vote, nano::vote_source source, std::unordered_map<nano::block_hash, nano::vote_code>);
/**
* Tries to find an entry associated with block hash

View file

@ -176,7 +176,7 @@ nano::vote_code nano::vote_processor::vote_blocking (std::shared_ptr<nano::vote>
}
result = replay ? nano::vote_code::replay : (processed ? nano::vote_code::vote : nano::vote_code::indeterminate);
observers.vote.notify (vote_a, channel_a, result);
observers.vote.notify (vote, channel, result);
}
stats.inc (nano::stat::type::vote, to_stat_detail (result));