Small processing_queue improvements (#4159)
* Add batch type alias * Add `joinable` function * Use universal reference
This commit is contained in:
parent
81a2ab4f26
commit
9f682b49a4
1 changed files with 14 additions and 4 deletions
|
|
@ -6,6 +6,7 @@
|
||||||
#include <nano/lib/threading.hpp>
|
#include <nano/lib/threading.hpp>
|
||||||
#include <nano/lib/utility.hpp>
|
#include <nano/lib/utility.hpp>
|
||||||
|
|
||||||
|
#include <algorithm>
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
#include <deque>
|
#include <deque>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
|
@ -23,6 +24,7 @@ class processing_queue final
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
using value_t = T;
|
using value_t = T;
|
||||||
|
using batch_t = std::deque<value_t>;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param thread_role Spawned processing threads will use this name
|
* @param thread_role Spawned processing threads will use this name
|
||||||
|
|
@ -71,15 +73,23 @@ public:
|
||||||
threads.clear ();
|
threads.clear ();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool joinable () const
|
||||||
|
{
|
||||||
|
return std::any_of (threads.cbegin (), threads.cend (), [] (auto const & thread) {
|
||||||
|
return thread.joinable ();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Queues item for batch processing
|
* Queues item for batch processing
|
||||||
*/
|
*/
|
||||||
void add (T && item)
|
template <class Item>
|
||||||
|
void add (Item && item)
|
||||||
{
|
{
|
||||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||||
if (queue.size () < max_queue_size)
|
if (queue.size () < max_queue_size)
|
||||||
{
|
{
|
||||||
queue.emplace_back (std::forward<T> (item));
|
queue.push_back (std::forward<T> (item));
|
||||||
lock.unlock ();
|
lock.unlock ();
|
||||||
condition.notify_one ();
|
condition.notify_one ();
|
||||||
stats.inc (stat_type, nano::stat::detail::queue);
|
stats.inc (stat_type, nano::stat::detail::queue);
|
||||||
|
|
@ -136,7 +146,7 @@ private:
|
||||||
for (int n = 0; n < max_batch_size; ++n)
|
for (int n = 0; n < max_batch_size; ++n)
|
||||||
{
|
{
|
||||||
debug_assert (!queue.empty ());
|
debug_assert (!queue.empty ());
|
||||||
queue_l.emplace_back (std::move (queue.front ()));
|
queue_l.push_back (std::move (queue.front ()));
|
||||||
queue.pop_front ();
|
queue.pop_front ();
|
||||||
}
|
}
|
||||||
return queue_l;
|
return queue_l;
|
||||||
|
|
@ -160,7 +170,7 @@ private:
|
||||||
}
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
std::function<void (std::deque<value_t> &)> process_batch{ [] (auto &) { debug_assert (false, "processing queue callback empty"); } };
|
std::function<void (batch_t &)> process_batch{ [] (auto &) { debug_assert (false, "processing queue callback empty"); } };
|
||||||
|
|
||||||
private:
|
private:
|
||||||
nano::stats & stats;
|
nano::stats & stats;
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue