532 lines
16 KiB
C++
532 lines
16 KiB
C++
#include <nano/node/active_elections.hpp>
|
|
#include <nano/node/node.hpp>
|
|
#include <nano/node/online_reps.hpp>
|
|
#include <nano/node/repcrawler.hpp>
|
|
#include <nano/secure/ledger.hpp>
|
|
#include <nano/secure/ledger_set_confirmed.hpp>
|
|
#include <nano/secure/vote.hpp>
|
|
|
|
#include <ranges>
|
|
|
|
nano::rep_crawler::rep_crawler (nano::rep_crawler_config const & config_a, nano::node & node_a) :
|
|
config{ config_a },
|
|
node{ node_a },
|
|
stats{ node_a.stats },
|
|
logger{ node_a.logger },
|
|
network_constants{ node_a.network_params.network },
|
|
active{ node_a.active }
|
|
{
|
|
node.observers.channel_connected.add ([this] (std::shared_ptr<nano::transport::channel> const & channel) {
|
|
if (!node.flags.disable_rep_crawler)
|
|
{
|
|
{
|
|
nano::lock_guard<nano::mutex> lock{ mutex };
|
|
prioritized.push_back (channel);
|
|
}
|
|
condition.notify_all ();
|
|
}
|
|
});
|
|
}
|
|
|
|
/**
 * Destructor. The worker thread has to be joined (see stop ()) before the object goes away;
 * debug builds verify that no joinable thread is left behind.
 */
nano::rep_crawler::~rep_crawler ()
{
	debug_assert (!thread.joinable ());
}
|
|
|
|
void nano::rep_crawler::start ()
|
|
{
|
|
debug_assert (!thread.joinable ());
|
|
|
|
if (node.flags.disable_rep_crawler)
|
|
{
|
|
return;
|
|
}
|
|
|
|
thread = std::thread{ [this] () {
|
|
nano::thread_role::set (nano::thread_role::name::rep_crawler);
|
|
run ();
|
|
} };
|
|
}
|
|
|
|
void nano::rep_crawler::stop ()
|
|
{
|
|
{
|
|
nano::lock_guard<nano::mutex> lock{ mutex };
|
|
stopped = true;
|
|
}
|
|
condition.notify_all ();
|
|
if (thread.joinable ())
|
|
{
|
|
thread.join ();
|
|
}
|
|
}
|
|
|
|
// Exits with the lock unlocked
|
|
void nano::rep_crawler::validate_and_process (nano::unique_lock<nano::mutex> & lock)
|
|
{
|
|
debug_assert (!mutex.try_lock ());
|
|
debug_assert (lock.owns_lock ());
|
|
debug_assert (!responses.empty ()); // Should be checked before calling this function
|
|
|
|
decltype (responses) responses_l{ responses.capacity () };
|
|
responses_l.swap (responses);
|
|
|
|
lock.unlock ();
|
|
|
|
// normally the rep_crawler only tracks principal reps but it can be made to track
|
|
// reps with less weight by setting rep_crawler_weight_minimum to a low value
|
|
auto const minimum = std::min (node.minimum_principal_weight (), node.config.rep_crawler_weight_minimum.number ());
|
|
|
|
// TODO: Is it really faster to repeatedly lock/unlock the mutex for each response?
|
|
for (auto const & response : responses_l)
|
|
{
|
|
auto & vote = response.second;
|
|
auto & channel = response.first;
|
|
release_assert (vote != nullptr);
|
|
release_assert (channel != nullptr);
|
|
|
|
nano::uint128_t const rep_weight = node.ledger.weight (vote->account);
|
|
if (rep_weight < minimum)
|
|
{
|
|
logger.debug (nano::log::type::rep_crawler, "Ignoring vote from account: {} with too little voting weight: {}",
|
|
vote->account,
|
|
rep_weight);
|
|
|
|
continue; // Skip this vote
|
|
}
|
|
|
|
// temporary data used for logging after dropping the lock
|
|
bool inserted = false;
|
|
bool updated = false;
|
|
std::shared_ptr<nano::transport::channel> prev_channel;
|
|
|
|
lock.lock ();
|
|
|
|
if (auto existing = reps.find (vote->account); existing != reps.end ())
|
|
{
|
|
reps.modify (existing, [rep_weight, &updated, &vote, &channel, &prev_channel] (rep_entry & rep) {
|
|
rep.last_response = std::chrono::steady_clock::now ();
|
|
|
|
// Update if representative channel was changed
|
|
if (rep.channel->get_remote_endpoint () != channel->get_remote_endpoint ())
|
|
{
|
|
debug_assert (rep.account == vote->account);
|
|
updated = true;
|
|
prev_channel = rep.channel;
|
|
rep.channel = channel;
|
|
}
|
|
});
|
|
}
|
|
else
|
|
{
|
|
reps.emplace (rep_entry{ vote->account, channel });
|
|
inserted = true;
|
|
}
|
|
|
|
lock.unlock ();
|
|
|
|
if (inserted)
|
|
{
|
|
logger.info (nano::log::type::rep_crawler, "Found representative: {} at: {}",
|
|
vote->account.to_account (),
|
|
channel->to_string ());
|
|
}
|
|
if (updated)
|
|
{
|
|
logger.warn (nano::log::type::rep_crawler, "Updated representative: {} at: {} (was at: {})",
|
|
vote->account.to_account (),
|
|
channel->to_string (),
|
|
prev_channel->to_string ());
|
|
}
|
|
}
|
|
}
|
|
|
|
std::chrono::milliseconds nano::rep_crawler::query_interval (bool sufficient_weight) const
|
|
{
|
|
return sufficient_weight ? network_constants.rep_crawler_normal_interval : network_constants.rep_crawler_warmup_interval;
|
|
}
|
|
|
|
bool nano::rep_crawler::query_predicate (bool sufficient_weight) const
|
|
{
|
|
return nano::elapsed (last_query, query_interval (sufficient_weight));
|
|
}
|
|
|
|
void nano::rep_crawler::run ()
|
|
{
|
|
nano::unique_lock<nano::mutex> lock{ mutex };
|
|
while (!stopped)
|
|
{
|
|
lock.unlock ();
|
|
|
|
auto const current_total_weight = total_weight ();
|
|
bool const sufficient_weight = current_total_weight > node.online_reps.delta ();
|
|
|
|
// If online weight drops below minimum, reach out to preconfigured peers
|
|
if (!sufficient_weight)
|
|
{
|
|
stats.inc (nano::stat::type::rep_crawler, nano::stat::detail::keepalive);
|
|
node.keepalive_preconfigured ();
|
|
}
|
|
|
|
lock.lock ();
|
|
|
|
condition.wait_for (lock, query_interval (sufficient_weight), [this, sufficient_weight] {
|
|
return stopped || query_predicate (sufficient_weight) || !responses.empty () || !prioritized.empty ();
|
|
});
|
|
|
|
if (stopped)
|
|
{
|
|
return;
|
|
}
|
|
|
|
stats.inc (nano::stat::type::rep_crawler, nano::stat::detail::loop);
|
|
|
|
if (!responses.empty ())
|
|
{
|
|
validate_and_process (lock);
|
|
debug_assert (!lock.owns_lock ());
|
|
lock.lock ();
|
|
}
|
|
|
|
cleanup ();
|
|
|
|
if (!prioritized.empty ())
|
|
{
|
|
decltype (prioritized) prioritized_l;
|
|
prioritized_l.swap (prioritized);
|
|
|
|
lock.unlock ();
|
|
query (prioritized_l);
|
|
lock.lock ();
|
|
}
|
|
|
|
if (query_predicate (sufficient_weight))
|
|
{
|
|
last_query = std::chrono::steady_clock::now ();
|
|
|
|
auto targets = prepare_crawl_targets (sufficient_weight);
|
|
|
|
lock.unlock ();
|
|
query (targets);
|
|
query (node.loopback_channel); // Query local representative
|
|
lock.lock ();
|
|
}
|
|
|
|
debug_assert (lock.owns_lock ());
|
|
}
|
|
}
|
|
|
|
void nano::rep_crawler::cleanup ()
|
|
{
|
|
debug_assert (!mutex.try_lock ());
|
|
|
|
// Evict reps with dead channels
|
|
erase_if (reps, [this] (rep_entry const & rep) {
|
|
if (!rep.channel->alive ())
|
|
{
|
|
stats.inc (nano::stat::type::rep_crawler, nano::stat::detail::channel_dead);
|
|
logger.info (nano::log::type::rep_crawler, "Evicting representative: {} with dead channel at: {}", rep.account, rep.channel->to_string ());
|
|
return true; // Erase
|
|
}
|
|
return false;
|
|
});
|
|
|
|
// Evict queries that haven't been responded to in a while
|
|
erase_if (queries, [this] (query_entry const & query) {
|
|
if (nano::elapsed (query.time, config.query_timeout))
|
|
{
|
|
if (query.replies == 0)
|
|
{
|
|
stats.inc (nano::stat::type::rep_crawler, nano::stat::detail::query_timeout);
|
|
logger.debug (nano::log::type::rep_crawler, "Aborting unresponsive query for block: {} from: {}", query.hash, query.channel->to_string ());
|
|
}
|
|
else
|
|
{
|
|
stats.inc (nano::stat::type::rep_crawler, nano::stat::detail::query_completion);
|
|
logger.debug (nano::log::type::rep_crawler, "Completion of query with: {} replies for block: {} from: {}", query.replies, query.hash, query.channel->to_string ());
|
|
}
|
|
return true; // Erase
|
|
}
|
|
return false;
|
|
});
|
|
}
|
|
|
|
/**
 * Picks the channels to query in the next crawl round. Requires the crawler mutex to be held by the caller.
 * A random set of peers is drawn from the network and filtered:
 *  - channels of already known reps are kept only if their last request is older than the per-rep interval,
 *  - other channels are kept only while they have fewer outstanding queries than the attempt limit.
 * @param sufficient_weight selects the conservative or the aggressive limits
 * @return the remaining channels
 */
std::deque<std::shared_ptr<nano::transport::channel>> nano::rep_crawler::prepare_crawl_targets (bool sufficient_weight) const
{
	debug_assert (!mutex.try_lock ());

	// TODO: Make these values configurable
	constexpr std::size_t conservative_count = 160;
	constexpr std::size_t aggressive_count = 160;
	constexpr std::size_t conservative_max_attempts = 4;
	constexpr std::size_t aggressive_max_attempts = 8;
	std::chrono::milliseconds const rep_query_interval = node.network_params.network.is_dev_network () ? std::chrono::milliseconds{ 500 } : std::chrono::milliseconds{ 60 * 1000 };

	stats.inc (nano::stat::type::rep_crawler, sufficient_weight ? nano::stat::detail::crawl_normal : nano::stat::detail::crawl_aggressive);

	// Limits depend on whether enough peer weight is already tracked
	auto const peer_count = sufficient_weight ? conservative_count : aggressive_count;
	auto const max_attempts = sufficient_weight ? conservative_max_attempts : aggressive_max_attempts;

	auto candidates = node.network.random_set (peer_count);

	auto const & reps_by_channel = reps.get<tag_channel> ();
	auto const & queries_by_channel = queries.get<tag_channel> ();

	// Remove every candidate that should not be queried in this round
	erase_if (candidates, [&] (std::shared_ptr<nano::transport::channel> const & channel) {
		if (auto const rep = reps_by_channel.find (channel); rep != reps_by_channel.end ())
		{
			// Known rep: throttle, skip until the per-rep interval has passed
			return !nano::elapsed (rep->last_request, rep_query_interval);
		}

		// Unknown peer: skip once it has reached the limit of outstanding queries
		return queries_by_channel.count (channel) >= max_attempts;
	});

	std::deque<std::shared_ptr<nano::transport::channel>> targets (candidates.begin (), candidates.end ());
	return targets;
}
|
|
|
|
/**
 * Chooses the block that peers will be asked to vote on.
 * Up to 32 random ledger blocks are examined; the first one that is confirmed in the ledger and
 * is not in the recently-confirmed set is used. Falls back to the genesis block.
 * @return the (hash, root) pair of the selected block
 */
auto nano::rep_crawler::prepare_query_target () const -> hash_root_t
{
	constexpr int max_attempts = 32;

	auto transaction = node.ledger.tx_begin_read ();

	for (auto const & candidate : node.ledger.random_blocks (transaction, max_attempts))
	{
		auto const hash = candidate->hash ();

		// Recently confirmed blocks could still have live votes coming in, and
		// nodes will not respond to queries for blocks that are not confirmed
		if (!active.recently_confirmed.contains (hash) && node.ledger.confirmed.block_exists (transaction, hash))
		{
			return std::make_pair (hash, candidate->root ());
		}
	}

	// No suitable block was found, query genesis instead
	auto const & genesis = node.network_params.ledger.genesis;
	return std::make_pair (genesis->hash (), genesis->root ());
}
|
|
|
|
/**
 * Registers an outgoing query. Requires the crawler mutex to be held by the caller.
 * @param hash_root (hash, root) of the queried block; only the hash is stored
 * @param channel channel the query is sent to
 * @return true when the query was newly recorded, false when the container rejected it as a duplicate
 */
bool nano::rep_crawler::track_rep_request (hash_root_t hash_root, std::shared_ptr<nano::transport::channel> const & channel)
{
	debug_assert (!mutex.try_lock ());

	bool const is_new = queries.emplace (query_entry{ hash_root.first, channel }).second;
	if (is_new)
	{
		// Stamp the request time on every rep reachable through this channel (a single host may have multiple reps)
		auto & by_channel = reps.get<tag_channel> ();
		auto const range = by_channel.equal_range (channel);
		for (auto current = range.first; current != range.second; ++current)
		{
			by_channel.modify (current, [] (rep_entry & rep) {
				rep.last_request = std::chrono::steady_clock::now ();
			});
		}
	}

	return is_new;
}
|
|
|
|
/**
 * Sends a confirm_req for one freshly selected block to each of the given channels.
 * Acquires the crawler mutex for the whole loop; channels for which the query is already
 * tracked are skipped and counted as duplicates.
 */
void nano::rep_crawler::query (std::deque<std::shared_ptr<nano::transport::channel>> const & target_channels)
{
	// The target block is chosen before the mutex is taken
	auto const target = prepare_query_target ();

	nano::lock_guard<nano::mutex> guard{ mutex };

	for (auto const & channel : target_channels)
	{
		if (!track_rep_request (target, channel))
		{
			stats.inc (nano::stat::type::rep_crawler, nano::stat::detail::query_duplicate);
			logger.debug (nano::log::type::rep_crawler, "Ignoring duplicate query for block: {} to: {}", target.first, channel->to_string ());
			continue;
		}

		stats.inc (nano::stat::type::rep_crawler, nano::stat::detail::query_sent);
		logger.debug (nano::log::type::rep_crawler, "Sending query for block: {} to: {}", target.first, channel->to_string ());

		nano::confirm_req request{ network_constants, target.first, target.second };

		// The completion callback only records the send result in the stats
		channel->send (request, nano::transport::traffic_type::rep_crawler, [this] (auto & ec, auto size) {
			stats.inc (nano::stat::type::rep_crawler_ec, to_stat_detail (ec), nano::stat::dir::out);
		});
	}
}
|
|
|
|
/**
 * Convenience overload: queries a single channel by delegating to the multi-channel overload.
 */
void nano::rep_crawler::query (std::shared_ptr<nano::transport::channel> const & target_channel)
{
	std::deque<std::shared_ptr<nano::transport::channel>> const single_target{ target_channel };
	query (single_target);
}
|
|
|
|
/**
 * Checks whether a tracked representative with principal weight is reachable through the channel.
 * @return true when a rep is recorded for `channel` and its ledger weight is at least the
 *         minimum principal weight; false when no rep is recorded for the channel
 */
bool nano::rep_crawler::is_pr (std::shared_ptr<nano::transport::channel> const & channel) const
{
	nano::lock_guard<nano::mutex> guard{ mutex };

	auto const & by_channel = reps.get<tag_channel> ();
	auto const found = by_channel.find (channel);
	if (found == by_channel.end ())
	{
		return false;
	}
	return node.ledger.weight (found->account) >= node.minimum_principal_weight ();
}
|
|
|
|
/**
 * Matches an incoming vote against the queries outstanding on the channel it arrived on.
 * On the first query whose block hash is contained in the vote, the response is queued for the
 * crawler thread, the query's reply counter is incremented and the thread is notified.
 * @return true when the vote answered a tracked query, false otherwise
 */
bool nano::rep_crawler::process (std::shared_ptr<nano::vote> const & vote, std::shared_ptr<nano::transport::channel> const & channel)
{
	nano::lock_guard<nano::mutex> guard{ mutex };

	auto const range = queries.get<tag_channel> ().equal_range (channel);
	for (auto it = range.first; it != range.second; ++it)
	{
		// TODO: This linear search could be slow, especially with large votes.
		auto const target_hash = it->hash;
		auto const & hashes = vote->hashes;
		if (std::find (hashes.begin (), hashes.end (), target_hash) == hashes.end ())
		{
			continue; // This query is not answered by the vote
		}

		stats.inc (nano::stat::type::rep_crawler, nano::stat::detail::response);
		logger.debug (nano::log::type::rep_crawler, "Processing response for block: {} from: {}", target_hash, channel->to_string ());

		// Sample how long the peer took to respond
		stats.sample (nano::stat::sample::rep_response_time, nano::log::milliseconds_delta (it->time), { 0, config.query_timeout.count () });

		responses.push_back ({ channel, vote });
		queries.modify (it, [] (query_entry & entry) {
			++entry.replies;
		});
		condition.notify_all ();
		return true; // Found and processed
	}
	return false;
}
|
|
|
|
/**
 * Sums the ledger weight of all tracked representatives whose channel is alive.
 * Acquires the crawler mutex.
 */
nano::uint128_t nano::rep_crawler::total_weight () const
{
	nano::lock_guard<nano::mutex> guard{ mutex };

	nano::uint128_t sum = 0;
	for (auto const & entry : reps)
	{
		if (!entry.channel->alive ())
		{
			continue; // Dead channels do not contribute
		}
		sum += node.ledger.weight (entry.account);
	}
	return sum;
}
|
|
|
|
std::vector<nano::representative> nano::rep_crawler::representatives (std::size_t count, nano::uint128_t const minimum_weight, std::optional<decltype (nano::network_constants::protocol_version)> const & minimum_protocol_version) const
|
|
{
|
|
auto const version_min = minimum_protocol_version.value_or (node.network_params.network.protocol_version_min);
|
|
|
|
nano::lock_guard<nano::mutex> lock{ mutex };
|
|
|
|
std::multimap<nano::amount, rep_entry, std::greater<>> ordered;
|
|
for (const auto & rep : reps.get<tag_account> ())
|
|
{
|
|
auto weight = node.ledger.weight (rep.account);
|
|
if (weight >= minimum_weight && rep.channel->get_network_version () >= version_min)
|
|
{
|
|
ordered.insert ({ nano::amount{ weight }, rep });
|
|
}
|
|
}
|
|
|
|
std::vector<nano::representative> result;
|
|
result.reserve (ordered.size ());
|
|
for (auto i = ordered.begin (), n = ordered.end (); i != n && result.size () < count; ++i)
|
|
{
|
|
auto const & [weight, rep] = *i;
|
|
result.push_back ({ rep.account, rep.channel });
|
|
}
|
|
return result;
|
|
}
|
|
|
|
/**
 * Same as representatives (), with the weight floor fixed at the node's minimum principal weight.
 */
std::vector<nano::representative> nano::rep_crawler::principal_representatives (std::size_t count, std::optional<decltype (nano::network_constants::protocol_version)> const & minimum_protocol_version) const
{
	auto const principal_floor = node.minimum_principal_weight ();
	return representatives (count, principal_floor, minimum_protocol_version);
}
|
|
|
|
std::size_t nano::rep_crawler::representative_count () const
|
|
{
|
|
nano::lock_guard<nano::mutex> lock{ mutex };
|
|
return reps.size ();
|
|
}
|
|
|
|
// Only for tests
|
|
void nano::rep_crawler::force_add_rep (const nano::account & account, const std::shared_ptr<nano::transport::channel> & channel)
|
|
{
|
|
release_assert (node.network_params.network.is_dev_network ());
|
|
nano::lock_guard<nano::mutex> lock{ mutex };
|
|
reps.emplace (rep_entry{ account, channel });
|
|
}
|
|
|
|
// Only for tests
|
|
void nano::rep_crawler::force_process (const std::shared_ptr<nano::vote> & vote, const std::shared_ptr<nano::transport::channel> & channel)
|
|
{
|
|
release_assert (node.network_params.network.is_dev_network ());
|
|
nano::lock_guard<nano::mutex> lock{ mutex };
|
|
responses.push_back ({ channel, vote });
|
|
}
|
|
|
|
// Only for tests
|
|
void nano::rep_crawler::force_query (const nano::block_hash & hash, const std::shared_ptr<nano::transport::channel> & channel)
|
|
{
|
|
release_assert (node.network_params.network.is_dev_network ());
|
|
nano::lock_guard<nano::mutex> lock{ mutex };
|
|
queries.emplace (query_entry{ hash, channel });
|
|
}
|
|
|
|
/**
 * Reports the crawler's internal containers (reps, queries, responses, prioritized)
 * for diagnostics. Acquires the crawler mutex while reading them.
 */
nano::container_info nano::rep_crawler::container_info () const
{
	nano::lock_guard<nano::mutex> lock{ mutex };

	nano::container_info result;
	result.put ("reps", reps);
	result.put ("queries", queries);
	result.put ("responses", responses);
	result.put ("prioritized", prioritized);
	return result;
}
|
|
|
|
/*
|
|
* rep_crawler_config
|
|
*/
|
|
|
|
/**
 * Builds the configuration for the given network.
 * On the dev network the query timeout is set to one second; otherwise the member's default is left untouched.
 */
nano::rep_crawler_config::rep_crawler_config (nano::network_constants const & network_constants)
{
	if (!network_constants.is_dev_network ())
	{
		return;
	}

	query_timeout = std::chrono::milliseconds{ 1000 };
}
|
|
|
|
/**
 * Writes the configuration to TOML.
 * @return the error state of `toml` after writing
 */
nano::error nano::rep_crawler_config::serialize (nano::tomlconfig & toml) const
{
	// The timeout is stored as its raw tick count
	auto const timeout_ticks = query_timeout.count ();
	toml.put ("query_timeout", timeout_ticks, "Timeout for rep crawler queries.\ntype:milliseconds");

	return toml.get_error ();
}
|
|
|
|
/**
 * Reads the configuration from TOML into this object.
 * @return the error state of `toml` after reading
 */
nano::error nano::rep_crawler_config::deserialize (nano::tomlconfig & toml)
{
	// Passes query_timeout to the reader under the "query_timeout" key
	toml.get_duration ("query_timeout", query_timeout);

	// Any failure is reported through the toml object's error state
	return toml.get_error ();
}
|