diff --git a/nano/core_test/utility.cpp b/nano/core_test/utility.cpp index 3d418e91..652ea8e4 100644 --- a/nano/core_test/utility.cpp +++ b/nano/core_test/utility.cpp @@ -1,6 +1,6 @@ #include #include -#include +#include #include #include #include diff --git a/nano/lib/CMakeLists.txt b/nano/lib/CMakeLists.txt index ad08425d..d0207325 100644 --- a/nano/lib/CMakeLists.txt +++ b/nano/lib/CMakeLists.txt @@ -74,6 +74,8 @@ add_library( stats_enums.hpp stats_enums.cpp stream.hpp + thread_pool.hpp + thread_pool.cpp thread_roles.hpp thread_roles.cpp thread_runner.hpp diff --git a/nano/lib/thread_pool.cpp b/nano/lib/thread_pool.cpp new file mode 100644 index 00000000..571a8407 --- /dev/null +++ b/nano/lib/thread_pool.cpp @@ -0,0 +1,97 @@ +#include + +#include +#include +#include + +/* + * thread_pool + */ + +nano::thread_pool::thread_pool (unsigned num_threads, nano::thread_role::name thread_name) : + num_threads (num_threads), + thread_pool_m (std::make_unique (num_threads)), + thread_names_latch{ num_threads } +{ + set_thread_names (thread_name); +} + +nano::thread_pool::~thread_pool () +{ + stop (); +} + +void nano::thread_pool::stop () +{ + nano::unique_lock lk (mutex); + if (!stopped) + { + stopped = true; +#if defined(BOOST_ASIO_HAS_IOCP) + // A hack needed for Windows to prevent deadlock during destruction, described here: https://github.com/chriskohlhoff/asio/issues/431 + boost::asio::use_service (*thread_pool_m).stop (); +#endif + lk.unlock (); + thread_pool_m->stop (); + thread_pool_m->join (); + lk.lock (); + thread_pool_m = nullptr; + } +} + +void nano::thread_pool::push_task (std::function task) +{ + ++num_tasks; + nano::lock_guard guard (mutex); + if (!stopped) + { + boost::asio::post (*thread_pool_m, [this, task] () { + task (); + --num_tasks; + }); + } +} + +void nano::thread_pool::add_timed_task (std::chrono::steady_clock::time_point const & expiry_time, std::function task) +{ + nano::lock_guard guard (mutex); + if (!stopped && thread_pool_m) + { + auto timer = std::make_shared (thread_pool_m->get_executor (), expiry_time); + timer->async_wait ([this, task, timer] (boost::system::error_code const & ec) { + if (!ec) + { + push_task (task); + } + }); + } +} + +unsigned nano::thread_pool::get_num_threads () const +{ + return num_threads; +} + +uint64_t nano::thread_pool::num_queued_tasks () const +{ + return num_tasks; +} + +void nano::thread_pool::set_thread_names (nano::thread_role::name thread_name) +{ + for (auto i = 0u; i < num_threads; ++i) + { + boost::asio::post (*thread_pool_m, [this, thread_name] () { + nano::thread_role::set (thread_name); + thread_names_latch.arrive_and_wait (); + }); + } + thread_names_latch.wait (); +} + +std::unique_ptr nano::collect_container_info (thread_pool & thread_pool, std::string const & name) +{ + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "count", thread_pool.num_queued_tasks (), sizeof (std::function) })); + return composite; +} diff --git a/nano/lib/thread_pool.hpp b/nano/lib/thread_pool.hpp new file mode 100644 index 00000000..f56a6271 --- /dev/null +++ b/nano/lib/thread_pool.hpp @@ -0,0 +1,53 @@ +#pragma once + +#include +#include +#include + +#include +#include +#include +#include + +namespace boost::asio +{ +class thread_pool; +} + +namespace nano +{ +class thread_pool final +{ +public: + explicit thread_pool (unsigned, nano::thread_role::name); + ~thread_pool (); + + /** This will run when there is an available thread for execution */ + void push_task (std::function); + + /** Run a task at a certain point in time */ + void add_timed_task (std::chrono::steady_clock::time_point const & expiry_time, std::function task); + + /** Stops any further pushed tasks from executing */ + void stop (); + + /** Number of threads in the thread pool */ + unsigned get_num_threads () const; + + /** Returns the number of tasks which are awaiting execution by the thread pool **/ + uint64_t num_queued_tasks () const; + +private: + nano::mutex mutex; + std::atomic stopped{ false }; + unsigned num_threads; + std::unique_ptr thread_pool_m; + nano::relaxed_atomic_integral num_tasks{ 0 }; + + /** Set the names of all the threads in the thread pool for easier identification */ + std::latch thread_names_latch; + void set_thread_names (nano::thread_role::name thread_name); +}; + +std::unique_ptr collect_container_info (thread_pool & thread_pool, std::string const & name); +} // namespace nano diff --git a/nano/lib/threading.cpp b/nano/lib/threading.cpp index fd3ec258..e5d00286 100644 --- a/nano/lib/threading.cpp +++ b/nano/lib/threading.cpp @@ -21,98 +21,6 @@ boost::thread::attributes nano::thread_attributes::get_default () return attrs; } -/* - * thread_pool - */ - -nano::thread_pool::thread_pool (unsigned num_threads, nano::thread_role::name thread_name) : - num_threads (num_threads), - thread_pool_m (std::make_unique (num_threads)), - thread_names_latch{ num_threads } -{ - set_thread_names (thread_name); -} - -nano::thread_pool::~thread_pool () -{ - stop (); -} - -void nano::thread_pool::stop () -{ - nano::unique_lock lk (mutex); - if (!stopped) - { - stopped = true; -#if defined(BOOST_ASIO_HAS_IOCP) - // A hack needed for Windows to prevent deadlock during destruction, described here: https://github.com/chriskohlhoff/asio/issues/431 - boost::asio::use_service (*thread_pool_m).stop (); -#endif - lk.unlock (); - thread_pool_m->stop (); - thread_pool_m->join (); - lk.lock (); - thread_pool_m = nullptr; - } -} - -void nano::thread_pool::push_task (std::function task) -{ - ++num_tasks; - nano::lock_guard guard (mutex); - if (!stopped) - { - boost::asio::post (*thread_pool_m, [this, task] () { - task (); - --num_tasks; - }); - } -} - -void nano::thread_pool::add_timed_task (std::chrono::steady_clock::time_point const & expiry_time, std::function task) -{ - nano::lock_guard guard (mutex); - if (!stopped && thread_pool_m) - { - auto timer = std::make_shared (thread_pool_m->get_executor (), expiry_time); - timer->async_wait ([this, task, timer] (boost::system::error_code const & ec) { - if (!ec) - { - push_task (task); - } - }); - } -} - -unsigned nano::thread_pool::get_num_threads () const -{ - return num_threads; -} - -uint64_t nano::thread_pool::num_queued_tasks () const -{ - return num_tasks; -} - -void nano::thread_pool::set_thread_names (nano::thread_role::name thread_name) -{ - for (auto i = 0u; i < num_threads; ++i) - { - boost::asio::post (*thread_pool_m, [this, thread_name] () { - nano::thread_role::set (thread_name); - thread_names_latch.arrive_and_wait (); - }); - } - thread_names_latch.wait (); -} - -std::unique_ptr nano::collect_container_info (thread_pool & thread_pool, std::string const & name) -{ - auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "count", thread_pool.num_queued_tasks (), sizeof (std::function) })); - return composite; -} - unsigned int nano::hardware_concurrency () { // Try to read overridden value from environment variable diff --git a/nano/lib/threading.hpp b/nano/lib/threading.hpp index f4b8ad8f..d74bdd34 100644 --- a/nano/lib/threading.hpp +++ b/nano/lib/threading.hpp @@ -21,41 +21,6 @@ namespace thread_attributes boost::thread::attributes get_default (); } -class thread_pool final -{ -public: - explicit thread_pool (unsigned, nano::thread_role::name); - ~thread_pool (); - - /** This will run when there is an available thread for execution */ - void push_task (std::function); - - /** Run a task at a certain point in time */ - void add_timed_task (std::chrono::steady_clock::time_point const & expiry_time, std::function task); - - /** Stops any further pushed tasks from executing */ - void stop (); - - /** Number of threads in the thread pool */ - unsigned get_num_threads () const; - - /** Returns the number of tasks which are awaiting execution by the thread pool **/ - uint64_t num_queued_tasks () const; - -private: - nano::mutex mutex; - std::atomic stopped{ false }; - unsigned num_threads; - std::unique_ptr thread_pool_m; - nano::relaxed_atomic_integral num_tasks{ 0 }; - - /** Set the names of all the threads in the thread pool for easier identification */ - std::latch thread_names_latch; - void set_thread_names (nano::thread_role::name thread_name); -}; - -std::unique_ptr collect_container_info (thread_pool & thread_pool, std::string const & name); - /** * Number of available logical processor cores. Might be overridden by setting `NANO_HARDWARE_CONCURRENCY` environment variable */ diff --git a/nano/node/node.hpp b/nano/node/node.hpp index a52a04d3..e4753bb5 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include diff --git a/nano/node/signatures.hpp b/nano/node/signatures.hpp index f1aa9f90..60194870 100644 --- a/nano/node/signatures.hpp +++ b/nano/node/signatures.hpp @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include