Moving nano::thread_pool in to its own file

This commit is contained in:
Colin LeMahieu 2023-09-17 20:46:02 +01:00
commit 76b4861fe0
No known key found for this signature in database
GPG key ID: 43708520C8DFB938
8 changed files with 155 additions and 129 deletions

View file

@ -1,6 +1,6 @@
#include <nano/lib/optional_ptr.hpp>
#include <nano/lib/rate_limiting.hpp>
#include <nano/lib/threading.hpp>
#include <nano/lib/thread_pool.hpp>
#include <nano/lib/timer.hpp>
#include <nano/lib/utility.hpp>
#include <nano/secure/utility.hpp>

View file

@ -74,6 +74,8 @@ add_library(
stats_enums.hpp
stats_enums.cpp
stream.hpp
thread_pool.hpp
thread_pool.cpp
thread_roles.hpp
thread_roles.cpp
thread_runner.hpp

97
nano/lib/thread_pool.cpp Normal file
View file

@ -0,0 +1,97 @@
#include <nano/lib/thread_pool.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/thread_pool.hpp>
/*
* thread_pool
*/
nano::thread_pool::thread_pool (unsigned num_threads, nano::thread_role::name thread_name) :
num_threads (num_threads),
thread_pool_m (std::make_unique<boost::asio::thread_pool> (num_threads)),
thread_names_latch{ num_threads }
{
set_thread_names (thread_name);
}
nano::thread_pool::~thread_pool ()
{
stop ();
}
void nano::thread_pool::stop ()
{
nano::unique_lock<nano::mutex> lk (mutex);
if (!stopped)
{
stopped = true;
#if defined(BOOST_ASIO_HAS_IOCP)
// A hack needed for Windows to prevent deadlock during destruction, described here: https://github.com/chriskohlhoff/asio/issues/431
boost::asio::use_service<boost::asio::detail::win_iocp_io_context> (*thread_pool_m).stop ();
#endif
lk.unlock ();
thread_pool_m->stop ();
thread_pool_m->join ();
lk.lock ();
thread_pool_m = nullptr;
}
}
void nano::thread_pool::push_task (std::function<void ()> task)
{
++num_tasks;
nano::lock_guard<nano::mutex> guard (mutex);
if (!stopped)
{
boost::asio::post (*thread_pool_m, [this, task] () {
task ();
--num_tasks;
});
}
}
void nano::thread_pool::add_timed_task (std::chrono::steady_clock::time_point const & expiry_time, std::function<void ()> task)
{
nano::lock_guard<nano::mutex> guard (mutex);
if (!stopped && thread_pool_m)
{
auto timer = std::make_shared<boost::asio::steady_timer> (thread_pool_m->get_executor (), expiry_time);
timer->async_wait ([this, task, timer] (boost::system::error_code const & ec) {
if (!ec)
{
push_task (task);
}
});
}
}
unsigned nano::thread_pool::get_num_threads () const
{
return num_threads;
}
uint64_t nano::thread_pool::num_queued_tasks () const
{
return num_tasks;
}
void nano::thread_pool::set_thread_names (nano::thread_role::name thread_name)
{
for (auto i = 0u; i < num_threads; ++i)
{
boost::asio::post (*thread_pool_m, [this, thread_name] () {
nano::thread_role::set (thread_name);
thread_names_latch.arrive_and_wait ();
});
}
thread_names_latch.wait ();
}
std::unique_ptr<nano::container_info_component> nano::collect_container_info (thread_pool & thread_pool, std::string const & name)
{
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "count", thread_pool.num_queued_tasks (), sizeof (std::function<void ()>) }));
return composite;
}

53
nano/lib/thread_pool.hpp Normal file
View file

@ -0,0 +1,53 @@
#pragma once
#include <nano/lib/relaxed_atomic.hpp>
#include <nano/lib/thread_roles.hpp>
#include <nano/lib/threading.hpp>
#include <atomic>
#include <chrono>
#include <functional>
#include <latch>
namespace boost::asio
{
class thread_pool;
}
namespace nano
{
class thread_pool final
{
public:
explicit thread_pool (unsigned, nano::thread_role::name);
~thread_pool ();
/** This will run when there is an available thread for execution */
void push_task (std::function<void ()>);
/** Run a task at a certain point in time */
void add_timed_task (std::chrono::steady_clock::time_point const & expiry_time, std::function<void ()> task);
/** Stops any further pushed tasks from executing */
void stop ();
/** Number of threads in the thread pool */
unsigned get_num_threads () const;
/** Returns the number of tasks which are awaiting execution by the thread pool **/
uint64_t num_queued_tasks () const;
private:
nano::mutex mutex;
std::atomic<bool> stopped{ false };
unsigned num_threads;
std::unique_ptr<boost::asio::thread_pool> thread_pool_m;
nano::relaxed_atomic_integral<uint64_t> num_tasks{ 0 };
/** Set the names of all the threads in the thread pool for easier identification */
std::latch thread_names_latch;
void set_thread_names (nano::thread_role::name thread_name);
};
std::unique_ptr<nano::container_info_component> collect_container_info (thread_pool & thread_pool, std::string const & name);
} // namespace nano

View file

@ -21,98 +21,6 @@ boost::thread::attributes nano::thread_attributes::get_default ()
return attrs;
}
/*
* thread_pool
*/
nano::thread_pool::thread_pool (unsigned num_threads, nano::thread_role::name thread_name) :
num_threads (num_threads),
thread_pool_m (std::make_unique<boost::asio::thread_pool> (num_threads)),
thread_names_latch{ num_threads }
{
set_thread_names (thread_name);
}
nano::thread_pool::~thread_pool ()
{
stop ();
}
void nano::thread_pool::stop ()
{
nano::unique_lock<nano::mutex> lk (mutex);
if (!stopped)
{
stopped = true;
#if defined(BOOST_ASIO_HAS_IOCP)
// A hack needed for Windows to prevent deadlock during destruction, described here: https://github.com/chriskohlhoff/asio/issues/431
boost::asio::use_service<boost::asio::detail::win_iocp_io_context> (*thread_pool_m).stop ();
#endif
lk.unlock ();
thread_pool_m->stop ();
thread_pool_m->join ();
lk.lock ();
thread_pool_m = nullptr;
}
}
void nano::thread_pool::push_task (std::function<void ()> task)
{
++num_tasks;
nano::lock_guard<nano::mutex> guard (mutex);
if (!stopped)
{
boost::asio::post (*thread_pool_m, [this, task] () {
task ();
--num_tasks;
});
}
}
void nano::thread_pool::add_timed_task (std::chrono::steady_clock::time_point const & expiry_time, std::function<void ()> task)
{
nano::lock_guard<nano::mutex> guard (mutex);
if (!stopped && thread_pool_m)
{
auto timer = std::make_shared<boost::asio::steady_timer> (thread_pool_m->get_executor (), expiry_time);
timer->async_wait ([this, task, timer] (boost::system::error_code const & ec) {
if (!ec)
{
push_task (task);
}
});
}
}
unsigned nano::thread_pool::get_num_threads () const
{
return num_threads;
}
uint64_t nano::thread_pool::num_queued_tasks () const
{
return num_tasks;
}
void nano::thread_pool::set_thread_names (nano::thread_role::name thread_name)
{
for (auto i = 0u; i < num_threads; ++i)
{
boost::asio::post (*thread_pool_m, [this, thread_name] () {
nano::thread_role::set (thread_name);
thread_names_latch.arrive_and_wait ();
});
}
thread_names_latch.wait ();
}
std::unique_ptr<nano::container_info_component> nano::collect_container_info (thread_pool & thread_pool, std::string const & name)
{
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "count", thread_pool.num_queued_tasks (), sizeof (std::function<void ()>) }));
return composite;
}
unsigned int nano::hardware_concurrency ()
{
// Try to read overridden value from environment variable

View file

@ -21,41 +21,6 @@ namespace thread_attributes
boost::thread::attributes get_default ();
}
class thread_pool final
{
public:
explicit thread_pool (unsigned, nano::thread_role::name);
~thread_pool ();
/** This will run when there is an available thread for execution */
void push_task (std::function<void ()>);
/** Run a task at a certain point in time */
void add_timed_task (std::chrono::steady_clock::time_point const & expiry_time, std::function<void ()> task);
/** Stops any further pushed tasks from executing */
void stop ();
/** Number of threads in the thread pool */
unsigned get_num_threads () const;
/** Returns the number of tasks which are awaiting execution by the thread pool **/
uint64_t num_queued_tasks () const;
private:
nano::mutex mutex;
std::atomic<bool> stopped{ false };
unsigned num_threads;
std::unique_ptr<boost::asio::thread_pool> thread_pool_m;
nano::relaxed_atomic_integral<uint64_t> num_tasks{ 0 };
/** Set the names of all the threads in the thread pool for easier identification */
std::latch thread_names_latch;
void set_thread_names (nano::thread_role::name thread_name);
};
std::unique_ptr<nano::container_info_component> collect_container_info (thread_pool & thread_pool, std::string const & name);
/**
* Number of available logical processor cores. Might be overridden by setting `NANO_HARDWARE_CONCURRENCY` environment variable
*/

View file

@ -2,6 +2,7 @@
#include <nano/lib/config.hpp>
#include <nano/lib/stats.hpp>
#include <nano/lib/thread_pool.hpp>
#include <nano/lib/work.hpp>
#include <nano/node/active_transactions.hpp>
#include <nano/node/backlog_population.hpp>

View file

@ -1,6 +1,6 @@
#pragma once
#include <nano/lib/threading.hpp>
#include <nano/lib/thread_pool.hpp>
#include <nano/lib/utility.hpp>
#include <atomic>