Fix bug when using locally generated votes cache (#4007)
This commit is contained in:
		
					parent
					
						
							
								aa6bd58cfd
							
						
					
				
			
			
				commit
				
					
						9d72243ef7
					
				
			
		
					 3 changed files with 61 additions and 65 deletions
				
			
		| 
						 | 
					@ -222,20 +222,34 @@ TEST (request_aggregator, two_endpoints)
 | 
				
			||||||
	auto dummy_channel1 = std::make_shared<nano::transport::inproc::channel> (node1, node1);
 | 
						auto dummy_channel1 = std::make_shared<nano::transport::inproc::channel> (node1, node1);
 | 
				
			||||||
	auto dummy_channel2 = std::make_shared<nano::transport::inproc::channel> (node2, node2);
 | 
						auto dummy_channel2 = std::make_shared<nano::transport::inproc::channel> (node2, node2);
 | 
				
			||||||
	ASSERT_NE (nano::transport::map_endpoint_to_v6 (dummy_channel1->get_endpoint ()), nano::transport::map_endpoint_to_v6 (dummy_channel2->get_endpoint ()));
 | 
						ASSERT_NE (nano::transport::map_endpoint_to_v6 (dummy_channel1->get_endpoint ()), nano::transport::map_endpoint_to_v6 (dummy_channel2->get_endpoint ()));
 | 
				
			||||||
	// Use the aggregator from node1 only, making requests from both nodes
 | 
					
 | 
				
			||||||
 | 
						// For the first request, aggregator should generate a new vote
 | 
				
			||||||
	node1.aggregator.add (dummy_channel1, request);
 | 
						node1.aggregator.add (dummy_channel1, request);
 | 
				
			||||||
	node1.aggregator.add (dummy_channel2, request);
 | 
						ASSERT_TIMELY (5s, node1.aggregator.empty ());
 | 
				
			||||||
	ASSERT_EQ (2, node1.aggregator.size ());
 | 
					
 | 
				
			||||||
	// For the first request it generates the vote, for the second it uses the generated vote
 | 
						ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
 | 
				
			||||||
	ASSERT_TIMELY (3s, node1.aggregator.empty ());
 | 
					 | 
				
			||||||
	ASSERT_EQ (2, node1.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
 | 
					 | 
				
			||||||
	ASSERT_EQ (0, node1.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
 | 
						ASSERT_EQ (0, node1.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
 | 
				
			||||||
	ASSERT_TIMELY (3s, 0 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
 | 
					
 | 
				
			||||||
	ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
 | 
						ASSERT_TIMELY_EQ (5s, 0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
 | 
				
			||||||
	ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
 | 
						ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
 | 
				
			||||||
	ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes) + node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_late_hashes));
 | 
						ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
 | 
				
			||||||
	ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes) + node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_late_votes));
 | 
						ASSERT_TIMELY_EQ (3s, 0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes));
 | 
				
			||||||
	ASSERT_TIMELY (3s, 0 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cannot_vote));
 | 
						ASSERT_TIMELY_EQ (3s, 0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes));
 | 
				
			||||||
 | 
						ASSERT_TIMELY_EQ (3s, 0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cannot_vote));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// For the second request, aggregator should use the cache
 | 
				
			||||||
 | 
						node1.aggregator.add (dummy_channel1, request);
 | 
				
			||||||
 | 
						ASSERT_TIMELY (5s, node1.aggregator.empty ());
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						ASSERT_TIMELY_EQ (5s, 2, node1.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
 | 
				
			||||||
 | 
						ASSERT_EQ (0, node1.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						ASSERT_TIMELY_EQ (5s, 0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
 | 
				
			||||||
 | 
						ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
 | 
				
			||||||
 | 
						ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
 | 
				
			||||||
 | 
						ASSERT_TIMELY_EQ (3s, 1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes));
 | 
				
			||||||
 | 
						ASSERT_TIMELY_EQ (3s, 1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes));
 | 
				
			||||||
 | 
						ASSERT_TIMELY_EQ (3s, 0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cannot_vote));
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
TEST (request_aggregator, split)
 | 
					TEST (request_aggregator, split)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -169,18 +169,30 @@ void nano::request_aggregator::erase_duplicates (std::vector<std::pair<nano::blo
 | 
				
			||||||
std::pair<std::vector<std::shared_ptr<nano::block>>, std::vector<std::shared_ptr<nano::block>>> nano::request_aggregator::aggregate (std::vector<std::pair<nano::block_hash, nano::root>> const & requests_a, std::shared_ptr<nano::transport::channel> & channel_a) const
 | 
					std::pair<std::vector<std::shared_ptr<nano::block>>, std::vector<std::shared_ptr<nano::block>>> nano::request_aggregator::aggregate (std::vector<std::pair<nano::block_hash, nano::root>> const & requests_a, std::shared_ptr<nano::transport::channel> & channel_a) const
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	auto transaction (ledger.store.tx_begin_read ());
 | 
						auto transaction (ledger.store.tx_begin_read ());
 | 
				
			||||||
	std::size_t cached_hashes = 0;
 | 
					 | 
				
			||||||
	std::vector<std::shared_ptr<nano::block>> to_generate;
 | 
						std::vector<std::shared_ptr<nano::block>> to_generate;
 | 
				
			||||||
	std::vector<std::shared_ptr<nano::block>> to_generate_final;
 | 
						std::vector<std::shared_ptr<nano::block>> to_generate_final;
 | 
				
			||||||
	std::vector<std::shared_ptr<nano::vote>> cached_votes;
 | 
						std::vector<std::shared_ptr<nano::vote>> cached_votes;
 | 
				
			||||||
 | 
						std::unordered_set<nano::block_hash> cached_hashes;
 | 
				
			||||||
	for (auto const & [hash, root] : requests_a)
 | 
						for (auto const & [hash, root] : requests_a)
 | 
				
			||||||
	{
 | 
						{
 | 
				
			||||||
 | 
							// 0. Hashes already sent
 | 
				
			||||||
 | 
							if (cached_hashes.count (hash) > 0)
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								continue;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// 1. Votes in cache
 | 
							// 1. Votes in cache
 | 
				
			||||||
		auto find_votes (local_votes.votes (root, hash));
 | 
							auto find_votes (local_votes.votes (root, hash));
 | 
				
			||||||
		if (!find_votes.empty ())
 | 
							if (!find_votes.empty ())
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			++cached_hashes;
 | 
								for (auto & found_vote : find_votes)
 | 
				
			||||||
			cached_votes.insert (cached_votes.end (), find_votes.begin (), find_votes.end ());
 | 
								{
 | 
				
			||||||
 | 
									cached_votes.push_back (found_vote);
 | 
				
			||||||
 | 
									for (auto & found_hash : found_vote->hashes)
 | 
				
			||||||
 | 
									{
 | 
				
			||||||
 | 
										cached_hashes.insert (found_hash);
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		else
 | 
							else
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
| 
						 | 
					@ -295,7 +307,7 @@ std::pair<std::vector<std::shared_ptr<nano::block>>, std::vector<std::shared_ptr
 | 
				
			||||||
	{
 | 
						{
 | 
				
			||||||
		reply_action (vote, channel_a);
 | 
							reply_action (vote, channel_a);
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes, stat::dir::in, cached_hashes);
 | 
						stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes, stat::dir::in, cached_hashes.size ());
 | 
				
			||||||
	stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached_votes, stat::dir::in, cached_votes.size ());
 | 
						stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached_votes, stat::dir::in, cached_votes.size ());
 | 
				
			||||||
	return std::make_pair (to_generate, to_generate_final);
 | 
						return std::make_pair (to_generate, to_generate_final);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -186,17 +186,7 @@ nano::vote_generator::~vote_generator ()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
void nano::vote_generator::process (nano::write_transaction const & transaction, nano::root const & root_a, nano::block_hash const & hash_a)
 | 
					void nano::vote_generator::process (nano::write_transaction const & transaction, nano::root const & root_a, nano::block_hash const & hash_a)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	auto cached_votes (history.votes (root_a, hash_a, is_final));
 | 
						bool should_vote = false;
 | 
				
			||||||
	if (!cached_votes.empty ())
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		for (auto const & vote : cached_votes)
 | 
					 | 
				
			||||||
		{
 | 
					 | 
				
			||||||
			broadcast_action (vote);
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	else
 | 
					 | 
				
			||||||
	{
 | 
					 | 
				
			||||||
		auto should_vote (false);
 | 
					 | 
				
			||||||
	if (is_final)
 | 
						if (is_final)
 | 
				
			||||||
	{
 | 
						{
 | 
				
			||||||
		auto block (ledger.store.block.get (transaction, hash_a));
 | 
							auto block (ledger.store.block.get (transaction, hash_a));
 | 
				
			||||||
| 
						 | 
					@ -219,7 +209,6 @@ void nano::vote_generator::process (nano::write_transaction const & transaction,
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
void nano::vote_generator::start ()
 | 
					void nano::vote_generator::start ()
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
| 
						 | 
					@ -294,7 +283,7 @@ void nano::vote_generator::set_reply_action (std::function<void (std::shared_ptr
 | 
				
			||||||
void nano::vote_generator::broadcast (nano::unique_lock<nano::mutex> & lock_a)
 | 
					void nano::vote_generator::broadcast (nano::unique_lock<nano::mutex> & lock_a)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	debug_assert (lock_a.owns_lock ());
 | 
						debug_assert (lock_a.owns_lock ());
 | 
				
			||||||
	std::unordered_set<std::shared_ptr<nano::vote>> cached_sent;
 | 
					
 | 
				
			||||||
	std::vector<nano::block_hash> hashes;
 | 
						std::vector<nano::block_hash> hashes;
 | 
				
			||||||
	std::vector<nano::root> roots;
 | 
						std::vector<nano::root> roots;
 | 
				
			||||||
	hashes.reserve (nano::network::confirm_ack_hashes_max);
 | 
						hashes.reserve (nano::network::confirm_ack_hashes_max);
 | 
				
			||||||
| 
						 | 
					@ -302,15 +291,7 @@ void nano::vote_generator::broadcast (nano::unique_lock<nano::mutex> & lock_a)
 | 
				
			||||||
	while (!candidates.empty () && hashes.size () < nano::network::confirm_ack_hashes_max)
 | 
						while (!candidates.empty () && hashes.size () < nano::network::confirm_ack_hashes_max)
 | 
				
			||||||
	{
 | 
						{
 | 
				
			||||||
		auto const & [root, hash] = candidates.front ();
 | 
							auto const & [root, hash] = candidates.front ();
 | 
				
			||||||
		auto cached_votes = history.votes (root, hash, is_final);
 | 
							if (std::find (roots.begin (), roots.end (), root) == roots.end ())
 | 
				
			||||||
		for (auto const & cached_vote : cached_votes)
 | 
					 | 
				
			||||||
		{
 | 
					 | 
				
			||||||
			if (cached_sent.insert (cached_vote).second)
 | 
					 | 
				
			||||||
			{
 | 
					 | 
				
			||||||
				broadcast_action (cached_vote);
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		if (cached_votes.empty () && std::find (roots.begin (), roots.end (), root) == roots.end ())
 | 
					 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			if (spacing.votable (root, hash))
 | 
								if (spacing.votable (root, hash))
 | 
				
			||||||
			{
 | 
								{
 | 
				
			||||||
| 
						 | 
					@ -338,7 +319,6 @@ void nano::vote_generator::broadcast (nano::unique_lock<nano::mutex> & lock_a)
 | 
				
			||||||
void nano::vote_generator::reply (nano::unique_lock<nano::mutex> & lock_a, request_t && request_a)
 | 
					void nano::vote_generator::reply (nano::unique_lock<nano::mutex> & lock_a, request_t && request_a)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	lock_a.unlock ();
 | 
						lock_a.unlock ();
 | 
				
			||||||
	std::unordered_set<std::shared_ptr<nano::vote>> cached_sent;
 | 
					 | 
				
			||||||
	auto i (request_a.first.cbegin ());
 | 
						auto i (request_a.first.cbegin ());
 | 
				
			||||||
	auto n (request_a.first.cend ());
 | 
						auto n (request_a.first.cend ());
 | 
				
			||||||
	while (i != n && !stopped)
 | 
						while (i != n && !stopped)
 | 
				
			||||||
| 
						 | 
					@ -350,17 +330,7 @@ void nano::vote_generator::reply (nano::unique_lock<nano::mutex> & lock_a, reque
 | 
				
			||||||
		for (; i != n && hashes.size () < nano::network::confirm_ack_hashes_max; ++i)
 | 
							for (; i != n && hashes.size () < nano::network::confirm_ack_hashes_max; ++i)
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			auto const & [root, hash] = *i;
 | 
								auto const & [root, hash] = *i;
 | 
				
			||||||
			auto cached_votes = history.votes (root, hash, is_final);
 | 
								if (std::find (roots.begin (), roots.end (), root) == roots.end ())
 | 
				
			||||||
			for (auto const & cached_vote : cached_votes)
 | 
					 | 
				
			||||||
			{
 | 
					 | 
				
			||||||
				if (cached_sent.insert (cached_vote).second)
 | 
					 | 
				
			||||||
				{
 | 
					 | 
				
			||||||
					stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached_late_hashes, stat::dir::in, cached_vote->hashes.size ());
 | 
					 | 
				
			||||||
					stats.inc (nano::stat::type::requests, nano::stat::detail::requests_cached_late_votes, stat::dir::in);
 | 
					 | 
				
			||||||
					reply_action (cached_vote, request_a.second);
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			if (cached_votes.empty () && std::find (roots.begin (), roots.end (), root) == roots.end ())
 | 
					 | 
				
			||||||
			{
 | 
								{
 | 
				
			||||||
				if (spacing.votable (root, hash))
 | 
									if (spacing.votable (root, hash))
 | 
				
			||||||
				{
 | 
									{
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue