Enhancements to OpenCL work handling (#2247)
* Enhance OpenCL work handling: OpenCL work is now properly queued along with CPU work. Any work can now be cancelled properly, and the node won't wait for completion when shutting down. Work threads now help in work generation; one extra thread is added on startup to handle OpenCL (this thread is usually at 100% usage too). If using OpenCL, it is best to have at most hardware_concurrency - 1 threads.
* (unrelated) Clarify that the bandwidth limit is in bytes.
* blake2b_init was missing; use work_value instead.
This commit is contained in:
parent
d7c08c2d71
commit
94e021c0bc
10 changed files with 62 additions and 47 deletions
|
|
@ -78,10 +78,10 @@ TEST (work, opencl)
|
||||||
auto opencl (nano::opencl_work::create (true, { 0, 0, 16 * 1024 }, logger));
|
auto opencl (nano::opencl_work::create (true, { 0, 0, 16 * 1024 }, logger));
|
||||||
if (opencl != nullptr)
|
if (opencl != nullptr)
|
||||||
{
|
{
|
||||||
nano::work_pool pool (std::numeric_limits<unsigned>::max (), std::chrono::nanoseconds (0), opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a) {
|
nano::work_pool pool (std::numeric_limits<unsigned>::max (), std::chrono::nanoseconds (0), opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a, std::atomic<int> & ticket_a) {
|
||||||
return opencl->generate_work (root_a, difficulty_a);
|
return opencl->generate_work (root_a, difficulty_a);
|
||||||
}
|
}
|
||||||
: std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t)> (nullptr));
|
: std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t, std::atomic<int> & ticket_a)> (nullptr));
|
||||||
ASSERT_NE (nullptr, pool.opencl);
|
ASSERT_NE (nullptr, pool.opencl);
|
||||||
nano::uint256_union root;
|
nano::uint256_union root;
|
||||||
uint64_t difficulty (0xff00000000000000);
|
uint64_t difficulty (0xff00000000000000);
|
||||||
|
|
|
||||||
|
|
@ -32,7 +32,7 @@ uint64_t nano::work_value (nano::block_hash const & root_a, uint64_t work_a)
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
nano::work_pool::work_pool (unsigned max_threads_a, std::chrono::nanoseconds pow_rate_limiter_a, std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t)> opencl_a) :
|
nano::work_pool::work_pool (unsigned max_threads_a, std::chrono::nanoseconds pow_rate_limiter_a, std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t, std::atomic<int> &)> opencl_a) :
|
||||||
ticket (0),
|
ticket (0),
|
||||||
done (false),
|
done (false),
|
||||||
pow_rate_limiter (pow_rate_limiter_a),
|
pow_rate_limiter (pow_rate_limiter_a),
|
||||||
|
|
@ -42,6 +42,11 @@ opencl (opencl_a)
|
||||||
boost::thread::attributes attrs;
|
boost::thread::attributes attrs;
|
||||||
nano::thread_attributes::set (attrs);
|
nano::thread_attributes::set (attrs);
|
||||||
auto count (network_constants.is_test_network () ? 1 : std::min (max_threads_a, std::max (1u, boost::thread::hardware_concurrency ())));
|
auto count (network_constants.is_test_network () ? 1 : std::min (max_threads_a, std::max (1u, boost::thread::hardware_concurrency ())));
|
||||||
|
if (opencl)
|
||||||
|
{
|
||||||
|
// One thread to handle OpenCL
|
||||||
|
++count;
|
||||||
|
}
|
||||||
for (auto i (0u); i < count; ++i)
|
for (auto i (0u); i < count; ++i)
|
||||||
{
|
{
|
||||||
auto thread (boost::thread (attrs, [this, i]() {
|
auto thread (boost::thread (attrs, [this, i]() {
|
||||||
|
|
@ -87,6 +92,18 @@ void nano::work_pool::loop (uint64_t thread)
|
||||||
int ticket_l (ticket);
|
int ticket_l (ticket);
|
||||||
lock.unlock ();
|
lock.unlock ();
|
||||||
output = 0;
|
output = 0;
|
||||||
|
boost::optional<uint64_t> opt_work;
|
||||||
|
if (thread == 0 && opencl)
|
||||||
|
{
|
||||||
|
opt_work = opencl (current_l.item, current_l.difficulty, ticket);
|
||||||
|
}
|
||||||
|
if (opt_work.is_initialized ())
|
||||||
|
{
|
||||||
|
work = *opt_work;
|
||||||
|
output = work_value (current_l.item, work);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
// ticket != ticket_l indicates a different thread found a solution and we should stop
|
// ticket != ticket_l indicates a different thread found a solution and we should stop
|
||||||
while (ticket == ticket_l && output < current_l.difficulty)
|
while (ticket == ticket_l && output < current_l.difficulty)
|
||||||
{
|
{
|
||||||
|
|
@ -110,6 +127,7 @@ void nano::work_pool::loop (uint64_t thread)
|
||||||
std::this_thread::sleep_for (pow_sleep);
|
std::this_thread::sleep_for (pow_sleep);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
lock.lock ();
|
lock.lock ();
|
||||||
if (ticket == ticket_l)
|
if (ticket == ticket_l)
|
||||||
{
|
{
|
||||||
|
|
@ -183,22 +201,11 @@ void nano::work_pool::generate (nano::uint256_union const & hash_a, std::functio
|
||||||
{
|
{
|
||||||
assert (!hash_a.is_zero ());
|
assert (!hash_a.is_zero ());
|
||||||
boost::optional<uint64_t> result;
|
boost::optional<uint64_t> result;
|
||||||
if (opencl)
|
|
||||||
{
|
|
||||||
result = opencl (hash_a, difficulty_a);
|
|
||||||
}
|
|
||||||
if (!result)
|
|
||||||
{
|
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock (mutex);
|
std::lock_guard<std::mutex> lock (mutex);
|
||||||
pending.push_back ({ hash_a, callback_a, difficulty_a });
|
pending.push_back ({ hash_a, callback_a, difficulty_a });
|
||||||
}
|
}
|
||||||
producer_condition.notify_all ();
|
producer_condition.notify_all ();
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
callback_a (result);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t nano::work_pool::generate (nano::uint256_union const & hash_a)
|
uint64_t nano::work_pool::generate (nano::uint256_union const & hash_a)
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,7 @@ public:
|
||||||
class work_pool final
|
class work_pool final
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
work_pool (unsigned, std::chrono::nanoseconds = std::chrono::nanoseconds (0), std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t)> = nullptr);
|
work_pool (unsigned, std::chrono::nanoseconds = std::chrono::nanoseconds (0), std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t, std::atomic<int> &)> = nullptr);
|
||||||
~work_pool ();
|
~work_pool ();
|
||||||
void loop (uint64_t);
|
void loop (uint64_t);
|
||||||
void stop ();
|
void stop ();
|
||||||
|
|
@ -46,7 +46,7 @@ public:
|
||||||
std::mutex mutex;
|
std::mutex mutex;
|
||||||
std::condition_variable producer_condition;
|
std::condition_variable producer_condition;
|
||||||
std::chrono::nanoseconds pow_rate_limiter;
|
std::chrono::nanoseconds pow_rate_limiter;
|
||||||
std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t)> opencl;
|
std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t, std::atomic<int> &)> opencl;
|
||||||
nano::observer_set<bool> work_observers;
|
nano::observer_set<bool> work_observers;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -49,10 +49,10 @@ void nano_daemon::daemon::run (boost::filesystem::path const & data_path, nano::
|
||||||
nano::logger_mt logger{ config.node.logging.min_time_between_log_output };
|
nano::logger_mt logger{ config.node.logging.min_time_between_log_output };
|
||||||
boost::asio::io_context io_ctx;
|
boost::asio::io_context io_ctx;
|
||||||
auto opencl (nano::opencl_work::create (config.opencl_enable, config.opencl, logger));
|
auto opencl (nano::opencl_work::create (config.opencl_enable, config.opencl, logger));
|
||||||
nano::work_pool opencl_work (config.node.work_threads, config.node.pow_sleep_interval, opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a) {
|
nano::work_pool opencl_work (config.node.work_threads, config.node.pow_sleep_interval, opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a, std::atomic<int> & ticket_a) {
|
||||||
return opencl->generate_work (root_a, difficulty_a);
|
return opencl->generate_work (root_a, difficulty_a, ticket_a);
|
||||||
}
|
}
|
||||||
: std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t)> (nullptr));
|
: std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t, std::atomic<int> &)> (nullptr));
|
||||||
nano::alarm alarm (io_ctx);
|
nano::alarm alarm (io_ctx);
|
||||||
nano::node_init init;
|
nano::node_init init;
|
||||||
try
|
try
|
||||||
|
|
|
||||||
|
|
@ -440,10 +440,10 @@ int main (int argc, char * const * argv)
|
||||||
{
|
{
|
||||||
nano::logger_mt logger;
|
nano::logger_mt logger;
|
||||||
auto opencl (nano::opencl_work::create (true, { platform, device, threads }, logger));
|
auto opencl (nano::opencl_work::create (true, { platform, device, threads }, logger));
|
||||||
nano::work_pool work_pool (std::numeric_limits<unsigned>::max (), std::chrono::nanoseconds (0), opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a) {
|
nano::work_pool work_pool (std::numeric_limits<unsigned>::max (), std::chrono::nanoseconds (0), opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a, std::atomic<int> &) {
|
||||||
return opencl->generate_work (root_a, difficulty_a);
|
return opencl->generate_work (root_a, difficulty_a);
|
||||||
}
|
}
|
||||||
: std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t)> (nullptr));
|
: std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t, std::atomic<int> &)> (nullptr));
|
||||||
nano::change_block block (0, 0, nano::keypair ().prv, 0, 0);
|
nano::change_block block (0, 0, nano::keypair ().prv, 0, 0);
|
||||||
std::cerr << boost::str (boost::format ("Starting OpenCL generation profiling. Platform: %1%. Device: %2%. Threads: %3%. Difficulty: %4$#x\n") % platform % device % threads % difficulty);
|
std::cerr << boost::str (boost::format ("Starting OpenCL generation profiling. Platform: %1%. Device: %2%. Threads: %3%. Difficulty: %4$#x\n") % platform % device % threads % difficulty);
|
||||||
for (uint64_t i (0); true; ++i)
|
for (uint64_t i (0); true; ++i)
|
||||||
|
|
|
||||||
|
|
@ -80,10 +80,10 @@ int run_wallet (QApplication & application, int argc, char * const * argv, boost
|
||||||
std::shared_ptr<nano_qt::wallet> gui;
|
std::shared_ptr<nano_qt::wallet> gui;
|
||||||
nano::set_application_icon (application);
|
nano::set_application_icon (application);
|
||||||
auto opencl (nano::opencl_work::create (config.opencl_enable, config.opencl, logger));
|
auto opencl (nano::opencl_work::create (config.opencl_enable, config.opencl, logger));
|
||||||
nano::work_pool work (config.node.work_threads, config.node.pow_sleep_interval, opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a) {
|
nano::work_pool work (config.node.work_threads, config.node.pow_sleep_interval, opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a, std::atomic<int> &) {
|
||||||
return opencl->generate_work (root_a, difficulty_a);
|
return opencl->generate_work (root_a, difficulty_a);
|
||||||
}
|
}
|
||||||
: std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t)> (nullptr));
|
: std::function<boost::optional<uint64_t> (nano::uint256_union const &, uint64_t, std::atomic<int> &)> (nullptr));
|
||||||
nano::alarm alarm (io_ctx);
|
nano::alarm alarm (io_ctx);
|
||||||
nano::node_init init;
|
nano::node_init init;
|
||||||
nano::node_flags flags;
|
nano::node_flags flags;
|
||||||
|
|
|
||||||
|
|
@ -365,7 +365,7 @@ startup_time (std::chrono::steady_clock::now ())
|
||||||
auto network_label = network_params.network.get_current_network_as_string ();
|
auto network_label = network_params.network.get_current_network_as_string ();
|
||||||
logger.always_log ("Active network: ", network_label);
|
logger.always_log ("Active network: ", network_label);
|
||||||
|
|
||||||
logger.always_log (boost::str (boost::format ("Work pool running %1% threads") % work.threads.size ()));
|
logger.always_log (boost::str (boost::format ("Work pool running %1% threads %2%") % work.threads.size () % (work.opencl ? "(1 for OpenCL)" : "")));
|
||||||
logger.always_log (boost::str (boost::format ("%1% work peers configured") % config.work_peers.size ()));
|
logger.always_log (boost::str (boost::format ("%1% work peers configured") % config.work_peers.size ()));
|
||||||
if (config.work_peers.empty () && config.work_threads == 0 && !work.opencl)
|
if (config.work_peers.empty () && config.work_threads == 0 && !work.opencl)
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -87,7 +87,7 @@ public:
|
||||||
static std::chrono::seconds constexpr keepalive_period = std::chrono::seconds (60);
|
static std::chrono::seconds constexpr keepalive_period = std::chrono::seconds (60);
|
||||||
static std::chrono::seconds constexpr keepalive_cutoff = keepalive_period * 5;
|
static std::chrono::seconds constexpr keepalive_cutoff = keepalive_period * 5;
|
||||||
static std::chrono::minutes constexpr wallet_backup_interval = std::chrono::minutes (5);
|
static std::chrono::minutes constexpr wallet_backup_interval = std::chrono::minutes (5);
|
||||||
size_t bandwidth_limit{ 5 * 1024 * 1024 }; // 5Mb/s
|
size_t bandwidth_limit{ 5 * 1024 * 1024 }; // 5MB/s
|
||||||
std::chrono::milliseconds conf_height_processor_batch_min_time{ 50 };
|
std::chrono::milliseconds conf_height_processor_batch_min_time{ 50 };
|
||||||
bool backup_before_upgrade{ false };
|
bool backup_before_upgrade{ false };
|
||||||
std::chrono::seconds work_watcher_period{ std::chrono::seconds (5) };
|
std::chrono::seconds work_watcher_period{ std::chrono::seconds (5) };
|
||||||
|
|
|
||||||
|
|
@ -688,14 +688,21 @@ nano::opencl_work::~opencl_work ()
|
||||||
}
|
}
|
||||||
|
|
||||||
boost::optional<uint64_t> nano::opencl_work::generate_work (nano::uint256_union const & root_a, uint64_t const difficulty_a)
|
boost::optional<uint64_t> nano::opencl_work::generate_work (nano::uint256_union const & root_a, uint64_t const difficulty_a)
|
||||||
|
{
|
||||||
|
std::atomic<int> ticket_l{ 0 };
|
||||||
|
return generate_work (root_a, difficulty_a, ticket_l);
|
||||||
|
}
|
||||||
|
|
||||||
|
boost::optional<uint64_t> nano::opencl_work::generate_work (nano::uint256_union const & root_a, uint64_t const difficulty_a, std::atomic<int> & ticket_a)
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock (mutex);
|
std::lock_guard<std::mutex> lock (mutex);
|
||||||
bool error (false);
|
bool error (false);
|
||||||
|
int ticket_l (ticket_a);
|
||||||
uint64_t result (0);
|
uint64_t result (0);
|
||||||
uint64_t computed_difficulty (0);
|
uint64_t computed_difficulty (0);
|
||||||
unsigned thread_count (config.threads);
|
unsigned thread_count (config.threads);
|
||||||
size_t work_size[] = { thread_count, 0, 0 };
|
size_t work_size[] = { thread_count, 0, 0 };
|
||||||
while ((nano::work_validate (root_a, result, &computed_difficulty) || computed_difficulty < difficulty_a) && !error)
|
while ((nano::work_validate (root_a, result, &computed_difficulty) || computed_difficulty < difficulty_a) && !error && ticket_a == ticket_l)
|
||||||
{
|
{
|
||||||
result = rand.next ();
|
result = rand.next ();
|
||||||
cl_int write_error1 = clEnqueueWriteBuffer (queue, attempt_buffer, false, 0, sizeof (uint64_t), &result, 0, nullptr, nullptr);
|
cl_int write_error1 = clEnqueueWriteBuffer (queue, attempt_buffer, false, 0, sizeof (uint64_t), &result, 0, nullptr, nullptr);
|
||||||
|
|
|
||||||
|
|
@ -44,6 +44,7 @@ public:
|
||||||
opencl_work (bool &, nano::opencl_config const &, nano::opencl_environment &, nano::logger_mt &);
|
opencl_work (bool &, nano::opencl_config const &, nano::opencl_environment &, nano::logger_mt &);
|
||||||
~opencl_work ();
|
~opencl_work ();
|
||||||
boost::optional<uint64_t> generate_work (nano::uint256_union const &, uint64_t const);
|
boost::optional<uint64_t> generate_work (nano::uint256_union const &, uint64_t const);
|
||||||
|
boost::optional<uint64_t> generate_work (nano::uint256_union const &, uint64_t const, std::atomic<int> &);
|
||||||
static std::unique_ptr<opencl_work> create (bool, nano::opencl_config const &, nano::logger_mt &);
|
static std::unique_ptr<opencl_work> create (bool, nano::opencl_config const &, nano::logger_mt &);
|
||||||
nano::opencl_config const & config;
|
nano::opencl_config const & config;
|
||||||
std::mutex mutex;
|
std::mutex mutex;
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue