Handle legacy confirm_req using the aggregator (#2541)

* Handle legacy confirm_req using the aggregator

* Fix test node.local_votes_cache

* Adjust test to make sure the cache is used

* Use system polls instead
This commit is contained in:
Guilherme Lawless 2020-02-10 14:21:22 +00:00 committed by GitHub
commit ad0396eef0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 28 additions and 90 deletions

View file

@ -2598,16 +2598,37 @@ TEST (node, local_votes_cache)
nano::confirm_req message1 (send1);
nano::confirm_req message2 (send2);
auto channel (node.network.udp_channels.create (node.network.endpoint ()));
node.network.process_message (message1, channel);
auto wait_vote_sequence = [&node, &system](unsigned sequence) {
std::shared_ptr<nano::vote> current_vote;
system.deadline_set (5s);
while (current_vote == nullptr || current_vote->sequence < sequence)
{
{
nano::lock_guard<std::mutex> lock (node.store.get_cache_mutex ());
auto transaction (node.store.tx_begin_read ());
current_vote = node.store.vote_current (transaction, nano::test_genesis_key.pub);
}
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (sequence, current_vote->sequence);
};
wait_vote_sequence (1);
node.network.process_message (message2, channel);
wait_vote_sequence (2);
for (auto i (0); i < 100; ++i)
{
node.network.process_message (message1, channel);
node.network.process_message (message2, channel);
}
for (int i = 0; i < 4; ++i)
{
system.poll (node.aggregator.max_delay);
}
// Make sure a new vote was not generated
{
nano::lock_guard<std::mutex> lock (node.store.get_cache_mutex ());
auto transaction (node.store.tx_begin_read ());
auto current_vote (node.store.vote_current (transaction, nano::test_genesis_key.pub));
ASSERT_EQ (current_vote->sequence, 2);
ASSERT_EQ (2, node.store.vote_current (node.store.tx_begin_read (), nano::test_genesis_key.pub)->sequence);
}
// Max cache
{
@ -2619,12 +2640,11 @@ TEST (node, local_votes_cache)
{
node.network.process_message (message3, channel);
}
for (int i = 0; i < 4; ++i)
{
nano::lock_guard<std::mutex> lock (node.store.get_cache_mutex ());
auto transaction (node.store.tx_begin_read ());
auto current_vote (node.store.vote_current (transaction, nano::test_genesis_key.pub));
ASSERT_EQ (current_vote->sequence, 3);
system.poll (node.aggregator.max_delay);
}
wait_vote_sequence (3);
ASSERT_TRUE (node.votes_cache.find (send1->hash ()).empty ());
ASSERT_FALSE (node.votes_cache.find (send2->hash ()).empty ());
ASSERT_FALSE (node.votes_cache.find (send3->hash ()).empty ());

View file

@ -146,76 +146,6 @@ void nano::network::send_node_id_handshake (std::shared_ptr<nano::transport::cha
channel_a->send (message);
}
template <typename T>
bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a, T & list_a, std::shared_ptr<nano::block> block_a, bool also_publish)
{
bool result (false);
if (node_a.config.enable_voting && node_a.wallets.rep_counts ().voting > 0)
{
auto hash (block_a->hash ());
// Search in cache
auto votes (node_a.votes_cache.find (hash));
if (votes.empty ())
{
// Generate new vote
node_a.wallets.foreach_representative ([&result, &list_a, &node_a, &transaction_a, &hash](nano::public_key const & pub_a, nano::raw_key const & prv_a) {
result = true;
auto vote (node_a.store.vote_generate (transaction_a, pub_a, prv_a, std::vector<nano::block_hash> (1, hash)));
nano::confirm_ack confirm (vote);
for (auto j (list_a.begin ()), m (list_a.end ()); j != m; ++j)
{
j->get ()->send (confirm);
}
node_a.votes_cache.add (vote);
});
}
else
{
// Send from cache
for (auto & vote : votes)
{
nano::confirm_ack confirm (vote);
for (auto j (list_a.begin ()), m (list_a.end ()); j != m; ++j)
{
j->get ()->send (confirm);
}
}
}
// Republish if required
if (also_publish)
{
nano::publish publish (block_a);
for (auto j (list_a.begin ()), m (list_a.end ()); j != m; ++j)
{
j->get ()->send (publish);
}
}
}
return result;
}
bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a, std::shared_ptr<nano::transport::channel> channel_a, std::shared_ptr<nano::block> block_a, bool also_publish)
{
std::array<std::shared_ptr<nano::transport::channel>, 1> endpoints = { channel_a };
auto result (confirm_block (transaction_a, node_a, endpoints, std::move (block_a), also_publish));
return result;
}
bool nano::network::send_votes_cache (std::shared_ptr<nano::transport::channel> channel_a, nano::block_hash const & hash_a)
{
// Search in cache
auto votes (node.votes_cache.find (hash_a));
// Send from cache
for (auto & vote : votes)
{
nano::confirm_ack confirm (vote);
channel_a->send (confirm);
}
// Returns true if votes were sent
bool result (!votes.empty ());
return result;
}
void nano::network::flood_message (nano::message const & message_a, bool const is_droppable_a)
{
for (auto & i : list (fanout ()))
@ -463,17 +393,7 @@ public:
{
if (message_a.block != nullptr)
{
auto hash (message_a.block->hash ());
if (!node.network.send_votes_cache (channel, hash))
{
auto transaction (node.store.tx_begin_read ());
auto successor (node.ledger.successor (transaction, message_a.block->qualified_root ()));
if (successor != nullptr)
{
auto same_block (successor->hash () == hash);
confirm_block (transaction, node, channel, std::move (successor), !same_block);
}
}
node.aggregator.add (channel, { { message_a.block->hash (), message_a.block->root () } });
}
else if (!message_a.roots_hashes.empty ())
{

View file

@ -124,8 +124,6 @@ public:
void broadcast_confirm_req_base (std::shared_ptr<nano::block>, std::shared_ptr<std::vector<std::shared_ptr<nano::transport::channel>>>, unsigned, bool = false);
void broadcast_confirm_req_batched_many (std::unordered_map<std::shared_ptr<nano::transport::channel>, std::deque<std::pair<nano::block_hash, nano::root>>>, std::function<void()> = nullptr, unsigned = broadcast_interval_ms, bool = false);
void broadcast_confirm_req_many (std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<std::shared_ptr<nano::transport::channel>>>>>, std::function<void()> = nullptr, unsigned = broadcast_interval_ms);
void confirm_hashes (nano::transaction const &, std::shared_ptr<nano::transport::channel>, std::vector<nano::block_hash>);
bool send_votes_cache (std::shared_ptr<nano::transport::channel>, nano::block_hash const &);
std::shared_ptr<nano::transport::channel> find_node_id (nano::account const &);
std::shared_ptr<nano::transport::channel> find_channel (nano::endpoint const &);
void process_message (nano::message const &, std::shared_ptr<nano::transport::channel>);