From 9f682b49a4f4f7cc1e9488378e31835f870d6c51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20W=C3=B3jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Wed, 1 Mar 2023 12:28:12 +0100 Subject: [PATCH] Small `processing_queue` improvements (#4159) * Add batch type alias * Add `joinable` function * Use universal reference --- nano/lib/processing_queue.hpp | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/nano/lib/processing_queue.hpp b/nano/lib/processing_queue.hpp index c49783eac..4fa89584e 100644 --- a/nano/lib/processing_queue.hpp +++ b/nano/lib/processing_queue.hpp @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -23,6 +24,7 @@ class processing_queue final { public: using value_t = T; + using batch_t = std::deque; /** * @param thread_role Spawned processing threads will use this name @@ -71,15 +73,23 @@ public: threads.clear (); } + bool joinable () const + { + return std::any_of (threads.cbegin (), threads.cend (), [] (auto const & thread) { + return thread.joinable (); + }); + } + /** * Queues item for batch processing */ - void add (T && item) + template + void add (Item && item) { nano::unique_lock lock{ mutex }; if (queue.size () < max_queue_size) { - queue.emplace_back (std::forward (item)); + queue.push_back (std::forward (item)); lock.unlock (); condition.notify_one (); stats.inc (stat_type, nano::stat::detail::queue); @@ -136,7 +146,7 @@ private: for (int n = 0; n < max_batch_size; ++n) { debug_assert (!queue.empty ()); - queue_l.emplace_back (std::move (queue.front ())); + queue_l.push_back (std::move (queue.front ())); queue.pop_front (); } return queue_l; @@ -160,7 +170,7 @@ private: } public: - std::function &)> process_batch{ [] (auto &) { debug_assert (false, "processing queue callback empty"); } }; + std::function process_batch{ [] (auto &) { debug_assert (false, "processing queue callback empty"); } }; private: nano::stats & stats;