Vote cache processor (#4631)

This commit is contained in:
Piotr Wójcik 2024-05-24 10:08:34 +02:00 committed by GitHub
commit 30f2de0439
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 198 additions and 42 deletions

View file

@ -48,6 +48,7 @@ enum class type
telemetry,
vote_generator,
vote_cache,
vote_cache_processor,
hinting,
blockprocessor,
blockprocessor_source,
@ -112,6 +113,7 @@ enum class detail
cache,
rebroadcast,
queue_overflow,
triggered,
// processing queue
queue,

View file

@ -28,6 +28,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::vote_processing:
thread_role_name_string = "Vote processing";
break;
case nano::thread_role::name::vote_cache_processing:
thread_role_name_string = "Vote cache proc";
break;
case nano::thread_role::name::block_processing:
thread_role_name_string = "Blck processing";
break;

View file

@ -14,6 +14,7 @@ enum class name
work,
message_processing,
vote_processing,
vote_cache_processing,
block_processing,
request_loop,
wallet_actions,

View file

@ -440,7 +440,7 @@ nano::election_insertion_result nano::active_elections::insert (std::shared_ptr<
{
debug_assert (result.election);
node.vote_router.trigger_vote_cache (hash);
node.vote_cache_processor.trigger (hash);
node.observers.active_started.notify (hash);
vacancy_update ();
}
@ -523,7 +523,7 @@ bool nano::active_elections::publish (std::shared_ptr<nano::block> const & block
node.vote_router.connect (block_a->hash (), election);
lock.unlock ();
node.vote_router.trigger_vote_cache (block_a->hash ());
node.vote_cache_processor.trigger (block_a->hash ());
node.stats.inc (nano::stat::type::active, nano::stat::detail::election_block_conflict);
}

View file

@ -11,10 +11,20 @@ class ledger;
class local_vote_history;
class logger;
class network;
class network_params;
class node;
class node_config;
class node_flags;
class node_observers;
class online_reps;
class rep_crawler;
class rep_tiers;
class stats;
class vote_cache;
class vote_generator;
class vote_processor;
class vote_router;
class wallets;
enum class vote_code;
}

View file

@ -200,6 +200,8 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
vote_router{ *vote_router_impl },
vote_processor_impl{ std::make_unique<nano::vote_processor> (config.vote_processor, vote_router, observers, stats, flags, logger, online_reps, rep_crawler, ledger, network_params, rep_tiers) },
vote_processor{ *vote_processor_impl },
vote_cache_processor_impl{ std::make_unique<nano::vote_cache_processor> (config.vote_processor, vote_router, vote_cache, stats, logger) },
vote_cache_processor{ *vote_cache_processor_impl },
generator_impl{ std::make_unique<nano::vote_generator> (config, *this, ledger, wallets, vote_processor, history, network, stats, logger, /* non-final */ false) },
generator{ *generator_impl },
final_generator_impl{ std::make_unique<nano::vote_generator> (config, *this, ledger, wallets, vote_processor, history, network, stats, logger, /* final */ true) },
@ -572,6 +574,7 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
composite->add_component (collect_container_info (node.observers, "observers"));
composite->add_component (collect_container_info (node.wallets, "wallets"));
composite->add_component (node.vote_processor.collect_container_info ("vote_processor"));
composite->add_component (node.vote_cache_processor.collect_container_info ("vote_cache_processor"));
composite->add_component (node.rep_crawler.collect_container_info ("rep_crawler"));
composite->add_component (node.block_processor.collect_container_info ("block_processor"));
composite->add_component (collect_container_info (node.online_reps, "online_reps"));
@ -690,6 +693,7 @@ void nano::node::start ()
wallets.start ();
rep_tiers.start ();
vote_processor.start ();
vote_cache_processor.start ();
block_processor.start ();
active.start ();
generator.start ();
@ -734,6 +738,7 @@ void nano::node::stop ()
unchecked.stop ();
block_processor.stop ();
aggregator.stop ();
vote_cache_processor.stop ();
vote_processor.stop ();
rep_tiers.stop ();
scheduler.stop ();

View file

@ -48,6 +48,7 @@ class confirming_set;
class message_processor;
class node;
class vote_processor;
class vote_cache_processor;
class vote_router;
class work_pool;
class peer_history;
@ -194,6 +195,8 @@ public:
nano::vote_router & vote_router;
std::unique_ptr<nano::vote_processor> vote_processor_impl;
nano::vote_processor & vote_processor;
std::unique_ptr<nano::vote_cache_processor> vote_cache_processor_impl;
nano::vote_cache_processor & vote_cache_processor;
std::unique_ptr<nano::vote_generator> generator_impl;
nano::vote_generator & generator;
std::unique_ptr<nano::vote_generator> final_generator_impl;

View file

@ -15,6 +15,10 @@
using namespace std::chrono_literals;
/*
* vote_processor
*/
nano::vote_processor::vote_processor (vote_processor_config const & config_a, nano::vote_router & vote_router, nano::node_observers & observers_a, nano::stats & stats_a, nano::node_flags & flags_a, nano::logger & logger_a, nano::online_reps & online_reps_a, nano::rep_crawler & rep_crawler_a, nano::ledger & ledger_a, nano::network_params & network_params_a, nano::rep_tiers & rep_tiers_a) :
config{ config_a },
vote_router{ vote_router },
@ -218,6 +222,129 @@ std::unique_ptr<nano::container_info_component> nano::vote_processor::collect_co
return composite;
}
/*
* vote_cache_processor
*/
nano::vote_cache_processor::vote_cache_processor (vote_processor_config const & config_a, nano::vote_router & vote_router_a, nano::vote_cache & vote_cache_a, nano::stats & stats_a, nano::logger & logger_a) :
config{ config_a },
vote_router{ vote_router_a },
vote_cache{ vote_cache_a },
stats{ stats_a },
logger{ logger_a }
{
}
nano::vote_cache_processor::~vote_cache_processor ()
{
debug_assert (!thread.joinable ());
}
void nano::vote_cache_processor::start ()
{
debug_assert (!thread.joinable ());
thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::vote_cache_processing);
run ();
});
}
void nano::vote_cache_processor::stop ()
{
{
nano::lock_guard<nano::mutex> guard{ mutex };
stopped = true;
}
condition.notify_all ();
if (thread.joinable ())
{
thread.join ();
}
}
void nano::vote_cache_processor::trigger (nano::block_hash const & hash)
{
{
nano::lock_guard<nano::mutex> guard{ mutex };
if (triggered.size () >= config.max_triggered)
{
triggered.pop_front ();
stats.inc (nano::stat::type::vote_cache_processor, nano::stat::detail::overfill);
}
triggered.push_back (hash);
}
condition.notify_all ();
stats.inc (nano::stat::type::vote_cache_processor, nano::stat::detail::triggered);
}
void nano::vote_cache_processor::run ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
stats.inc (nano::stat::type::vote_cache_processor, nano::stat::detail::loop);
if (!triggered.empty ())
{
run_batch (lock);
debug_assert (!lock.owns_lock ());
lock.lock ();
}
else
{
condition.wait (lock, [&] { return stopped || !triggered.empty (); });
}
}
}
void nano::vote_cache_processor::run_batch (nano::unique_lock<nano::mutex> & lock)
{
debug_assert (lock.owns_lock ());
debug_assert (!mutex.try_lock ());
debug_assert (!triggered.empty ());
// Swap and deduplicate
decltype (triggered) triggered_l;
swap (triggered_l, triggered);
lock.unlock ();
std::unordered_set<nano::block_hash> hashes;
hashes.reserve (triggered_l.size ());
hashes.insert (triggered_l.begin (), triggered_l.end ());
stats.add (nano::stat::type::vote_cache_processor, nano::stat::detail::processed, hashes.size ());
for (auto const & hash : hashes)
{
auto cached = vote_cache.find (hash);
for (auto const & cached_vote : cached)
{
vote_router.vote (cached_vote, nano::vote_source::cache, hash);
}
}
}
std::size_t nano::vote_cache_processor::size () const
{
nano::lock_guard<nano::mutex> guard{ mutex };
return triggered.size ();
}
bool nano::vote_cache_processor::empty () const
{
return size () == 0;
}
std::unique_ptr<nano::container_info_component> nano::vote_cache_processor::collect_container_info (std::string const & name) const
{
nano::lock_guard<nano::mutex> guard{ mutex };
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "triggered", triggered.size (), sizeof (decltype (triggered)::value_type) }));
return composite;
}
/*
* vote_processor_config
*/
@ -242,4 +369,4 @@ nano::error nano::vote_processor_config::deserialize (nano::tomlconfig & toml)
toml.get ("batch_size", batch_size);
return toml.get_error ();
}
}

View file

@ -4,6 +4,7 @@
#include <nano/lib/threading.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/fair_queue.hpp>
#include <nano/node/fwd.hpp>
#include <nano/node/rep_tiers.hpp>
#include <nano/node/vote_router.hpp>
#include <nano/secure/common.hpp>
@ -13,32 +14,6 @@
#include <thread>
#include <unordered_set>
namespace nano
{
namespace store
{
class component;
}
class node_observers;
class stats;
class node_config;
class logger;
class online_reps;
class rep_crawler;
class ledger;
class network_params;
class node_flags;
class stats;
class rep_tiers;
enum class vote_code;
class vote_router;
namespace transport
{
class channel;
}
}
namespace nano
{
class vote_processor_config final
@ -53,21 +28,25 @@ public:
size_t pr_priority{ 3 };
size_t threads{ std::clamp (nano::hardware_concurrency () / 2, 1u, 4u) };
size_t batch_size{ 1024 };
size_t max_triggered{ 16384 };
};
class vote_processor final
{
public:
vote_processor (vote_processor_config const &, nano::vote_router & vote_router, nano::node_observers &, nano::stats &, nano::node_flags &, nano::logger &, nano::online_reps &, nano::rep_crawler &, nano::ledger &, nano::network_params &, nano::rep_tiers &);
vote_processor (vote_processor_config const &, nano::vote_router &, nano::node_observers &, nano::stats &, nano::node_flags &, nano::logger &, nano::online_reps &, nano::rep_crawler &, nano::ledger &, nano::network_params &, nano::rep_tiers &);
~vote_processor ();
void start ();
void stop ();
/** @returns true if the vote was queued for processing */
/** Queue vote for processing. @returns true if the vote was queued */
bool vote (std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> const &, nano::vote_source = nano::vote_source::live);
nano::vote_code vote_blocking (std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> const &, nano::vote_source = nano::vote_source::live);
/** Queue hash for vote cache lookup and processing. */
void trigger (nano::block_hash const & hash);
std::size_t size () const;
bool empty () const;
@ -101,4 +80,41 @@ private:
mutable nano::mutex mutex{ mutex_identifier (mutexes::vote_processor) };
std::vector<std::thread> threads;
};
class vote_cache_processor final
{
public:
vote_cache_processor (vote_processor_config const &, nano::vote_router &, nano::vote_cache &, nano::stats &, nano::logger &);
~vote_cache_processor ();
void start ();
void stop ();
/** Queue hash for vote cache lookup and processing. */
void trigger (nano::block_hash const & hash);
std::size_t size () const;
bool empty () const;
std::unique_ptr<container_info_component> collect_container_info (std::string const & name) const;
private:
void run ();
void run_batch (nano::unique_lock<nano::mutex> &);
private: // Dependencies
vote_processor_config const & config;
nano::vote_router & vote_router;
nano::vote_cache & vote_cache;
nano::stats & stats;
nano::logger & logger;
private:
std::deque<nano::block_hash> triggered;
bool stopped{ false };
nano::condition_variable condition;
mutable nano::mutex mutex;
std::thread thread;
};
}

View file

@ -123,16 +123,6 @@ std::unordered_map<nano::block_hash, nano::vote_code> nano::vote_router::vote (s
return results;
}
bool nano::vote_router::trigger_vote_cache (nano::block_hash const & hash)
{
auto cached = cache.find (hash);
for (auto const & cached_vote : cached)
{
vote (cached_vote, nano::vote_source::cache, hash);
}
return !cached.empty ();
}
bool nano::vote_router::active (nano::block_hash const & hash) const
{
std::shared_lock lock{ mutex };

View file

@ -59,7 +59,6 @@ public:
// If 'filter' parameter is non-zero, only elections for the specified hash are notified.
// This eliminates duplicate processing when triggering votes from the vote_cache as the result of a specific election being created.
std::unordered_map<nano::block_hash, nano::vote_code> vote (std::shared_ptr<nano::vote> const &, nano::vote_source = nano::vote_source::live, nano::block_hash filter = { 0 });
bool trigger_vote_cache (nano::block_hash const & hash);
bool active (nano::block_hash const & hash) const;
std::shared_ptr<nano::election> election (nano::block_hash const & hash) const;