Votes from local representatives should not be flooded on processing (#2766)

* Add representatives cache in wallets, use in websocket and ipc

* Don't broadcast a processed vote from a local representative

* Revert changes to wallets::exists, add a test to ensure it can't be broken in the future (Serg review)
This commit is contained in:
Guilherme Lawless 2020-05-11 09:48:37 +01:00 committed by GitHub
commit 0f61e41a7f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 175 additions and 29 deletions

View file

@ -598,7 +598,7 @@ TEST (system, generate_send_new)
std::vector<nano::account> accounts;
accounts.push_back (nano::test_genesis_key.pub);
// This indirectly waits for online weight to stabilize, required to prevent intermittent failures
ASSERT_TIMELY (5s, node1.wallets.rep_counts ().voting > 0);
ASSERT_TIMELY (5s, node1.wallets.reps ().voting > 0);
system.generate_send_new (node1, accounts);
nano::account new_account (0);
{

View file

@ -1879,7 +1879,7 @@ TEST (node, rep_self_vote)
}
system.wallet (0)->insert_adhoc (rep_big.prv);
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
ASSERT_EQ (system.wallet (0)->wallets.rep_counts ().voting, 2);
ASSERT_EQ (system.wallet (0)->wallets.reps ().voting, 2);
auto block0 (std::make_shared<nano::send_block> (node0->latest (nano::test_genesis_key.pub), rep_big.pub, nano::uint128_t ("0x60000000000000000000000000000000"), nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
node0->work_generate_blocking (*block0);
ASSERT_EQ (nano::process_result::progress, node0->process (*block0).code);
@ -2848,7 +2848,7 @@ TEST (node, local_votes_cache_size)
auto & wallet (*system.wallet (0));
wallet.insert_adhoc (nano::test_genesis_key.prv);
wallet.insert_adhoc (nano::keypair ().prv);
ASSERT_EQ (2, node.wallets.rep_counts ().voting);
ASSERT_EQ (2, node.wallets.reps ().voting);
auto transaction (node.store.tx_begin_read ());
auto vote1 (node.store.vote_generate (transaction, nano::test_genesis_key.pub, nano::test_genesis_key.prv, { nano::genesis_hash }));
nano::block_hash hash (1);

View file

@ -182,3 +182,108 @@ TEST (vote_processor, weights)
ASSERT_NE (node.vote_processor.representatives_3.end (), node.vote_processor.representatives_3.find (nano::test_genesis_key.pub));
}
}
TEST (vote_processor, no_broadcast_local)
{
nano::system system;
nano::node_flags flags;
flags.disable_request_loop = true;
auto & node (*system.add_node (flags));
system.add_node (flags);
nano::block_builder builder;
std::error_code ec;
// Reduce the weight of genesis to 2x default min voting weight
nano::keypair key;
std::shared_ptr<nano::block> send = builder.state ()
.account (nano::test_genesis_key.pub)
.representative (nano::test_genesis_key.pub)
.previous (nano::genesis_hash)
.balance (2 * node.config.vote_minimum.number ())
.link (key.pub)
.sign (nano::test_genesis_key.prv, nano::test_genesis_key.pub)
.work (*system.work.generate (nano::genesis_hash))
.build (ec);
ASSERT_FALSE (ec);
ASSERT_EQ (nano::process_result::progress, node.process_local (send).code);
ASSERT_EQ (2 * node.config.vote_minimum.number (), node.weight (nano::test_genesis_key.pub));
// Insert account in wallet
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
node.wallets.compute_reps ();
ASSERT_TRUE (node.wallets.reps ().exists (nano::test_genesis_key.pub));
ASSERT_FALSE (node.wallets.reps ().have_half_rep ());
// Process a vote
auto vote (node.store.vote_generate (node.store.tx_begin_read (), nano::test_genesis_key.pub, nano::test_genesis_key.prv, { send->hash () }));
ASSERT_EQ (nano::vote_code::vote, node.active.vote (vote));
// Make sure the vote was processed
auto election (node.active.election (send->qualified_root ()));
ASSERT_NE (nullptr, election);
auto existing (election->last_votes.find (nano::test_genesis_key.pub));
ASSERT_NE (election->last_votes.end (), existing);
ASSERT_EQ (vote->sequence, existing->second.sequence);
// Ensure the vote, from a local representative, was not broadcast on processing - it should be flooded on generation instead
ASSERT_EQ (0, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
ASSERT_EQ (1, node.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::out));
// Repeat test with no representative
// Erase account from the wallet
system.wallet (0)->store.erase (node.wallets.tx_begin_write (), nano::test_genesis_key.pub);
node.wallets.compute_reps ();
ASSERT_FALSE (node.wallets.reps ().exists (nano::test_genesis_key.pub));
std::shared_ptr<nano::block> send2 = builder.state ()
.account (nano::test_genesis_key.pub)
.representative (nano::test_genesis_key.pub)
.previous (send->hash ())
.balance (node.config.vote_minimum)
.link (key.pub)
.sign (nano::test_genesis_key.prv, nano::test_genesis_key.pub)
.work (*system.work.generate (send->hash ()))
.build (ec);
ASSERT_FALSE (ec);
ASSERT_EQ (nano::process_result::progress, node.process_local (send2).code);
ASSERT_EQ (node.config.vote_minimum, node.weight (nano::test_genesis_key.pub));
// Process a vote
auto vote2 (node.store.vote_generate (node.store.tx_begin_read (), nano::test_genesis_key.pub, nano::test_genesis_key.prv, { send2->hash () }));
ASSERT_EQ (nano::vote_code::vote, node.active.vote (vote2));
// Make sure the vote was processed
auto election2 (node.active.election (send2->qualified_root ()));
ASSERT_NE (nullptr, election2);
auto existing2 (election2->last_votes.find (nano::test_genesis_key.pub));
ASSERT_NE (election2->last_votes.end (), existing2);
ASSERT_EQ (vote2->sequence, existing2->second.sequence);
// Ensure the vote was broadcast
ASSERT_EQ (1, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::out));
// Repeat test with a PR in the wallet
// Increase the genesis weight again
std::shared_ptr<nano::block> open = builder.state ()
.account (key.pub)
.representative (nano::test_genesis_key.pub)
.previous (0)
.balance (nano::genesis_amount - 2 * node.config.vote_minimum.number ())
.link (send->hash ())
.sign (key.prv, key.pub)
.work (*system.work.generate (key.pub))
.build (ec);
ASSERT_FALSE (ec);
ASSERT_EQ (nano::process_result::progress, node.process_local (open).code);
ASSERT_EQ (nano::genesis_amount - node.config.vote_minimum.number (), node.weight (nano::test_genesis_key.pub));
// Insert account in wallet
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
node.wallets.compute_reps ();
ASSERT_TRUE (node.wallets.reps ().exists (nano::test_genesis_key.pub));
ASSERT_TRUE (node.wallets.reps ().have_half_rep ());
// Process a vote
auto vote3 (node.store.vote_generate (node.store.tx_begin_read (), nano::test_genesis_key.pub, nano::test_genesis_key.prv, { open->hash () }));
ASSERT_EQ (nano::vote_code::vote, node.active.vote (vote3));
// Make sure the vote was processed
auto election3 (node.active.election (open->qualified_root ()));
ASSERT_NE (nullptr, election3);
auto existing3 (election3->last_votes.find (nano::test_genesis_key.pub));
ASSERT_NE (election3->last_votes.end (), existing3);
ASSERT_EQ (vote3->sequence, existing3->second.sequence);
// Ensure the vote wass not broadcasst
ASSERT_EQ (1, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
ASSERT_EQ (3, node.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::out));
}

View file

@ -1561,7 +1561,7 @@ TEST (wallet, foreach_representative_deadlock)
auto & node (*system.nodes[0]);
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
node.wallets.compute_reps ();
ASSERT_EQ (1, node.wallets.rep_counts ().voting);
ASSERT_EQ (1, node.wallets.reps ().voting);
node.wallets.foreach_representative ([&node](nano::public_key const & pub, nano::raw_key const & prv) {
if (node.wallets.mutex.try_lock ())
{

View file

@ -196,3 +196,28 @@ TEST (wallets, vote_minimum)
node1.wallets.compute_reps ();
ASSERT_EQ (2, wallet->representatives.size ());
}
TEST (wallets, exists)
{
nano::system system (1);
auto & node (*system.nodes[0]);
nano::keypair key1;
nano::keypair key2;
{
auto transaction (node.wallets.tx_begin_read ());
ASSERT_FALSE (node.wallets.exists (transaction, key1.pub));
ASSERT_FALSE (node.wallets.exists (transaction, key2.pub));
}
system.wallet (0)->insert_adhoc (key1.prv);
{
auto transaction (node.wallets.tx_begin_read ());
ASSERT_TRUE (node.wallets.exists (transaction, key1.pub));
ASSERT_FALSE (node.wallets.exists (transaction, key2.pub));
}
system.wallet (0)->insert_adhoc (key2.prv);
{
auto transaction (node.wallets.tx_begin_read ());
ASSERT_TRUE (node.wallets.exists (transaction, key1.pub));
ASSERT_TRUE (node.wallets.exists (transaction, key2.pub));
}
}

View file

@ -62,7 +62,7 @@ void nano_daemon::daemon::run (boost::filesystem::path const & data_path, nano::
<< "Path: " << node->application_path.string () << "\n"
<< "Build Info: " << BUILD_INFO << "\n"
<< "Database backend: " << node->store.vendor_get () << std::endl;
auto voting (node->wallets.rep_counts ().voting);
auto voting (node->wallets.reps ().voting);
if (voting > 1)
{
std::cout << "Voting with more than one representative can limit performance: " << voting << " representatives are configured" << std::endl;

View file

@ -51,7 +51,7 @@ nano::active_transactions::~active_transactions ()
void nano::active_transactions::confirm_prioritized_frontiers (nano::transaction const & transaction_a)
{
// Limit maximum count of elections to start
auto rep_counts (node.wallets.rep_counts ());
auto rep_counts (node.wallets.reps ());
bool representative (node.config.enable_voting && rep_counts.voting > 0);
bool half_princpal_representative (representative && rep_counts.half_principal > 0);
/* Check less frequently for regular nodes in auto mode */
@ -597,9 +597,13 @@ nano::vote_code nano::active_transactions::vote (std::shared_ptr<nano::vote> vot
if (at_least_one)
{
// Republish vote if it is new and the node does not host a principal representative (or close to)
if (processed && !node.wallets.rep_counts ().have_half_rep ())
if (processed)
{
node.network.flood_vote (vote_a, 0.5f);
auto const reps (node.wallets.reps ());
if (!reps.have_half_rep () && !reps.exists (vote_a->account))
{
node.network.flood_vote (vote_a, 0.5f);
}
}
return replay ? nano::vote_code::replay : nano::vote_code::vote;
}

View file

@ -618,7 +618,7 @@ void nano::election::prioritize_election (nano::vote_generator_session & generat
void nano::election::generate_votes (nano::block_hash const & hash_a)
{
if (node.config.enable_voting && node.wallets.rep_counts ().voting > 0)
if (node.config.enable_voting && node.wallets.reps ().voting > 0)
{
node.active.generator.add (hash_a);
}
@ -626,7 +626,7 @@ void nano::election::generate_votes (nano::block_hash const & hash_a)
void nano::election::remove_votes (nano::block_hash const & hash_a)
{
if (node.config.enable_voting && node.wallets.rep_counts ().voting > 0)
if (node.config.enable_voting && node.wallets.reps ().voting > 0)
{
// Remove votes from election
auto list_generated_votes (node.votes_cache.find (hash_a));

View file

@ -447,7 +447,7 @@ public:
}
node.stats.inc (nano::stat::type::message, nano::stat::detail::confirm_req, nano::stat::dir::in);
// Don't load nodes with disabled voting
if (node.config.enable_voting && node.wallets.rep_counts ().voting > 0)
if (node.config.enable_voting && node.wallets.reps ().voting > 0)
{
if (message_a.block != nullptr)
{

View file

@ -390,7 +390,7 @@ node_seq (seq)
{
std::ostringstream stream;
stream << "Voting is enabled, more system resources will be used";
auto voting (wallets.rep_counts ().voting);
auto voting (wallets.reps ().voting);
if (voting > 0)
{
stream << ". " << voting << " representative(s) are configured";

View file

@ -25,7 +25,7 @@ thread ([this]() { run (); })
void nano::request_aggregator::add (std::shared_ptr<nano::transport::channel> & channel_a, std::vector<std::pair<nano::block_hash, nano::root>> const & hashes_roots_a)
{
debug_assert (wallets.rep_counts ().voting > 0);
debug_assert (wallets.reps ().voting > 0);
bool error = true;
auto const endpoint (nano::transport::map_endpoint_to_v6 (channel_a->get_endpoint ()));
nano::unique_lock<std::mutex> lock (mutex);

View file

@ -137,7 +137,7 @@ wallets (wallets_a)
void nano::votes_cache::add (std::shared_ptr<nano::vote> const & vote_a)
{
auto voting (wallets.rep_counts ().voting);
auto voting (wallets.reps ().voting);
if (voting == 0)
{
return;

View file

@ -1841,10 +1841,10 @@ void nano::wallets::clear_send_ids (nano::transaction const & transaction_a)
debug_assert (status == 0);
}
nano::wallet_representative_counts nano::wallets::rep_counts ()
nano::wallet_representatives nano::wallets::reps () const
{
nano::lock_guard<std::mutex> counts_guard (counts_mutex);
return counts;
nano::lock_guard<std::mutex> counts_guard (reps_cache_mutex);
return representatives;
}
bool nano::wallets::check_rep (nano::account const & account_a, nano::uint128_t const & half_principal_weight_a, const bool acquire_lock_a)
@ -1856,13 +1856,14 @@ bool nano::wallets::check_rep (nano::account const & account_a, nano::uint128_t
nano::unique_lock<std::mutex> lock;
if (acquire_lock_a)
{
lock = nano::unique_lock<std::mutex> (counts_mutex);
lock = nano::unique_lock<std::mutex> (reps_cache_mutex);
}
result = true;
++counts.voting;
representatives.accounts.insert (account_a);
++representatives.voting;
if (weight >= half_principal_weight_a)
{
++counts.half_principal;
++representatives.half_principal;
}
}
return result;
@ -1871,8 +1872,8 @@ bool nano::wallets::check_rep (nano::account const & account_a, nano::uint128_t
void nano::wallets::compute_reps ()
{
nano::lock_guard<std::mutex> guard (mutex);
nano::lock_guard<std::mutex> counts_guard (counts_mutex);
counts = { 0, 0 };
nano::lock_guard<std::mutex> counts_guard (reps_cache_mutex);
representatives.clear ();
auto half_principal_weight (node.minimum_principal_weight () / 2);
auto transaction (tx_begin_read ());
for (auto i (items.begin ()), n (items.end ()); i != n; ++i)

View file

@ -183,15 +183,26 @@ public:
std::atomic<bool> stopped;
};
class wallet_representative_counts
class wallet_representatives
{
public:
uint64_t voting{ 0 }; // Representatives with at least the configured minimum voting weight
uint64_t half_principal{ 0 }; // Representatives with at least 50% of principal representative requirements
uint64_t voting{ 0 }; // Number of representatives with at least the configured minimum voting weight
uint64_t half_principal{ 0 }; // Number of representatives with at least 50% of principal representative requirements
std::unordered_set<nano::account> accounts; // Representatives with at least the configured minimum voting weight
bool have_half_rep () const
{
return half_principal > 0;
}
bool exists (nano::account const & rep_a) const
{
return accounts.count (rep_a) > 0;
}
void clear ()
{
voting = 0;
half_principal = 0;
accounts.clear ();
}
};
/**
@ -212,10 +223,10 @@ public:
void do_wallet_actions ();
void queue_wallet_action (nano::uint128_t const &, std::shared_ptr<nano::wallet>, std::function<void(nano::wallet &)> const &);
void foreach_representative (std::function<void(nano::public_key const &, nano::raw_key const &)> const &);
bool exists (nano::transaction const &, nano::public_key const &);
bool exists (nano::transaction const &, nano::account const &);
void stop ();
void clear_send_ids (nano::transaction const &);
nano::wallet_representative_counts rep_counts ();
nano::wallet_representatives reps () const;
bool check_rep (nano::account const &, nano::uint128_t const &, const bool = true);
void compute_reps ();
void ongoing_compute_reps ();
@ -246,8 +257,8 @@ public:
nano::read_transaction tx_begin_read ();
private:
std::mutex counts_mutex;
nano::wallet_representative_counts counts;
mutable std::mutex reps_cache_mutex;
nano::wallet_representatives representatives;
};
std::unique_ptr<container_info_component> collect_container_info (wallets & wallets, const std::string & name);