Merge pull request #4600 "Multithreaded vote processor" from pwojcikdev/vote-processor-threads

Multithreaded vote processor
This commit is contained in:
Piotr Wójcik 2024-05-07 09:44:31 +02:00 committed by GitHub
commit 687e5f3e76
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 83 additions and 53 deletions

View file

@ -264,6 +264,8 @@ TEST (toml, daemon_config_deserialize_defaults)
ASSERT_EQ (conf.node.vote_processor.max_pr_queue, defaults.node.vote_processor.max_pr_queue);
ASSERT_EQ (conf.node.vote_processor.max_non_pr_queue, defaults.node.vote_processor.max_non_pr_queue);
ASSERT_EQ (conf.node.vote_processor.pr_priority, defaults.node.vote_processor.pr_priority);
ASSERT_EQ (conf.node.vote_processor.threads, defaults.node.vote_processor.threads);
ASSERT_EQ (conf.node.vote_processor.batch_size, defaults.node.vote_processor.batch_size);
ASSERT_EQ (conf.node.bootstrap_server.max_queue, defaults.node.bootstrap_server.max_queue);
ASSERT_EQ (conf.node.bootstrap_server.threads, defaults.node.bootstrap_server.threads);
@ -561,6 +563,8 @@ TEST (toml, daemon_config_deserialize_no_defaults)
max_pr_queue = 999
max_non_pr_queue = 999
pr_priority = 999
threads = 999
batch_size = 999
[node.bootstrap_server]
max_queue = 999
@ -715,6 +719,8 @@ TEST (toml, daemon_config_deserialize_no_defaults)
ASSERT_NE (conf.node.vote_processor.max_pr_queue, defaults.node.vote_processor.max_pr_queue);
ASSERT_NE (conf.node.vote_processor.max_non_pr_queue, defaults.node.vote_processor.max_non_pr_queue);
ASSERT_NE (conf.node.vote_processor.pr_priority, defaults.node.vote_processor.pr_priority);
ASSERT_NE (conf.node.vote_processor.threads, defaults.node.vote_processor.threads);
ASSERT_NE (conf.node.vote_processor.batch_size, defaults.node.vote_processor.batch_size);
ASSERT_NE (conf.node.bootstrap_server.max_queue, defaults.node.bootstrap_server.max_queue);
ASSERT_NE (conf.node.bootstrap_server.threads, defaults.node.bootstrap_server.threads);

View file

@ -188,16 +188,13 @@ public:
size_t size () const
{
return std::accumulate (queues.begin (), queues.end (), 0, [] (size_t total, auto const & queue) {
return total + queue.second.size ();
});
debug_assert (total_size == calculate_total_size ());
return total_size;
};
bool empty () const
{
return std::all_of (queues.begin (), queues.end (), [] (auto const & queue) {
return queue.second.empty ();
});
return size () == 0;
}
size_t queues_size () const
@ -248,7 +245,12 @@ public:
release_assert (it != queues.end ());
auto & queue = it->second;
return queue.push (std::move (request)); // True if added, false if dropped
bool added = queue.push (std::move (request)); // True if added, false if dropped
if (added)
{
++total_size;
}
return added;
}
public:
@ -261,25 +263,7 @@ public:
public:
value_type next ()
{
debug_assert (!empty ()); // Should be checked before calling next
auto should_seek = [&, this] () {
if (iterator == queues.end ())
{
return true;
}
auto & queue = iterator->second;
if (queue.empty ())
{
return true;
}
// Allow up to `queue.priority` requests to be processed before moving to the next queue
if (counter >= queue.priority)
{
return true;
}
return false;
};
release_assert (!empty ()); // Should be checked before calling next
if (should_seek ())
{
@ -292,14 +276,17 @@ public:
auto & queue = iterator->second;
++counter;
--total_size;
return { queue.pop (), source };
}
std::deque<value_type> next_batch (size_t max_count)
{
// TODO: Naive implementation, could be optimized
auto const count = std::min (size (), max_count);
std::deque<value_type> result;
while (!empty () && result.size () < max_count)
while (result.size () < count)
{
result.emplace_back (next ());
}
@ -307,6 +294,25 @@ public:
}
private:
// Decide whether the round-robin iterator must advance to another queue.
// True when: no queue is currently selected, the current queue ran dry,
// or the current queue has already consumed its fair share of pops.
bool should_seek () const
{
if (iterator == queues.end ())
{
return true;
}
auto & queue = iterator->second;
if (queue.empty ())
{
return true;
}
// Allow up to `queue.priority` requests to be processed before moving to the next queue
if (counter >= queue.priority)
{
return true;
}
return false;
}
void seek_next ()
{
counter = 0;
@ -329,6 +335,7 @@ private:
// Invalidate the current iterator
iterator = queues.end ();
// Only removing empty queues, no need to update the `total size` counter
erase_if (queues, [] (auto const & entry) {
return entry.second.empty () && !entry.first.alive ();
});
@ -343,15 +350,24 @@ private:
}
}
// Recomputes the aggregate size by summing every per-origin queue — O(n) in
// the number of queues. Used by size () to debug_assert that the cached
// `total_size` counter stays in sync with the actual queue contents.
size_t calculate_total_size () const
{
return std::accumulate (queues.begin (), queues.end (), size_t{ 0 }, [] (size_t total, auto const & queue) {
return total + queue.second.size ();
});
}
private:
std::map<origin_entry, entry> queues;
typename std::map<origin_entry, entry>::iterator iterator{ queues.end () };
size_t counter{ 0 };
size_t total_size{ 0 };
std::chrono::steady_clock::time_point last_update{};
public:
std::unique_ptr<container_info_component> collect_container_info (std::string const & name)
std::unique_ptr<container_info_component> collect_container_info (std::string const & name) const
{
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "queues", queues_size (), sizeof (typename decltype (queues)::value_type) }));

View file

@ -60,18 +60,20 @@ nano::vote_processor::vote_processor (vote_processor_config const & config_a, na
nano::vote_processor::~vote_processor ()
{
// Thread must be stopped before destruction
debug_assert (!thread.joinable ());
debug_assert (threads.empty ());
}
void nano::vote_processor::start ()
{
debug_assert (!thread.joinable ());
debug_assert (threads.empty ());
thread = std::thread{ [this] () {
nano::thread_role::set (nano::thread_role::name::vote_processing);
run ();
} };
for (int n = 0; n < config.threads; ++n)
{
threads.emplace_back ([this] () {
nano::thread_role::set (nano::thread_role::name::vote_processing);
run ();
});
}
}
void nano::vote_processor::stop ()
@ -81,10 +83,12 @@ void nano::vote_processor::stop ()
stopped = true;
}
condition.notify_all ();
if (thread.joinable ())
for (auto & thread : threads)
{
thread.join ();
}
threads.clear ();
}
bool nano::vote_processor::vote (std::shared_ptr<nano::vote> const & vote, std::shared_ptr<nano::transport::channel> const & channel)
@ -103,7 +107,7 @@ bool nano::vote_processor::vote (std::shared_ptr<nano::vote> const & vote, std::
stats.inc (nano::stat::type::vote_processor, nano::stat::detail::process);
stats.inc (nano::stat::type::vote_processor_tier, to_stat_detail (tier));
condition.notify_all ();
condition.notify_one ();
}
else
{
@ -124,13 +128,12 @@ void nano::vote_processor::run ()
{
run_batch (lock);
debug_assert (!lock.owns_lock ());
lock.lock ();
}
condition.wait (lock, [&] {
return stopped || !queue.empty ();
});
else
{
condition.wait (lock, [&] { return stopped || !queue.empty (); });
}
}
}
@ -141,9 +144,9 @@ void nano::vote_processor::run_batch (nano::unique_lock<nano::mutex> & lock)
debug_assert (!queue.empty ());
nano::timer<std::chrono::milliseconds> timer;
timer.start ();
size_t const max_batch_size = 1024 * 4;
auto batch = queue.next_batch (max_batch_size);
auto batch = queue.next_batch (config.batch_size);
lock.unlock ();
@ -154,7 +157,7 @@ void nano::vote_processor::run_batch (nano::unique_lock<nano::mutex> & lock)
total_processed += batch.size ();
if (batch.size () == max_batch_size && timer.stop () > 100ms)
if (batch.size () == config.batch_size && timer.stop () > 100ms)
{
logger.debug (nano::log::type::vote_processor, "Processed {} votes in {} milliseconds (rate of {} votes per second)",
batch.size (),
@ -206,13 +209,11 @@ bool nano::vote_processor::empty () const
std::unique_ptr<nano::container_info_component> nano::vote_processor::collect_container_info (std::string const & name) const
{
std::size_t votes_count;
{
nano::lock_guard<nano::mutex> guard{ mutex };
votes_count = queue.size ();
}
nano::lock_guard<nano::mutex> guard{ mutex };
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "votes", votes_count, sizeof (decltype (queue)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "votes", queue.size (), sizeof (decltype (queue)::value_type) }));
composite->add_component (queue.collect_container_info ("queue"));
return composite;
}
@ -225,6 +226,8 @@ nano::error nano::vote_processor_config::serialize (nano::tomlconfig & toml) con
toml.put ("max_pr_queue", max_pr_queue, "Maximum number of votes to queue from principal representatives. \ntype:uint64");
toml.put ("max_non_pr_queue", max_non_pr_queue, "Maximum number of votes to queue from non-principal representatives. \ntype:uint64");
toml.put ("pr_priority", pr_priority, "Priority for votes from principal representatives. Higher priority gets processed more frequently. Non-principal representatives have a baseline priority of 1. \ntype:uint64");
toml.put ("threads", threads, "Number of threads to use for processing votes. \ntype:uint64");
toml.put ("batch_size", batch_size, "Maximum number of votes to process in a single batch. \ntype:uint64");
return toml.get_error ();
}
@ -234,6 +237,8 @@ nano::error nano::vote_processor_config::deserialize (nano::tomlconfig & toml)
toml.get ("max_pr_queue", max_pr_queue);
toml.get ("max_non_pr_queue", max_non_pr_queue);
toml.get ("pr_priority", pr_priority);
toml.get ("threads", threads);
toml.get ("batch_size", batch_size);
return toml.get_error ();
}

View file

@ -1,6 +1,7 @@
#pragma once
#include <nano/lib/numbers.hpp>
#include <nano/lib/threading.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/fair_queue.hpp>
#include <nano/node/rep_tiers.hpp>
@ -48,6 +49,8 @@ public:
size_t max_pr_queue{ 256 };
size_t max_non_pr_queue{ 32 };
size_t pr_priority{ 3 };
size_t threads{ std::min (4u, nano::hardware_concurrency () / 2) };
size_t batch_size{ 1024 };
};
class vote_processor final
@ -93,6 +96,6 @@ private:
bool stopped{ false };
nano::condition_variable condition;
mutable nano::mutex mutex{ mutex_identifier (mutexes::vote_processor) };
std::thread thread;
std::vector<std::thread> threads;
};
}