Overhaul
This commit is contained in:
		
					parent
					
						
							
								d3b5ff8d8c
							
						
					
				
			
			
				commit
				
					
						fe7c2b1776
					
				
			
		
					 13 changed files with 259 additions and 88 deletions
				
			
		| 
						 | 
				
			
			@ -18,11 +18,11 @@ TEST (online_reps, basic)
 | 
			
		|||
	ASSERT_EQ (nano::dev::constants.genesis_amount, node1.online_reps.online ());
 | 
			
		||||
	// 1 minimum, 1 maximum
 | 
			
		||||
	ASSERT_EQ (node1.config.online_weight_minimum, node1.online_reps.trended ());
 | 
			
		||||
	node1.online_reps.sample ();
 | 
			
		||||
	node1.online_reps.force_sample ();
 | 
			
		||||
	ASSERT_EQ (nano::dev::constants.genesis_amount, node1.online_reps.trended ());
 | 
			
		||||
	node1.online_reps.clear ();
 | 
			
		||||
	// 2 minimum, 1 maximum
 | 
			
		||||
	node1.online_reps.sample ();
 | 
			
		||||
	node1.online_reps.force_sample ();
 | 
			
		||||
	ASSERT_EQ (node1.config.online_weight_minimum, node1.online_reps.trended ());
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -81,6 +81,7 @@ enum class type
 | 
			
		|||
	signal_manager,
 | 
			
		||||
	peer_history,
 | 
			
		||||
	message_processor,
 | 
			
		||||
	online_reps,
 | 
			
		||||
	local_block_broadcaster,
 | 
			
		||||
	monitor,
 | 
			
		||||
	confirming_set,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -112,6 +112,7 @@ enum class type
 | 
			
		|||
	message_processor_overfill,
 | 
			
		||||
	message_processor_type,
 | 
			
		||||
	process_confirmed,
 | 
			
		||||
	online_reps,
 | 
			
		||||
 | 
			
		||||
	_last // Must be the last enum
 | 
			
		||||
};
 | 
			
		||||
| 
						 | 
				
			
			@ -609,6 +610,15 @@ enum class detail
 | 
			
		|||
	rollback_skipped,
 | 
			
		||||
	loop_scan,
 | 
			
		||||
 | 
			
		||||
	// online_reps
 | 
			
		||||
	trim_trend,
 | 
			
		||||
	sanitize_old,
 | 
			
		||||
	sanitize_future,
 | 
			
		||||
	sample,
 | 
			
		||||
	rep_new,
 | 
			
		||||
	rep_update,
 | 
			
		||||
	update_online,
 | 
			
		||||
 | 
			
		||||
	// error codes
 | 
			
		||||
	no_buffer_space,
 | 
			
		||||
	timed_out,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -184,6 +184,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
 | 
			
		|||
		case nano::thread_role::name::vote_router:
 | 
			
		||||
			thread_role_name_string = "Vote router";
 | 
			
		||||
			break;
 | 
			
		||||
		case nano::thread_role::name::online_reps:
 | 
			
		||||
			thread_role_name_string = "Online reps";
 | 
			
		||||
			break;
 | 
			
		||||
		case nano::thread_role::name::monitor:
 | 
			
		||||
			thread_role_name_string = "Monitor";
 | 
			
		||||
			break;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -66,6 +66,7 @@ enum class name
 | 
			
		|||
	port_mapping,
 | 
			
		||||
	stats,
 | 
			
		||||
	vote_router,
 | 
			
		||||
	online_reps,
 | 
			
		||||
	monitor,
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -139,7 +139,7 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
 | 
			
		|||
	rep_crawler (config.rep_crawler, *this),
 | 
			
		||||
	rep_tiers{ ledger, network_params, online_reps, stats, logger },
 | 
			
		||||
	warmed_up (0),
 | 
			
		||||
	online_reps_impl{ std::make_unique<nano::online_reps> (config, ledger) },
 | 
			
		||||
	online_reps_impl{ std::make_unique<nano::online_reps> (config, ledger, stats, logger) },
 | 
			
		||||
	online_reps{ *online_reps_impl },
 | 
			
		||||
	history_impl{ std::make_unique<nano::local_vote_history> (config.network_params.voting) },
 | 
			
		||||
	history{ *history_impl },
 | 
			
		||||
| 
						 | 
				
			
			@ -606,8 +606,6 @@ void nano::node::start ()
 | 
			
		|||
		rep_crawler.start ();
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ongoing_online_weight_calculation_queue ();
 | 
			
		||||
 | 
			
		||||
	bool tcp_enabled = false;
 | 
			
		||||
	if (config.tcp_incoming_connections_max > 0 && !(flags.disable_bootstrap_listener && flags.disable_tcp_realtime))
 | 
			
		||||
	{
 | 
			
		||||
| 
						 | 
				
			
			@ -661,6 +659,7 @@ void nano::node::start ()
 | 
			
		|||
	local_block_broadcaster.start ();
 | 
			
		||||
	peer_history.start ();
 | 
			
		||||
	vote_router.start ();
 | 
			
		||||
	online_reps.start ();
 | 
			
		||||
	monitor.start ();
 | 
			
		||||
 | 
			
		||||
	add_initial_peers ();
 | 
			
		||||
| 
						 | 
				
			
			@ -677,7 +676,7 @@ void nano::node::stop ()
 | 
			
		|||
	logger.info (nano::log::type::node, "Node stopping...");
 | 
			
		||||
 | 
			
		||||
	tcp_listener.stop ();
 | 
			
		||||
 | 
			
		||||
	online_reps.stop ();
 | 
			
		||||
	vote_router.stop ();
 | 
			
		||||
	peer_history.stop ();
 | 
			
		||||
	// Cancels ongoing work generation tasks, which may be blocking other threads
 | 
			
		||||
| 
						 | 
				
			
			@ -1063,28 +1062,11 @@ bool nano::node::block_confirmed_or_being_confirmed (nano::block_hash const & ha
 | 
			
		|||
	return block_confirmed_or_being_confirmed (ledger.tx_begin_read (), hash_a);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void nano::node::ongoing_online_weight_calculation_queue ()
 | 
			
		||||
{
 | 
			
		||||
	std::weak_ptr<nano::node> node_w (shared_from_this ());
 | 
			
		||||
	workers.post_delayed ((std::chrono::seconds (network_params.node.weight_period)), [node_w] () {
 | 
			
		||||
		if (auto node_l = node_w.lock ())
 | 
			
		||||
		{
 | 
			
		||||
			node_l->ongoing_online_weight_calculation ();
 | 
			
		||||
		}
 | 
			
		||||
	});
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
bool nano::node::online () const
 | 
			
		||||
{
 | 
			
		||||
	return rep_crawler.total_weight () > online_reps.delta ();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void nano::node::ongoing_online_weight_calculation ()
 | 
			
		||||
{
 | 
			
		||||
	online_reps.sample ();
 | 
			
		||||
	ongoing_online_weight_calculation_queue ();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
std::shared_ptr<nano::node> nano::node::shared ()
 | 
			
		||||
{
 | 
			
		||||
	return shared_from_this ();
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -114,8 +114,6 @@ public:
 | 
			
		|||
	bool block_confirmed_or_being_confirmed (nano::block_hash const &);
 | 
			
		||||
 | 
			
		||||
	void do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const &, uint16_t, std::shared_ptr<std::string> const &, std::shared_ptr<std::string> const &, std::shared_ptr<boost::asio::ip::tcp::resolver> const &);
 | 
			
		||||
	void ongoing_online_weight_calculation ();
 | 
			
		||||
	void ongoing_online_weight_calculation_queue ();
 | 
			
		||||
	bool online () const;
 | 
			
		||||
	bool init_error () const;
 | 
			
		||||
	std::pair<uint64_t, std::unordered_map<nano::account, nano::uint128_t>> get_bootstrap_weights () const;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,106 +1,252 @@
 | 
			
		|||
#include <nano/lib/config.hpp>
 | 
			
		||||
#include <nano/lib/thread_roles.hpp>
 | 
			
		||||
#include <nano/lib/timer.hpp>
 | 
			
		||||
#include <nano/node/nodeconfig.hpp>
 | 
			
		||||
#include <nano/node/online_reps.hpp>
 | 
			
		||||
#include <nano/secure/ledger.hpp>
 | 
			
		||||
#include <nano/store/component.hpp>
 | 
			
		||||
#include <nano/store/online_weight.hpp>
 | 
			
		||||
 | 
			
		||||
nano::online_reps::online_reps (nano::node_config const & config_a, nano::ledger & ledger_a) :
 | 
			
		||||
nano::online_reps::online_reps (nano::node_config const & config_a, nano::ledger & ledger_a, nano::stats & stats_a, nano::logger & logger_a) :
 | 
			
		||||
	config{ config_a },
 | 
			
		||||
	ledger{ ledger_a }
 | 
			
		||||
	ledger{ ledger_a },
 | 
			
		||||
	stats{ stats_a },
 | 
			
		||||
	logger{ logger_a }
 | 
			
		||||
{
 | 
			
		||||
	if (!ledger.store.init_error ())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
nano::online_reps::~online_reps ()
 | 
			
		||||
{
 | 
			
		||||
	debug_assert (!thread.joinable ());
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void nano::online_reps::start ()
 | 
			
		||||
{
 | 
			
		||||
	debug_assert (!thread.joinable ());
 | 
			
		||||
 | 
			
		||||
	{
 | 
			
		||||
		auto transaction (ledger.store.tx_begin_read ());
 | 
			
		||||
		trended_m = calculate_trend (transaction);
 | 
			
		||||
		auto transaction = ledger.tx_begin_write (nano::store::writer::online_weight);
 | 
			
		||||
		sanitize_trended (transaction);
 | 
			
		||||
 | 
			
		||||
		auto trended_l = calculate_trended (transaction);
 | 
			
		||||
		nano::lock_guard<nano::mutex> lock{ mutex };
 | 
			
		||||
		cached_trended = trended_l;
 | 
			
		||||
 | 
			
		||||
		logger.info (nano::log::type::online_reps, "Initial trended weight: {}", fmt::streamed (cached_trended));
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	thread = std::thread ([this] () {
 | 
			
		||||
		nano::thread_role::set (nano::thread_role::name::online_reps);
 | 
			
		||||
		run ();
 | 
			
		||||
	});
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void nano::online_reps::stop ()
 | 
			
		||||
{
 | 
			
		||||
	{
 | 
			
		||||
		nano::lock_guard<nano::mutex> lock{ mutex };
 | 
			
		||||
		stopped = true;
 | 
			
		||||
	}
 | 
			
		||||
	condition.notify_all ();
 | 
			
		||||
	if (thread.joinable ())
 | 
			
		||||
	{
 | 
			
		||||
		thread.join ();
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void nano::online_reps::observe (nano::account const & rep_a)
 | 
			
		||||
void nano::online_reps::observe (nano::account const & rep)
 | 
			
		||||
{
 | 
			
		||||
	if (ledger.weight (rep_a) > 0)
 | 
			
		||||
	if (ledger.weight (rep) > config.representative_vote_weight_minimum)
 | 
			
		||||
	{
 | 
			
		||||
		nano::lock_guard<nano::mutex> lock{ mutex };
 | 
			
		||||
 | 
			
		||||
		auto now = std::chrono::steady_clock::now ();
 | 
			
		||||
		auto new_insert = reps.get<tag_account> ().erase (rep_a) == 0;
 | 
			
		||||
		reps.insert ({ now, rep_a });
 | 
			
		||||
		auto cutoff = reps.get<tag_time> ().lower_bound (now - std::chrono::seconds (config.network_params.node.weight_period));
 | 
			
		||||
		auto trimmed = reps.get<tag_time> ().begin () != cutoff;
 | 
			
		||||
		reps.get<tag_time> ().erase (reps.get<tag_time> ().begin (), cutoff);
 | 
			
		||||
		auto new_insert = reps.get<tag_account> ().erase (rep) == 0;
 | 
			
		||||
		reps.insert ({ now, rep });
 | 
			
		||||
 | 
			
		||||
		stats.inc (nano::stat::type::online_reps, new_insert ? nano::stat::detail::rep_new : nano::stat::detail::rep_update);
 | 
			
		||||
 | 
			
		||||
		bool trimmed = trim ();
 | 
			
		||||
 | 
			
		||||
		// Update current online weight if anything changed
 | 
			
		||||
		if (new_insert || trimmed)
 | 
			
		||||
		{
 | 
			
		||||
			online_m = calculate_online ();
 | 
			
		||||
			stats.inc (nano::stat::type::online_reps, nano::stat::detail::update_online);
 | 
			
		||||
			cached_online = calculate_online ();
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
bool nano::online_reps::trim ()
 | 
			
		||||
{
 | 
			
		||||
	debug_assert (!mutex.try_lock ());
 | 
			
		||||
 | 
			
		||||
	auto now = std::chrono::steady_clock::now ();
 | 
			
		||||
	auto cutoff = reps.get<tag_time> ().lower_bound (now - config.network_params.node.weight_interval);
 | 
			
		||||
	auto trimmed = reps.get<tag_time> ().begin () != cutoff;
 | 
			
		||||
	reps.get<tag_time> ().erase (reps.get<tag_time> ().begin (), cutoff);
 | 
			
		||||
	return trimmed;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void nano::online_reps::run ()
 | 
			
		||||
{
 | 
			
		||||
	nano::unique_lock<nano::mutex> lock{ mutex };
 | 
			
		||||
	while (!stopped)
 | 
			
		||||
	{
 | 
			
		||||
		// Set next time point explicitly to ensure that we don't sample too early
 | 
			
		||||
		auto next = std::chrono::steady_clock::now () + config.network_params.node.weight_interval;
 | 
			
		||||
		condition.wait_until (lock, next, [this, next] {
 | 
			
		||||
			return stopped || std::chrono::steady_clock::now () >= next;
 | 
			
		||||
		});
 | 
			
		||||
		if (!stopped)
 | 
			
		||||
		{
 | 
			
		||||
			lock.unlock ();
 | 
			
		||||
			sample ();
 | 
			
		||||
			lock.lock ();
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void nano::online_reps::sample ()
 | 
			
		||||
{
 | 
			
		||||
	nano::unique_lock<nano::mutex> lock{ mutex };
 | 
			
		||||
	nano::uint128_t online_l = online_m;
 | 
			
		||||
	lock.unlock ();
 | 
			
		||||
	nano::uint128_t trend_l;
 | 
			
		||||
	stats.inc (nano::stat::type::online_reps, nano::stat::detail::sample);
 | 
			
		||||
 | 
			
		||||
	auto transaction = ledger.tx_begin_write (nano::store::writer::online_weight);
 | 
			
		||||
 | 
			
		||||
	// Remove old records from the database
 | 
			
		||||
	trim_trended (transaction);
 | 
			
		||||
 | 
			
		||||
	// Put current online weight sample into the database
 | 
			
		||||
	ledger.store.online_weight.put (transaction, nano::seconds_since_epoch (), online ());
 | 
			
		||||
 | 
			
		||||
	// Update current trended weight
 | 
			
		||||
	auto trended_l = calculate_trended (transaction);
 | 
			
		||||
	{
 | 
			
		||||
		auto transaction = ledger.store.tx_begin_write ();
 | 
			
		||||
		// Discard oldest entries
 | 
			
		||||
		while (ledger.store.online_weight.count (transaction) >= config.network_params.node.max_weight_samples)
 | 
			
		||||
		{
 | 
			
		||||
			auto oldest (ledger.store.online_weight.begin (transaction));
 | 
			
		||||
			debug_assert (oldest != ledger.store.online_weight.end (transaction));
 | 
			
		||||
			ledger.store.online_weight.del (transaction, oldest->first);
 | 
			
		||||
		}
 | 
			
		||||
		ledger.store.online_weight.put (transaction, std::chrono::system_clock::now ().time_since_epoch ().count (), online_l);
 | 
			
		||||
		trend_l = calculate_trend (transaction);
 | 
			
		||||
		nano::lock_guard<nano::mutex> lock{ mutex };
 | 
			
		||||
		cached_trended = trended_l;
 | 
			
		||||
	}
 | 
			
		||||
	lock.lock ();
 | 
			
		||||
	trended_m = trend_l;
 | 
			
		||||
	logger.info (nano::log::type::online_reps, "Updated trended weight: {}", fmt::streamed (trended_l));
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
nano::uint128_t nano::online_reps::calculate_online () const
 | 
			
		||||
{
 | 
			
		||||
	nano::uint128_t current;
 | 
			
		||||
	for (auto & i : reps)
 | 
			
		||||
	{
 | 
			
		||||
		current += ledger.weight (i.account);
 | 
			
		||||
	}
 | 
			
		||||
	return current;
 | 
			
		||||
	debug_assert (!mutex.try_lock ());
 | 
			
		||||
	return std::accumulate (reps.begin (), reps.end (), nano::uint128_t{ 0 }, [this] (nano::uint128_t current, rep_info const & info) {
 | 
			
		||||
		return current + ledger.weight (info.account);
 | 
			
		||||
	});
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
nano::uint128_t nano::online_reps::calculate_trend (store::transaction & transaction_a) const
 | 
			
		||||
void nano::online_reps::trim_trended (nano::store::write_transaction const & transaction)
 | 
			
		||||
{
 | 
			
		||||
	auto const now = std::chrono::system_clock::now ();
 | 
			
		||||
	auto const cutoff = now - config.network_params.node.weight_cutoff;
 | 
			
		||||
 | 
			
		||||
	for (auto it = ledger.store.online_weight.begin (transaction); it != ledger.store.online_weight.end (transaction); ++it)
 | 
			
		||||
	{
 | 
			
		||||
		auto tstamp = nano::from_seconds_since_epoch (it->first);
 | 
			
		||||
		if (tstamp < cutoff)
 | 
			
		||||
		{
 | 
			
		||||
			stats.inc (nano::stat::type::online_reps, nano::stat::detail::trim_trend);
 | 
			
		||||
			ledger.store.online_weight.del (transaction, it->first);
 | 
			
		||||
		}
 | 
			
		||||
		else
 | 
			
		||||
		{
 | 
			
		||||
			// Entries are ordered by timestamp, so break early
 | 
			
		||||
			break;
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Ensure that all remaining entries are within the expected range
 | 
			
		||||
	debug_assert (verify_consistency (transaction, now, cutoff));
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void nano::online_reps::sanitize_trended (nano::store::write_transaction const & transaction)
 | 
			
		||||
{
 | 
			
		||||
	auto const now = std::chrono::system_clock::now ();
 | 
			
		||||
	auto const cutoff = now - config.network_params.node.weight_cutoff;
 | 
			
		||||
 | 
			
		||||
	size_t removed_old = 0, removed_future = 0;
 | 
			
		||||
 | 
			
		||||
	for (auto it = ledger.store.online_weight.begin (transaction); it != ledger.store.online_weight.end (transaction); ++it)
 | 
			
		||||
	{
 | 
			
		||||
		auto tstamp = nano::from_seconds_since_epoch (it->first);
 | 
			
		||||
		if (tstamp < cutoff)
 | 
			
		||||
		{
 | 
			
		||||
			stats.inc (nano::stat::type::online_reps, nano::stat::detail::sanitize_old);
 | 
			
		||||
			// TODO: Ensure it's OK to delete entry with the same key as the current iterator
 | 
			
		||||
			ledger.store.online_weight.del (transaction, it->first);
 | 
			
		||||
			++removed_old;
 | 
			
		||||
		}
 | 
			
		||||
		else if (tstamp > now)
 | 
			
		||||
		{
 | 
			
		||||
			stats.inc (nano::stat::type::online_reps, nano::stat::detail::sanitize_future);
 | 
			
		||||
			// TODO: Ensure it's OK to delete entry with the same key as the current iterator
 | 
			
		||||
			ledger.store.online_weight.del (transaction, it->first);
 | 
			
		||||
			++removed_future;
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	logger.debug (nano::log::type::online_reps, "Sanitized online weight trend, remaining entries: {}, removed: {} (old: {}, future: {})",
 | 
			
		||||
	ledger.store.online_weight.count (transaction),
 | 
			
		||||
	removed_old + removed_future,
 | 
			
		||||
	removed_old,
 | 
			
		||||
	removed_future);
 | 
			
		||||
 | 
			
		||||
	// Ensure that all remaining entries are within the expected range
 | 
			
		||||
	debug_assert (verify_consistency (transaction, now, cutoff));
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
bool nano::online_reps::verify_consistency (nano::store::write_transaction const & transaction, std::chrono::system_clock::time_point now, std::chrono::system_clock::time_point cutoff) const
 | 
			
		||||
{
 | 
			
		||||
	for (auto it = ledger.store.online_weight.begin (transaction); it != ledger.store.online_weight.end (transaction); ++it)
 | 
			
		||||
	{
 | 
			
		||||
		auto tstamp = nano::from_seconds_since_epoch (it->first);
 | 
			
		||||
		if (tstamp < cutoff || tstamp > now)
 | 
			
		||||
		{
 | 
			
		||||
			return false;
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return true;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
nano::uint128_t nano::online_reps::calculate_trended (nano::store::transaction const & transaction) const
 | 
			
		||||
{
 | 
			
		||||
	std::vector<nano::uint128_t> items;
 | 
			
		||||
	items.reserve (config.network_params.node.max_weight_samples + 1);
 | 
			
		||||
	items.push_back (config.online_weight_minimum.number ());
 | 
			
		||||
	for (auto i (ledger.store.online_weight.begin (transaction_a)), n (ledger.store.online_weight.end (transaction_a)); i != n; ++i)
 | 
			
		||||
	for (auto it = ledger.store.online_weight.begin (transaction); it != ledger.store.online_weight.end (transaction); ++it)
 | 
			
		||||
	{
 | 
			
		||||
		items.push_back (i->second.number ());
 | 
			
		||||
		items.push_back (it->second.number ());
 | 
			
		||||
	}
 | 
			
		||||
	nano::uint128_t result;
 | 
			
		||||
	// Pick median value for our target vote weight
 | 
			
		||||
	auto median_idx = items.size () / 2;
 | 
			
		||||
	nth_element (items.begin (), items.begin () + median_idx, items.end ());
 | 
			
		||||
	result = items[median_idx];
 | 
			
		||||
	return result;
 | 
			
		||||
	if (!items.empty ())
 | 
			
		||||
	{
 | 
			
		||||
		// Pick median value for our target vote weight
 | 
			
		||||
		auto median_idx = items.size () / 2;
 | 
			
		||||
		std::nth_element (items.begin (), items.begin () + median_idx, items.end ());
 | 
			
		||||
		return items[median_idx];
 | 
			
		||||
	}
 | 
			
		||||
	return 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
nano::uint128_t nano::online_reps::trended () const
 | 
			
		||||
{
 | 
			
		||||
	nano::lock_guard<nano::mutex> lock{ mutex };
 | 
			
		||||
	return trended_m;
 | 
			
		||||
	return std::max (cached_trended, config.online_weight_minimum.number ());
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
nano::uint128_t nano::online_reps::online () const
 | 
			
		||||
{
 | 
			
		||||
	nano::lock_guard<nano::mutex> lock{ mutex };
 | 
			
		||||
	return online_m;
 | 
			
		||||
	return cached_online;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
nano::uint128_t nano::online_reps::delta () const
 | 
			
		||||
{
 | 
			
		||||
	nano::lock_guard<nano::mutex> lock{ mutex };
 | 
			
		||||
 | 
			
		||||
	// Using a larger container to ensure maximum precision
 | 
			
		||||
	auto weight = static_cast<nano::uint256_t> (std::max ({ online_m, trended_m, config.online_weight_minimum.number () }));
 | 
			
		||||
	return ((weight * online_weight_quorum) / 100).convert_to<nano::uint128_t> ();
 | 
			
		||||
	auto weight = static_cast<nano::uint256_t> (std::max ({ cached_online, cached_trended, config.online_weight_minimum.number () }));
 | 
			
		||||
	auto delta = ((weight * online_weight_quorum) / 100).convert_to<nano::uint128_t> ();
 | 
			
		||||
	release_assert (delta >= config.online_weight_minimum.number () / 100 * online_weight_quorum);
 | 
			
		||||
	return delta;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
std::vector<nano::account> nano::online_reps::list ()
 | 
			
		||||
| 
						 | 
				
			
			@ -115,14 +261,20 @@ void nano::online_reps::clear ()
 | 
			
		|||
{
 | 
			
		||||
	nano::lock_guard<nano::mutex> lock{ mutex };
 | 
			
		||||
	reps.clear ();
 | 
			
		||||
	online_m = 0;
 | 
			
		||||
	cached_online = 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void nano::online_reps::force_online_weight (nano::uint128_t const & online_weight)
 | 
			
		||||
{
 | 
			
		||||
	release_assert (nano::is_dev_run ());
 | 
			
		||||
	nano::lock_guard<nano::mutex> lock{ mutex };
 | 
			
		||||
	online_m = online_weight;
 | 
			
		||||
	cached_online = online_weight;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void nano::online_reps::force_sample ()
 | 
			
		||||
{
 | 
			
		||||
	release_assert (nano::is_dev_run ());
 | 
			
		||||
	sample ();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
nano::container_info nano::online_reps::container_info () const
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -12,6 +12,7 @@
 | 
			
		|||
#include <boost/multi_index_container.hpp>
 | 
			
		||||
 | 
			
		||||
#include <memory>
 | 
			
		||||
#include <thread>
 | 
			
		||||
#include <vector>
 | 
			
		||||
 | 
			
		||||
namespace mi = boost::multi_index;
 | 
			
		||||
| 
						 | 
				
			
			@ -22,12 +23,14 @@ namespace nano
 | 
			
		|||
class online_reps final
 | 
			
		||||
{
 | 
			
		||||
public:
 | 
			
		||||
	online_reps (nano::node_config const &, nano::ledger &);
 | 
			
		||||
	online_reps (nano::node_config const &, nano::ledger &, nano::stats &, nano::logger &);
 | 
			
		||||
	~online_reps ();
 | 
			
		||||
 | 
			
		||||
	void start ();
 | 
			
		||||
	void stop ();
 | 
			
		||||
 | 
			
		||||
	/** Add voting account \p rep_account to the set of online representatives */
 | 
			
		||||
	void observe (nano::account const & rep_account);
 | 
			
		||||
	/** Called periodically to sample online weight */
 | 
			
		||||
	void sample ();
 | 
			
		||||
 | 
			
		||||
	/** Returns the trended online stake */
 | 
			
		||||
	nano::uint128_t trended () const;
 | 
			
		||||
| 
						 | 
				
			
			@ -41,16 +44,30 @@ public:
 | 
			
		|||
	nano::container_info container_info () const;
 | 
			
		||||
 | 
			
		||||
public:
 | 
			
		||||
	// TODO: This should be in the network constants
 | 
			
		||||
	static unsigned constexpr online_weight_quorum = 67;
 | 
			
		||||
 | 
			
		||||
private: // Dependencies
 | 
			
		||||
	nano::node_config const & config;
 | 
			
		||||
	nano::ledger & ledger;
 | 
			
		||||
	nano::stats & stats;
 | 
			
		||||
	nano::logger & logger;
 | 
			
		||||
 | 
			
		||||
private:
 | 
			
		||||
	nano::uint128_t calculate_trend (nano::store::transaction &) const;
 | 
			
		||||
	void run ();
 | 
			
		||||
	/** Called periodically to sample online weight */
 | 
			
		||||
	void sample ();
 | 
			
		||||
	bool trim ();
 | 
			
		||||
	/** Remove old records from the database */
 | 
			
		||||
	void trim_trended (nano::store::write_transaction const &);
 | 
			
		||||
	/** Iterate over all database samples and remove invalid records. This is meant to clean potential leftovers from previous versions. */
 | 
			
		||||
	void sanitize_trended (nano::store::write_transaction const &);
 | 
			
		||||
 | 
			
		||||
	nano::uint128_t calculate_trended (nano::store::transaction const &) const;
 | 
			
		||||
	nano::uint128_t calculate_online () const;
 | 
			
		||||
 | 
			
		||||
	bool verify_consistency (nano::store::write_transaction const &, std::chrono::system_clock::time_point now, std::chrono::system_clock::time_point cutoff) const;
 | 
			
		||||
 | 
			
		||||
private:
 | 
			
		||||
	struct rep_info
 | 
			
		||||
	{
 | 
			
		||||
| 
						 | 
				
			
			@ -72,12 +89,16 @@ private:
 | 
			
		|||
	// clang-format off
 | 
			
		||||
	ordered_reps reps;
 | 
			
		||||
 | 
			
		||||
	nano::uint128_t trended_m;
 | 
			
		||||
	nano::uint128_t online_m;
 | 
			
		||||
	nano::uint128_t cached_trended{0};
 | 
			
		||||
	nano::uint128_t cached_online{0};
 | 
			
		||||
 | 
			
		||||
	bool stopped{ false };
 | 
			
		||||
	nano::condition_variable condition;
 | 
			
		||||
	mutable nano::mutex mutex;
 | 
			
		||||
	std::thread thread;
 | 
			
		||||
 | 
			
		||||
public: // Only for tests
 | 
			
		||||
	void force_online_weight (nano::uint128_t const & online_weight);
 | 
			
		||||
	void force_sample ();
 | 
			
		||||
};
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -110,6 +110,7 @@ void nano::peer_history::run_one ()
 | 
			
		|||
		auto timestamp = nano::from_milliseconds_since_epoch (timestamp_millis);
 | 
			
		||||
		if (timestamp > now || timestamp < cutoff)
 | 
			
		||||
		{
 | 
			
		||||
			// TODO: Ensure it's OK to delete entry with the same key as the current iterator
 | 
			
		||||
			store.peer.del (transaction, endpoint);
 | 
			
		||||
 | 
			
		||||
			stats.inc (nano::stat::type::peer_history, nano::stat::detail::erased);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -255,8 +255,8 @@ nano::node_constants::node_constants (nano::network_constants & network_constant
 | 
			
		|||
	search_pending_interval = network_constants.is_dev_network () ? std::chrono::seconds (1) : std::chrono::seconds (5 * 60);
 | 
			
		||||
	unchecked_cleaning_interval = std::chrono::minutes (30);
 | 
			
		||||
	process_confirmed_interval = network_constants.is_dev_network () ? std::chrono::milliseconds (50) : std::chrono::milliseconds (500);
 | 
			
		||||
	max_weight_samples = (network_constants.is_live_network () || network_constants.is_test_network ()) ? 4032 : 288;
 | 
			
		||||
	weight_period = 5 * 60; // 5 minutes
 | 
			
		||||
	weight_interval = std::chrono::minutes (5);
 | 
			
		||||
	weight_cutoff = (network_constants.is_live_network () || network_constants.is_test_network ()) ? std::chrono::weeks (2) : std::chrono::days (1);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -211,9 +211,10 @@ public:
 | 
			
		|||
	std::chrono::minutes unchecked_cleaning_interval;
 | 
			
		||||
	std::chrono::milliseconds process_confirmed_interval;
 | 
			
		||||
 | 
			
		||||
	/** The maximum amount of samples for a 2 week period on live or 1 day on beta */
 | 
			
		||||
	uint64_t max_weight_samples;
 | 
			
		||||
	uint64_t weight_period;
 | 
			
		||||
	/** Time between collecting online representative samples */
 | 
			
		||||
	std::chrono::seconds weight_interval;
 | 
			
		||||
	/** The maximum time to keep online weight samples: 2 weeks on live or 1 day on beta */
 | 
			
		||||
	std::chrono::seconds weight_cutoff;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
/** Voting related constants whose value depends on the active network */
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -18,6 +18,7 @@ enum class writer
 | 
			
		|||
	pruning,
 | 
			
		||||
	voting_final,
 | 
			
		||||
	bounded_backlog,
 | 
			
		||||
	online_weight,
 | 
			
		||||
	testing // Used in tests to emulate a write lock
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue