From 7e9f6e8db54acbef61330355df860a5f9855f3db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20W=C3=B3jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 30 Jan 2023 21:12:02 +0100 Subject: [PATCH] Fix `processing_queue` stop (#4082) * Fix `processing_queue` stop * Add `start_stop_guard` helper --- nano/core_test/processing_queue.cpp | 8 ++++---- nano/lib/processing_queue.hpp | 13 +++++++++---- nano/test_common/testutil.hpp | 22 ++++++++++++++++++++++ 3 files changed, 35 insertions(+), 8 deletions(-) diff --git a/nano/core_test/processing_queue.cpp b/nano/core_test/processing_queue.cpp index 85c54a9f..04525218 100644 --- a/nano/core_test/processing_queue.cpp +++ b/nano/core_test/processing_queue.cpp @@ -23,7 +23,7 @@ TEST (processing_queue, process_one) queue.process_batch = [&] (auto & batch) { processed += batch.size (); }; - queue.start (); + nano::test::start_stop_guard queue_guard{ queue }; queue.add (1); @@ -41,7 +41,7 @@ TEST (processing_queue, process_many) queue.process_batch = [&] (auto & batch) { processed += batch.size (); }; - queue.start (); + nano::test::start_stop_guard queue_guard{ queue }; const int count = 1024; for (int n = 0; n < count; ++n) @@ -87,7 +87,7 @@ TEST (processing_queue, max_batch_size) max_batch = batch.size (); } }; - queue.start (); + nano::test::start_stop_guard queue_guard{ queue }; ASSERT_TIMELY (5s, max_batch == 128); ASSERT_ALWAYS (1s, max_batch == 128); @@ -104,7 +104,7 @@ TEST (processing_queue, parallel) std::this_thread::sleep_for (2s); processed += batch.size (); }; - queue.start (); + nano::test::start_stop_guard queue_guard{ queue }; const int count = 16; for (int n = 0; n < count; ++n) diff --git a/nano/lib/processing_queue.hpp b/nano/lib/processing_queue.hpp index 380b20be..c49783ea 100644 --- a/nano/lib/processing_queue.hpp +++ b/nano/lib/processing_queue.hpp @@ -42,7 +42,8 @@ public: ~processing_queue () { - stop (); + // Threads must be stopped before destruction + debug_assert (threads.empty ()); } void start () @@ -50,6 +51,7 @@ public: for (int n = 0; n < thread_count; ++n) { threads.emplace_back ([this] () { + nano::thread_role::set (thread_role); run (); }); } @@ -57,7 +59,10 @@ public: void stop () { - stopped = true; + { + nano::lock_guard guard{ mutex }; + stopped = true; + } condition.notify_all (); for (auto & thread : threads) { @@ -140,7 +145,6 @@ private: void run () { - nano::thread_role::set (thread_role); nano::unique_lock lock{ mutex }; while (!stopped) { @@ -169,7 +173,8 @@ private: private: std::deque queue; - std::atomic stopped{ false }; + + bool stopped{ false }; mutable nano::mutex mutex; nano::condition_variable condition; std::vector threads; diff --git a/nano/test_common/testutil.hpp b/nano/test_common/testutil.hpp index 5199348b..db952371 100644 --- a/nano/test_common/testutil.hpp +++ b/nano/test_common/testutil.hpp @@ -282,6 +282,28 @@ namespace test std::atomic required_count; }; + /** + * A helper that calls `start` from constructor and `stop` from destructor + */ + template + class start_stop_guard + { + public: + explicit start_stop_guard (T & ref_a) : + ref{ ref_a } + { + ref.start (); + } + + ~start_stop_guard () + { + ref.stop (); + } + + private: + T & ref; + }; + void wait_peer_connections (nano::test::system &); /**