Fix processing_queue
stop (#4082)
* Fix `processing_queue` stop * Add `start_stop_guard` helper
This commit is contained in:
parent
d6ba828257
commit
7e9f6e8db5
3 changed files with 35 additions and 8 deletions
|
@ -23,7 +23,7 @@ TEST (processing_queue, process_one)
|
|||
queue.process_batch = [&] (auto & batch) {
|
||||
processed += batch.size ();
|
||||
};
|
||||
queue.start ();
|
||||
nano::test::start_stop_guard queue_guard{ queue };
|
||||
|
||||
queue.add (1);
|
||||
|
||||
|
@ -41,7 +41,7 @@ TEST (processing_queue, process_many)
|
|||
queue.process_batch = [&] (auto & batch) {
|
||||
processed += batch.size ();
|
||||
};
|
||||
queue.start ();
|
||||
nano::test::start_stop_guard queue_guard{ queue };
|
||||
|
||||
const int count = 1024;
|
||||
for (int n = 0; n < count; ++n)
|
||||
|
@ -87,7 +87,7 @@ TEST (processing_queue, max_batch_size)
|
|||
max_batch = batch.size ();
|
||||
}
|
||||
};
|
||||
queue.start ();
|
||||
nano::test::start_stop_guard queue_guard{ queue };
|
||||
|
||||
ASSERT_TIMELY (5s, max_batch == 128);
|
||||
ASSERT_ALWAYS (1s, max_batch == 128);
|
||||
|
@ -104,7 +104,7 @@ TEST (processing_queue, parallel)
|
|||
std::this_thread::sleep_for (2s);
|
||||
processed += batch.size ();
|
||||
};
|
||||
queue.start ();
|
||||
nano::test::start_stop_guard queue_guard{ queue };
|
||||
|
||||
const int count = 16;
|
||||
for (int n = 0; n < count; ++n)
|
||||
|
|
|
@ -42,7 +42,8 @@ public:
|
|||
|
||||
~processing_queue ()
|
||||
{
|
||||
stop ();
|
||||
// Threads must be stopped before destruction
|
||||
debug_assert (threads.empty ());
|
||||
}
|
||||
|
||||
void start ()
|
||||
|
@ -50,6 +51,7 @@ public:
|
|||
for (int n = 0; n < thread_count; ++n)
|
||||
{
|
||||
threads.emplace_back ([this] () {
|
||||
nano::thread_role::set (thread_role);
|
||||
run ();
|
||||
});
|
||||
}
|
||||
|
@ -57,7 +59,10 @@ public:
|
|||
|
||||
void stop ()
|
||||
{
|
||||
stopped = true;
|
||||
{
|
||||
nano::lock_guard<nano::mutex> guard{ mutex };
|
||||
stopped = true;
|
||||
}
|
||||
condition.notify_all ();
|
||||
for (auto & thread : threads)
|
||||
{
|
||||
|
@ -140,7 +145,6 @@ private:
|
|||
|
||||
void run ()
|
||||
{
|
||||
nano::thread_role::set (thread_role);
|
||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||
while (!stopped)
|
||||
{
|
||||
|
@ -169,7 +173,8 @@ private:
|
|||
|
||||
private:
|
||||
std::deque<value_t> queue;
|
||||
std::atomic<bool> stopped{ false };
|
||||
|
||||
bool stopped{ false };
|
||||
mutable nano::mutex mutex;
|
||||
nano::condition_variable condition;
|
||||
std::vector<std::thread> threads;
|
||||
|
|
|
@ -282,6 +282,28 @@ namespace test
|
|||
std::atomic<unsigned> required_count;
|
||||
};
|
||||
|
||||
/**
|
||||
* A helper that calls `start` from constructor and `stop` from destructor
|
||||
*/
|
||||
template <class T>
|
||||
class start_stop_guard
|
||||
{
|
||||
public:
|
||||
explicit start_stop_guard (T & ref_a) :
|
||||
ref{ ref_a }
|
||||
{
|
||||
ref.start ();
|
||||
}
|
||||
|
||||
~start_stop_guard ()
|
||||
{
|
||||
ref.stop ();
|
||||
}
|
||||
|
||||
private:
|
||||
T & ref;
|
||||
};
|
||||
|
||||
void wait_peer_connections (nano::test::system &);
|
||||
|
||||
/**
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue