diff --git a/nano/core_test/toml.cpp b/nano/core_test/toml.cpp index 204e40c13..886461df6 100644 --- a/nano/core_test/toml.cpp +++ b/nano/core_test/toml.cpp @@ -264,6 +264,8 @@ TEST (toml, daemon_config_deserialize_defaults) ASSERT_EQ (conf.node.vote_processor.max_pr_queue, defaults.node.vote_processor.max_pr_queue); ASSERT_EQ (conf.node.vote_processor.max_non_pr_queue, defaults.node.vote_processor.max_non_pr_queue); ASSERT_EQ (conf.node.vote_processor.pr_priority, defaults.node.vote_processor.pr_priority); + ASSERT_EQ (conf.node.vote_processor.threads, defaults.node.vote_processor.threads); + ASSERT_EQ (conf.node.vote_processor.batch_size, defaults.node.vote_processor.batch_size); ASSERT_EQ (conf.node.bootstrap_server.max_queue, defaults.node.bootstrap_server.max_queue); ASSERT_EQ (conf.node.bootstrap_server.threads, defaults.node.bootstrap_server.threads); @@ -561,6 +563,8 @@ TEST (toml, daemon_config_deserialize_no_defaults) max_pr_queue = 999 max_non_pr_queue = 999 pr_priority = 999 + threads = 999 + batch_size = 999 [node.bootstrap_server] max_queue = 999 @@ -715,6 +719,8 @@ TEST (toml, daemon_config_deserialize_no_defaults) ASSERT_NE (conf.node.vote_processor.max_pr_queue, defaults.node.vote_processor.max_pr_queue); ASSERT_NE (conf.node.vote_processor.max_non_pr_queue, defaults.node.vote_processor.max_non_pr_queue); ASSERT_NE (conf.node.vote_processor.pr_priority, defaults.node.vote_processor.pr_priority); + ASSERT_NE (conf.node.vote_processor.threads, defaults.node.vote_processor.threads); + ASSERT_NE (conf.node.vote_processor.batch_size, defaults.node.vote_processor.batch_size); ASSERT_NE (conf.node.bootstrap_server.max_queue, defaults.node.bootstrap_server.max_queue); ASSERT_NE (conf.node.bootstrap_server.threads, defaults.node.bootstrap_server.threads); diff --git a/nano/node/fair_queue.hpp b/nano/node/fair_queue.hpp index 52bdcc34d..1a8561d6f 100644 --- a/nano/node/fair_queue.hpp +++ b/nano/node/fair_queue.hpp @@ -188,16 +188,13 @@ public: size_t size () const { - return std::accumulate (queues.begin (), queues.end (), 0, [] (size_t total, auto const & queue) { - return total + queue.second.size (); - }); + debug_assert (total_size == calculate_total_size ()); + return total_size; }; bool empty () const { - return std::all_of (queues.begin (), queues.end (), [] (auto const & queue) { - return queue.second.empty (); - }); + return size () == 0; } size_t queues_size () const @@ -248,7 +245,12 @@ public: release_assert (it != queues.end ()); auto & queue = it->second; - return queue.push (std::move (request)); // True if added, false if dropped + bool added = queue.push (std::move (request)); // True if added, false if dropped + if (added) + { + ++total_size; + } + return added; } public: @@ -261,25 +263,7 @@ public: public: value_type next () { - debug_assert (!empty ()); // Should be checked before calling next - - auto should_seek = [&, this] () { - if (iterator == queues.end ()) - { - return true; - } - auto & queue = iterator->second; - if (queue.empty ()) - { - return true; - } - // Allow up to `queue.priority` requests to be processed before moving to the next queue - if (counter >= queue.priority) - { - return true; - } - return false; - }; + release_assert (!empty ()); // Should be checked before calling next if (should_seek ()) { @@ -292,14 +276,17 @@ public: auto & queue = iterator->second; ++counter; + --total_size; + return { queue.pop (), source }; } std::deque next_batch (size_t max_count) { - // TODO: Naive implementation, could be optimized + auto const count = std::min (size (), max_count); + std::deque result; - while (!empty () && result.size () < max_count) + while (result.size () < count) { result.emplace_back (next ()); } @@ -307,6 +294,25 @@ public: } private: + bool should_seek () const + { + if (iterator == queues.end ()) + { + return true; + } + auto & queue = iterator->second; + if (queue.empty ()) + { + return true; + } + // Allow up to `queue.priority` requests to be processed before moving to the next queue + if (counter >= queue.priority) + { + return true; + } + return false; + } + void seek_next () { counter = 0; @@ -329,6 +335,7 @@ private: // Invalidate the current iterator iterator = queues.end (); + // Only removing empty queues, no need to update the `total size` counter erase_if (queues, [] (auto const & entry) { return entry.second.empty () && !entry.first.alive (); }); @@ -343,15 +350,24 @@ private: } } + size_t calculate_total_size () const + { + return std::accumulate (queues.begin (), queues.end (), size_t{ 0 }, [] (size_t total, auto const & queue) { + return total + queue.second.size (); + }); + } + private: std::map queues; typename std::map::iterator iterator{ queues.end () }; size_t counter{ 0 }; + size_t total_size{ 0 }; + std::chrono::steady_clock::time_point last_update{}; public: - std::unique_ptr collect_container_info (std::string const & name) + std::unique_ptr collect_container_info (std::string const & name) const { auto composite = std::make_unique (name); composite->add_component (std::make_unique (container_info{ "queues", queues_size (), sizeof (typename decltype (queues)::value_type) })); diff --git a/nano/node/vote_processor.cpp b/nano/node/vote_processor.cpp index 222ecd9d6..40dc22370 100644 --- a/nano/node/vote_processor.cpp +++ b/nano/node/vote_processor.cpp @@ -60,18 +60,20 @@ nano::vote_processor::vote_processor (vote_processor_config const & config_a, na nano::vote_processor::~vote_processor () { - // Thread must be stopped before destruction - debug_assert (!thread.joinable ()); + debug_assert (threads.empty ()); } void nano::vote_processor::start () { - debug_assert (!thread.joinable ()); + debug_assert (threads.empty ()); - thread = std::thread{ [this] () { - nano::thread_role::set (nano::thread_role::name::vote_processing); - run (); - } }; + for (int n = 0; n < config.threads; ++n) + { + threads.emplace_back ([this] () { + nano::thread_role::set (nano::thread_role::name::vote_processing); + run (); + }); + } } void nano::vote_processor::stop () @@ -81,10 +83,12 @@ void nano::vote_processor::stop () stopped = true; } condition.notify_all (); - if (thread.joinable ()) + + for (auto & thread : threads) { thread.join (); } + threads.clear (); } bool nano::vote_processor::vote (std::shared_ptr const & vote, std::shared_ptr const & channel) @@ -103,7 +107,7 @@ bool nano::vote_processor::vote (std::shared_ptr const & vote, std:: stats.inc (nano::stat::type::vote_processor, nano::stat::detail::process); stats.inc (nano::stat::type::vote_processor_tier, to_stat_detail (tier)); - condition.notify_all (); + condition.notify_one (); } else { @@ -124,13 +128,12 @@ void nano::vote_processor::run () { run_batch (lock); debug_assert (!lock.owns_lock ()); - lock.lock (); } - - condition.wait (lock, [&] { - return stopped || !queue.empty (); - }); + else + { + condition.wait (lock, [&] { return stopped || !queue.empty (); }); + } } } @@ -141,9 +144,9 @@ void nano::vote_processor::run_batch (nano::unique_lock & lock) debug_assert (!queue.empty ()); nano::timer timer; + timer.start (); - size_t const max_batch_size = 1024 * 4; - auto batch = queue.next_batch (max_batch_size); + auto batch = queue.next_batch (config.batch_size); lock.unlock (); @@ -154,7 +157,7 @@ void nano::vote_processor::run_batch (nano::unique_lock & lock) total_processed += batch.size (); - if (batch.size () == max_batch_size && timer.stop () > 100ms) + if (batch.size () == config.batch_size && timer.stop () > 100ms) { logger.debug (nano::log::type::vote_processor, "Processed {} votes in {} milliseconds (rate of {} votes per second)", batch.size (), @@ -206,13 +209,11 @@ bool nano::vote_processor::empty () const std::unique_ptr nano::vote_processor::collect_container_info (std::string const & name) const { - std::size_t votes_count; - { - nano::lock_guard guard{ mutex }; - votes_count = queue.size (); - } + nano::lock_guard guard{ mutex }; + auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "votes", votes_count, sizeof (decltype (queue)::value_type) })); + composite->add_component (std::make_unique (container_info{ "votes", queue.size (), sizeof (decltype (queue)::value_type) })); + composite->add_component (queue.collect_container_info ("queue")); return composite; } @@ -225,6 +226,8 @@ nano::error nano::vote_processor_config::serialize (nano::tomlconfig & toml) con toml.put ("max_pr_queue", max_pr_queue, "Maximum number of votes to queue from principal representatives. \ntype:uint64"); toml.put ("max_non_pr_queue", max_non_pr_queue, "Maximum number of votes to queue from non-principal representatives. \ntype:uint64"); toml.put ("pr_priority", pr_priority, "Priority for votes from principal representatives. Higher priority gets processed more frequently. Non-principal representatives have a baseline priority of 1. \ntype:uint64"); + toml.put ("threads", threads, "Number of threads to use for processing votes. \ntype:uint64"); + toml.put ("batch_size", batch_size, "Maximum number of votes to process in a single batch. \ntype:uint64"); return toml.get_error (); } @@ -234,6 +237,8 @@ nano::error nano::vote_processor_config::deserialize (nano::tomlconfig & toml) toml.get ("max_pr_queue", max_pr_queue); toml.get ("max_non_pr_queue", max_non_pr_queue); toml.get ("pr_priority", pr_priority); + toml.get ("threads", threads); + toml.get ("batch_size", batch_size); return toml.get_error (); } diff --git a/nano/node/vote_processor.hpp b/nano/node/vote_processor.hpp index 2f2d6b4f0..43659cff0 100644 --- a/nano/node/vote_processor.hpp +++ b/nano/node/vote_processor.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -48,6 +49,8 @@ public: size_t max_pr_queue{ 256 }; size_t max_non_pr_queue{ 32 }; size_t pr_priority{ 3 }; + size_t threads{ std::min (4u, nano::hardware_concurrency () / 2) }; + size_t batch_size{ 1024 }; }; class vote_processor final @@ -93,6 +96,6 @@ private: bool stopped{ false }; nano::condition_variable condition; mutable nano::mutex mutex{ mutex_identifier (mutexes::vote_processor) }; - std::thread thread; + std::vector threads; }; }