497 lines
		
	
	
		
			No EOL
		
	
	
		
			15 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			497 lines
		
	
	
		
			No EOL
		
	
	
		
			15 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| #include <nano/lib/assert.hpp>
 | |
| #include <nano/lib/interval.hpp>
 | |
| #include <nano/lib/numbers_templ.hpp>
 | |
| #include <nano/lib/thread_roles.hpp>
 | |
| #include <nano/node/network.hpp>
 | |
| #include <nano/node/rep_tiers.hpp>
 | |
| #include <nano/node/vote_processor.hpp>
 | |
| #include <nano/node/vote_rebroadcaster.hpp>
 | |
| #include <nano/node/vote_router.hpp>
 | |
| #include <nano/node/wallet.hpp>
 | |
| #include <nano/secure/ledger.hpp>
 | |
| #include <nano/secure/vote.hpp>
 | |
| 
 | |
| nano::vote_rebroadcaster::vote_rebroadcaster (nano::vote_rebroadcaster_config const & config_a, nano::ledger & ledger_a, nano::vote_router & vote_router_a, nano::network & network_a, nano::wallets & wallets_a, nano::rep_tiers & rep_tiers_a, nano::stats & stats_a, nano::logger & logger_a) :
 | |
| 	config{ config_a },
 | |
| 	ledger{ ledger_a },
 | |
| 	vote_router{ vote_router_a },
 | |
| 	network{ network_a },
 | |
| 	wallets{ wallets_a },
 | |
| 	rep_tiers{ rep_tiers_a },
 | |
| 	stats{ stats_a },
 | |
| 	logger{ logger_a },
 | |
| 	rebroadcasts{ config }
 | |
| {
 | |
| 	if (!config.enable)
 | |
| 	{
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| 	queue.max_size_query = [this] (auto const & origin) {
 | |
| 		switch (origin.source)
 | |
| 		{
 | |
| 			case nano::rep_tier::tier_3:
 | |
| 			case nano::rep_tier::tier_2:
 | |
| 			case nano::rep_tier::tier_1:
 | |
| 				return config.max_queue;
 | |
| 			case nano::rep_tier::none:
 | |
| 				return size_t{ 0 };
 | |
| 		}
 | |
| 		debug_assert (false);
 | |
| 		return size_t{ 0 };
 | |
| 	};
 | |
| 
 | |
| 	queue.priority_query = [this] (auto const & origin) {
 | |
| 		switch (origin.source)
 | |
| 		{
 | |
| 			case nano::rep_tier::tier_3:
 | |
| 				return config.priority_coefficient * config.priority_coefficient * config.priority_coefficient;
 | |
| 			case nano::rep_tier::tier_2:
 | |
| 				return config.priority_coefficient * config.priority_coefficient;
 | |
| 			case nano::rep_tier::tier_1:
 | |
| 				return config.priority_coefficient;
 | |
| 			case nano::rep_tier::none:
 | |
| 				return size_t{ 0 };
 | |
| 		}
 | |
| 		debug_assert (false);
 | |
| 		return size_t{ 0 };
 | |
| 	};
 | |
| 
 | |
| 	vote_router.vote_processed.add ([this] (std::shared_ptr<nano::vote> const & vote, nano::vote_source source, std::unordered_map<nano::block_hash, nano::vote_code> const & results) {
 | |
| 		// We also want to allow late votes to be rebroadcasted to help with reaching quorum for other nodes
 | |
| 		bool should_rebroadcast = std::any_of (results.begin (), results.end (), [&] (auto const & result) {
 | |
| 			auto const code = result.second;
 | |
| 			if (code == nano::vote_code::vote)
 | |
| 			{
 | |
| 				return true; // Rebroadcast votes that were processed by active elections
 | |
| 			}
 | |
| 			if (code != nano::vote_code::indeterminate)
 | |
| 			{
 | |
| 				return vote->is_final (); // Rebroadcast late votes only if they are final
 | |
| 			}
 | |
| 			return false;
 | |
| 		});
 | |
| 
 | |
| 		// Enable vote rebroadcasting only if the node does not host a representative
 | |
| 		// Do not rebroadcast votes from non-principal representatives
 | |
| 		if (should_rebroadcast && non_principal)
 | |
| 		{
 | |
| 			auto tier = rep_tiers.tier (vote->account);
 | |
| 			if (tier != nano::rep_tier::none)
 | |
| 			{
 | |
| 				push (vote, tier);
 | |
| 			}
 | |
| 		}
 | |
| 	});
 | |
| }
 | |
| 
 | |
| nano::vote_rebroadcaster::~vote_rebroadcaster ()
 | |
| {
 | |
| 	debug_assert (!thread.joinable ());
 | |
| }
 | |
| 
 | |
| void nano::vote_rebroadcaster::start ()
 | |
| {
 | |
| 	debug_assert (!thread.joinable ());
 | |
| 
 | |
| 	if (!config.enable)
 | |
| 	{
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| 	thread = std::thread ([this] () {
 | |
| 		nano::thread_role::set (nano::thread_role::name::vote_rebroadcasting);
 | |
| 		run ();
 | |
| 	});
 | |
| }
 | |
| 
 | |
| void nano::vote_rebroadcaster::stop ()
 | |
| {
 | |
| 	{
 | |
| 		std::lock_guard guard{ mutex };
 | |
| 		stopped = true;
 | |
| 	}
 | |
| 	condition.notify_all ();
 | |
| 	if (thread.joinable ())
 | |
| 	{
 | |
| 		thread.join ();
 | |
| 	}
 | |
| }
 | |
| 
 | |
| bool nano::vote_rebroadcaster::push (std::shared_ptr<nano::vote> const & vote, nano::rep_tier tier)
 | |
| {
 | |
| 	bool added = false;
 | |
| 	{
 | |
| 		std::lock_guard guard{ mutex };
 | |
| 
 | |
| 		// Do not rebroadcast local representative votes
 | |
| 		if (!reps.exists (vote->account) && !queue_hashes.contains (vote->signature))
 | |
| 		{
 | |
| 			added = queue.push (vote, tier);
 | |
| 			if (added)
 | |
| 			{
 | |
| 				queue_hashes.insert (vote->signature); // Keep track of vote signatures to avoid duplicates
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	if (added)
 | |
| 	{
 | |
| 		stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::queued);
 | |
| 		condition.notify_one ();
 | |
| 	}
 | |
| 	return added;
 | |
| }
 | |
| 
 | |
| std::pair<std::shared_ptr<nano::vote>, nano::rep_tier> nano::vote_rebroadcaster::next ()
 | |
| {
 | |
| 	debug_assert (!mutex.try_lock ());
 | |
| 	debug_assert (!queue.empty ());
 | |
| 
 | |
| 	queue.periodic_update ();
 | |
| 
 | |
| 	auto [vote, origin] = queue.next ();
 | |
| 	release_assert (vote != nullptr);
 | |
| 	release_assert (origin.source != nano::rep_tier::none);
 | |
| 
 | |
| 	auto erased = queue_hashes.erase (vote->signature);
 | |
| 	debug_assert (erased == 1);
 | |
| 
 | |
| 	return { vote, origin.source };
 | |
| }
 | |
| 
 | |
| void nano::vote_rebroadcaster::run ()
 | |
| {
 | |
| 	std::unique_lock lock{ mutex };
 | |
| 	while (!stopped)
 | |
| 	{
 | |
| 		condition.wait (lock, [&] {
 | |
| 			return stopped || !queue.empty ();
 | |
| 		});
 | |
| 
 | |
| 		if (stopped)
 | |
| 		{
 | |
| 			return;
 | |
| 		}
 | |
| 
 | |
| 		stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::loop);
 | |
| 
 | |
| 		// Update local reps cache
 | |
| 		if (refresh_interval.elapse (nano::is_dev_run () ? 1s : 15s))
 | |
| 		{
 | |
| 			stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::refresh);
 | |
| 
 | |
| 			reps = wallets.reps ();
 | |
| 			non_principal = !reps.have_half_rep (); // Disable vote rebroadcasting if the node has a principal representative (or close to)
 | |
| 		}
 | |
| 
 | |
| 		// Cleanup expired representatives from rebroadcasts
 | |
| 		if (cleanup_interval.elapse (nano::is_dev_run () ? 1s : 60s))
 | |
| 		{
 | |
| 			lock.unlock ();
 | |
| 			cleanup ();
 | |
| 			lock.lock ();
 | |
| 		}
 | |
| 
 | |
| 		float constexpr network_fanout_scale = 1.0f;
 | |
| 
 | |
| 		// Wait for spare capacity if our network traffic is too high
 | |
| 		if (!network.check_capacity (nano::transport::traffic_type::vote_rebroadcast, network_fanout_scale))
 | |
| 		{
 | |
| 			stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::cooldown);
 | |
| 			lock.unlock ();
 | |
| 			std::this_thread::sleep_for (100ms);
 | |
| 			lock.lock ();
 | |
| 			continue; // Wait for more capacity
 | |
| 		}
 | |
| 
 | |
| 		if (!queue.empty ())
 | |
| 		{
 | |
| 			// Only log if component is under pressure
 | |
| 			if (queue.size () > nano::queue_warning_threshold () && log_interval.elapse (15s))
 | |
| 			{
 | |
| 				logger.info (nano::log::type::vote_rebroadcaster, "{} votes (tier 3: {}, tier 2: {}, tier 1: {}) in rebroadcast queue",
 | |
| 				queue.size (),
 | |
| 				queue.size ({ nano::rep_tier::tier_3 }),
 | |
| 				queue.size ({ nano::rep_tier::tier_2 }),
 | |
| 				queue.size ({ nano::rep_tier::tier_1 }));
 | |
| 			}
 | |
| 
 | |
| 			auto [vote, tier] = next ();
 | |
| 
 | |
| 			lock.unlock ();
 | |
| 
 | |
| 			bool should_rebroadcast = process (vote);
 | |
| 			if (should_rebroadcast)
 | |
| 			{
 | |
| 				stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::rebroadcast);
 | |
| 				stats.add (nano::stat::type::vote_rebroadcaster, nano::stat::detail::rebroadcast_hashes, vote->hashes.size ());
 | |
| 				stats.inc (nano::stat::type::vote_rebroadcaster_tier, to_stat_detail (tier));
 | |
| 
 | |
| 				auto sent = network.flood_vote_rebroadcasted (vote, network_fanout_scale);
 | |
| 				stats.add (nano::stat::type::vote_rebroadcaster, nano::stat::detail::sent, sent);
 | |
| 			}
 | |
| 
 | |
| 			lock.lock ();
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| bool nano::vote_rebroadcaster::process (std::shared_ptr<nano::vote> const & vote)
 | |
| {
 | |
| 	stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::process);
 | |
| 
 | |
| 	auto rebroadcasts_l = rebroadcasts.lock ();
 | |
| 
 | |
| 	auto result = rebroadcasts_l->check_and_record (vote, ledger.weight (vote->account), std::chrono::steady_clock::now ());
 | |
| 	if (result == nano::vote_rebroadcaster_index::result::ok)
 | |
| 	{
 | |
| 		return true; // Vote qualifies for rebroadcast
 | |
| 	}
 | |
| 	else
 | |
| 	{
 | |
| 		stats.inc (nano::stat::type::vote_rebroadcaster, nano::enum_util::cast<nano::stat::detail> (result));
 | |
| 		return false; // Vote does not qualify for rebroadcast
 | |
| 	}
 | |
| }
 | |
| 
 | |
| void nano::vote_rebroadcaster::cleanup ()
 | |
| {
 | |
| 	stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::cleanup);
 | |
| 
 | |
| 	auto rebroadcasts_l = rebroadcasts.lock ();
 | |
| 
 | |
| 	auto erased_reps = rebroadcasts_l->cleanup ([this] (auto const & rep) {
 | |
| 		auto tier = rep_tiers.tier (rep);
 | |
| 		auto weight = ledger.weight (rep);
 | |
| 		return std::make_pair (tier != nano::rep_tier::none /* keep entry only if principal rep */, weight);
 | |
| 	});
 | |
| 
 | |
| 	stats.add (nano::stat::type::vote_rebroadcaster, nano::stat::detail::representatives_erase_stale, erased_reps);
 | |
| }
 | |
| 
 | |
| nano::container_info nano::vote_rebroadcaster::container_info () const
 | |
| {
 | |
| 	std::lock_guard guard{ mutex };
 | |
| 
 | |
| 	auto rebroadcasts_l = rebroadcasts.lock ();
 | |
| 
 | |
| 	nano::container_info info;
 | |
| 	info.add ("queue", queue.container_info ());
 | |
| 	info.put ("queue_total", queue.size ());
 | |
| 	info.put ("queue_hashes", queue_hashes.size ());
 | |
| 	info.put ("representatives", rebroadcasts_l->representatives_count ());
 | |
| 	info.put ("history", rebroadcasts_l->total_history ());
 | |
| 	info.put ("hashes", rebroadcasts_l->total_hashes ());
 | |
| 	return info;
 | |
| }
 | |
| 
 | |
| /*
 | |
|  * vote_rebroadcaster_index
 | |
|  */
 | |
| 
 | |
| nano::vote_rebroadcaster_index::vote_rebroadcaster_index (nano::vote_rebroadcaster_config const & config_a) :
 | |
| 	config{ config_a }
 | |
| {
 | |
| }
 | |
| 
 | |
| nano::vote_rebroadcaster_index::result nano::vote_rebroadcaster_index::check_and_record (std::shared_ptr<nano::vote> const & vote, nano::uint128_t rep_weight, std::chrono::steady_clock::time_point now)
 | |
| {
 | |
| 	auto const vote_timestamp = vote->timestamp ();
 | |
| 	auto const vote_hash = vote->full_hash ();
 | |
| 
 | |
| 	auto it = index.get<tag_account> ().find (vote->account);
 | |
| 
 | |
| 	// If we don't have a record for this rep, add it
 | |
| 	if (it == index.get<tag_account> ().end ())
 | |
| 	{
 | |
| 		auto should_add = [&, this] () {
 | |
| 			// Under normal conditions the number of principal representatives should be below this limit
 | |
| 			if (index.size () < config.max_representatives)
 | |
| 			{
 | |
| 				return true;
 | |
| 			}
 | |
| 			// However, if we're at capacity, we can still add the rep if it has a higher weight than the lowest weight in the container
 | |
| 			if (auto lowest = index.get<tag_weight> ().begin (); lowest != index.get<tag_weight> ().end ())
 | |
| 			{
 | |
| 				return rep_weight > lowest->weight;
 | |
| 			}
 | |
| 			return false;
 | |
| 		};
 | |
| 
 | |
| 		if (should_add ())
 | |
| 		{
 | |
| 			it = index.get<tag_account> ().emplace (representative_entry{ vote->account, rep_weight }).first;
 | |
| 		}
 | |
| 		else
 | |
| 		{
 | |
| 			return result::representatives_full;
 | |
| 		}
 | |
| 	}
 | |
| 	release_assert (it != index.get<tag_account> ().end ());
 | |
| 
 | |
| 	auto & history = it->history;
 | |
| 	auto & hashes = it->hashes;
 | |
| 
 | |
| 	// Check if we already rebroadcasted this exact vote (fast lookup by hash)
 | |
| 	if (hashes.get<tag_vote_hash> ().contains (vote_hash))
 | |
| 	{
 | |
| 		return result::already_rebroadcasted;
 | |
| 	}
 | |
| 
 | |
| 	// Check if any of the hashes contained in the vote qualifies for rebroadcasting
 | |
| 	auto check_hash = [&] (auto const & hash) {
 | |
| 		if (auto existing = history.get<tag_block_hash> ().find (hash); existing != history.get<tag_block_hash> ().end ())
 | |
| 		{
 | |
| 			// Always rebroadcast vote if rep switched to a final vote
 | |
| 			if (nano::vote::is_final_timestamp (vote_timestamp) && vote_timestamp > existing->vote_timestamp)
 | |
| 			{
 | |
| 				return true;
 | |
| 			}
 | |
| 			// Otherwise only rebroadcast if sufficient time has passed since the last rebroadcast
 | |
| 			if (existing->timestamp + config.rebroadcast_threshold > now)
 | |
| 			{
 | |
| 				return false; // Not enough (as seen by local clock) time has passed
 | |
| 			}
 | |
| 			if (add_sat (existing->vote_timestamp, static_cast<nano::vote_timestamp> (config.rebroadcast_threshold.count ())) > vote_timestamp)
 | |
| 			{
 | |
| 				return false; // Not enough (as seen by vote timestamp) time has passed
 | |
| 			}
 | |
| 			return true; // Enough time has passed, block hash qualifies for rebroadcast
 | |
| 		}
 | |
| 		else
 | |
| 		{
 | |
| 			return true; // Block hash not seen before, rebroadcast
 | |
| 		}
 | |
| 	};
 | |
| 
 | |
| 	bool should_rebroadcast = std::any_of (vote->hashes.begin (), vote->hashes.end (), check_hash);
 | |
| 	if (!should_rebroadcast)
 | |
| 	{
 | |
| 		return result::rebroadcast_unnecessary;
 | |
| 	}
 | |
| 
 | |
| 	// Update the history with the new vote info
 | |
| 	for (auto const & hash : vote->hashes)
 | |
| 	{
 | |
| 		if (auto existing = history.get<tag_block_hash> ().find (hash); existing != history.get<tag_block_hash> ().end ())
 | |
| 		{
 | |
| 			history.get<tag_block_hash> ().modify (existing, [&] (auto & entry) {
 | |
| 				entry.vote_timestamp = vote_timestamp;
 | |
| 				entry.timestamp = now;
 | |
| 			});
 | |
| 		}
 | |
| 		else
 | |
| 		{
 | |
| 			history.get<tag_block_hash> ().emplace (rebroadcast_entry{ hash, vote_timestamp, now });
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Also keep track of the vote hash to quickly filter out duplicates
 | |
| 	hashes.push_back (vote_hash);
 | |
| 
 | |
| 	// Keep history and hashes sizes within limits, erase oldest entries
 | |
| 	while (history.size () > config.max_history)
 | |
| 	{
 | |
| 		history.pop_front (); // Remove the oldest entry
 | |
| 	}
 | |
| 	while (hashes.size () > config.max_history)
 | |
| 	{
 | |
| 		hashes.pop_front (); // Remove the oldest entry
 | |
| 	}
 | |
| 
 | |
| 	// Keep representatives index within limits, erase lowest weight entries
 | |
| 	while (!index.empty () && index.size () > config.max_representatives)
 | |
| 	{
 | |
| 		index.get<tag_weight> ().erase (index.get<tag_weight> ().begin ());
 | |
| 	}
 | |
| 
 | |
| 	return result::ok; // Rebroadcast the vote
 | |
| }
 | |
| 
 | |
| size_t nano::vote_rebroadcaster_index::cleanup (rep_query query)
 | |
| {
 | |
| 	// Remove entries for accounts that are no longer principal representatives
 | |
| 	auto erased_reps = erase_if (index, [&] (auto const & entry) {
 | |
| 		auto [should_keep, weight] = query (entry.representative);
 | |
| 		return !should_keep;
 | |
| 	});
 | |
| 
 | |
| 	// Update representative weights
 | |
| 	for (auto it = index.begin (), end = index.end (); it != end; ++it)
 | |
| 	{
 | |
| 		index.modify (it, [&] (auto & entry) {
 | |
| 			auto [tier, weight] = query (entry.representative);
 | |
| 			entry.weight = weight;
 | |
| 		});
 | |
| 	}
 | |
| 
 | |
| 	return erased_reps;
 | |
| }
 | |
| 
 | |
| bool nano::vote_rebroadcaster_index::contains_vote (nano::block_hash const & vote_hash) const
 | |
| {
 | |
| 	return std::any_of (index.begin (), index.end (), [&] (auto const & entry) {
 | |
| 		return entry.hashes.template get<tag_vote_hash> ().contains (vote_hash);
 | |
| 	});
 | |
| }
 | |
| 
 | |
| bool nano::vote_rebroadcaster_index::contains_representative (nano::account const & representative) const
 | |
| {
 | |
| 	return index.get<tag_account> ().contains (representative);
 | |
| }
 | |
| 
 | |
| bool nano::vote_rebroadcaster_index::contains_block (nano::account const & representative, nano::block_hash const & block_hash) const
 | |
| {
 | |
| 	if (auto it = index.get<tag_account> ().find (representative); it != index.get<tag_account> ().end ())
 | |
| 	{
 | |
| 		return it->history.get<tag_block_hash> ().find (block_hash) != it->history.get<tag_block_hash> ().end ();
 | |
| 	}
 | |
| 	return false;
 | |
| }
 | |
| 
 | |
| size_t nano::vote_rebroadcaster_index::representatives_count () const
 | |
| {
 | |
| 	return index.size ();
 | |
| }
 | |
| 
 | |
| size_t nano::vote_rebroadcaster_index::total_history () const
 | |
| {
 | |
| 	return std::accumulate (index.begin (), index.end (), size_t{ 0 }, [] (auto total, auto const & entry) {
 | |
| 		return total + entry.history.size ();
 | |
| 	});
 | |
| }
 | |
| 
 | |
| size_t nano::vote_rebroadcaster_index::total_hashes () const
 | |
| {
 | |
| 	return std::accumulate (index.begin (), index.end (), size_t{ 0 }, [] (auto total, auto const & entry) {
 | |
| 		return total + entry.hashes.size ();
 | |
| 	});
 | |
| }
 | |
| 
 | |
| /*
 | |
|  * vote_rebroadcaster_config
 | |
|  */
 | |
| 
 | |
| nano::error nano::vote_rebroadcaster_config::deserialize (nano::tomlconfig & toml)
 | |
| {
 | |
| 	toml.get ("enable", enable);
 | |
| 	toml.get ("max_queue", max_queue);
 | |
| 	toml.get ("max_history", max_history);
 | |
| 	toml.get ("max_representatives", max_representatives);
 | |
| 	toml.get_duration ("rebroadcast_threshold", rebroadcast_threshold);
 | |
| 	toml.get ("priority_coefficient", priority_coefficient);
 | |
| 
 | |
| 	return toml.get_error ();
 | |
| }
 | |
| 
 | |
| nano::error nano::vote_rebroadcaster_config::serialize (nano::tomlconfig & toml) const
 | |
| {
 | |
| 	toml.put ("enable", enable, "Enable or disable vote rebroadcasting. Disabling it will reduce bandwidth usage but should be done with understanding that the node will not participate fully in network consensus.\ntype:bool");
 | |
| 	toml.put ("max_queue", max_queue, "Maximum number of votes to keep in queue for processing.\ntype:uint64");
 | |
| 	toml.put ("max_history", max_history, "Maximum number of recently broadcast hashes to keep per representative.\ntype:uint64");
 | |
| 	toml.put ("max_representatives", max_representatives, "Maximum number of representatives to track rebroadcasts for.\ntype:uint64");
 | |
| 	toml.put ("rebroadcast_threshold", rebroadcast_threshold.count (), "Minimum amount of time between rebroadcasts for the same hash from the same representative.\ntype:milliseconds");
 | |
| 	toml.put ("priority_coefficient", priority_coefficient, "Priority coefficient for prioritizing votes from representative tiers.\ntype:uint64");
 | |
| 
 | |
| 	return toml.get_error ();
 | |
| } |