From 94e021c0bc17c3c4d379a8dba855c56b04872198 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Mon, 26 Aug 2019 15:34:53 +0100 Subject: [PATCH] Enhancements to OpenCL work handling (#2247) * Enhance OpenCL work handling OpenCL work is now properly queued along with CPU work. Any work can now be cancelled properly and the node won't wait for completion when shutting down. Work threads now help in work generation; one extra thread is added on startup to handle OpenCL (this thread is usually at 100% usage too). If using OpenCL the best is to have at most hardware_concurrency - 1 threads. * (unrelated) clarify bandwidth limit is in bytes * Was missing blake2b_init, use work_value --- nano/core_test/work_pool.cpp | 4 +- nano/lib/work.cpp | 73 ++++++++++++++++++++---------------- nano/lib/work.hpp | 4 +- nano/nano_node/daemon.cpp | 6 +-- nano/nano_node/entry.cpp | 4 +- nano/nano_wallet/entry.cpp | 4 +- nano/node/node.cpp | 2 +- nano/node/nodeconfig.hpp | 2 +- nano/node/openclwork.cpp | 9 ++++- nano/node/openclwork.hpp | 1 + 10 files changed, 62 insertions(+), 47 deletions(-) diff --git a/nano/core_test/work_pool.cpp b/nano/core_test/work_pool.cpp index 22a3bc93..0fca6206 100644 --- a/nano/core_test/work_pool.cpp +++ b/nano/core_test/work_pool.cpp @@ -78,10 +78,10 @@ TEST (work, opencl) auto opencl (nano::opencl_work::create (true, { 0, 0, 16 * 1024 }, logger)); if (opencl != nullptr) { - nano::work_pool pool (std::numeric_limits::max (), std::chrono::nanoseconds (0), opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a) { + nano::work_pool pool (std::numeric_limits::max (), std::chrono::nanoseconds (0), opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a, std::atomic & ticket_a) { return opencl->generate_work (root_a, difficulty_a); } - : std::function (nano::uint256_union const &, uint64_t)> (nullptr)); + : std::function (nano::uint256_union const &, uint64_t, std::atomic & ticket_a)> (nullptr)); ASSERT_NE (nullptr, pool.opencl); nano::uint256_union root; uint64_t difficulty (0xff00000000000000); diff --git a/nano/lib/work.cpp b/nano/lib/work.cpp index 2bed951c..bac05137 100644 --- a/nano/lib/work.cpp +++ b/nano/lib/work.cpp @@ -32,7 +32,7 @@ uint64_t nano::work_value (nano::block_hash const & root_a, uint64_t work_a) return result; } -nano::work_pool::work_pool (unsigned max_threads_a, std::chrono::nanoseconds pow_rate_limiter_a, std::function (nano::uint256_union const &, uint64_t)> opencl_a) : +nano::work_pool::work_pool (unsigned max_threads_a, std::chrono::nanoseconds pow_rate_limiter_a, std::function (nano::uint256_union const &, uint64_t, std::atomic &)> opencl_a) : ticket (0), done (false), pow_rate_limiter (pow_rate_limiter_a), @@ -42,6 +42,11 @@ opencl (opencl_a) boost::thread::attributes attrs; nano::thread_attributes::set (attrs); auto count (network_constants.is_test_network () ? 1 : std::min (max_threads_a, std::max (1u, boost::thread::hardware_concurrency ()))); + if (opencl) + { + // One thread to handle OpenCL + ++count; + } for (auto i (0u); i < count; ++i) { auto thread (boost::thread (attrs, [this, i]() { @@ -87,27 +92,40 @@ void nano::work_pool::loop (uint64_t thread) int ticket_l (ticket); lock.unlock (); output = 0; - // ticket != ticket_l indicates a different thread found a solution and we should stop - while (ticket == ticket_l && output < current_l.difficulty) + boost::optional opt_work; + if (thread == 0 && opencl) { - // Don't query main memory every iteration in order to reduce memory bus traffic - // All operations here operate on stack memory - // Count iterations down to zero since comparing to zero is easier than comparing to another number - unsigned iteration (256); - while (iteration && output < current_l.difficulty) + opt_work = opencl (current_l.item, current_l.difficulty, ticket); + } + if (opt_work.is_initialized ()) + { + work = *opt_work; + output = work_value (current_l.item, work); + } + else + { + // ticket != ticket_l indicates a different thread found a solution and we should stop + while (ticket == ticket_l && output < current_l.difficulty) { - work = rng.next (); - blake2b_update (&hash, reinterpret_cast (&work), sizeof (work)); - blake2b_update (&hash, current_l.item.bytes.data (), current_l.item.bytes.size ()); - blake2b_final (&hash, reinterpret_cast (&output), sizeof (output)); - blake2b_init (&hash, sizeof (output)); - iteration -= 1; - } + // Don't query main memory every iteration in order to reduce memory bus traffic + // All operations here operate on stack memory + // Count iterations down to zero since comparing to zero is easier than comparing to another number + unsigned iteration (256); + while (iteration && output < current_l.difficulty) + { + work = rng.next (); + blake2b_update (&hash, reinterpret_cast (&work), sizeof (work)); + blake2b_update (&hash, current_l.item.bytes.data (), current_l.item.bytes.size ()); + blake2b_final (&hash, reinterpret_cast (&output), sizeof (output)); + blake2b_init (&hash, sizeof (output)); + iteration -= 1; + } - // Add a rate limiter (if specified) to the pow calculation to save some CPUs which don't want to operate at full throttle - if (pow_sleep != std::chrono::nanoseconds (0)) - { - std::this_thread::sleep_for (pow_sleep); + // Add a rate limiter (if specified) to the pow calculation to save some CPUs which don't want to operate at full throttle + if (pow_sleep != std::chrono::nanoseconds (0)) + { + std::this_thread::sleep_for (pow_sleep); + } } } lock.lock (); @@ -183,22 +201,11 @@ void nano::work_pool::generate (nano::uint256_union const & hash_a, std::functio { assert (!hash_a.is_zero ()); boost::optional result; - if (opencl) { - result = opencl (hash_a, difficulty_a); - } - if (!result) - { - { - std::lock_guard lock (mutex); - pending.push_back ({ hash_a, callback_a, difficulty_a }); - } - producer_condition.notify_all (); - } - else - { - callback_a (result); + std::lock_guard lock (mutex); + pending.push_back ({ hash_a, callback_a, difficulty_a }); } + producer_condition.notify_all (); } uint64_t nano::work_pool::generate (nano::uint256_union const & hash_a) diff --git a/nano/lib/work.hpp b/nano/lib/work.hpp index 66f5cbd3..1a8ea070 100644 --- a/nano/lib/work.hpp +++ b/nano/lib/work.hpp @@ -29,7 +29,7 @@ public: class work_pool final { public: - work_pool (unsigned, std::chrono::nanoseconds = std::chrono::nanoseconds (0), std::function (nano::uint256_union const &, uint64_t)> = nullptr); + work_pool (unsigned, std::chrono::nanoseconds = std::chrono::nanoseconds (0), std::function (nano::uint256_union const &, uint64_t, std::atomic &)> = nullptr); ~work_pool (); void loop (uint64_t); void stop (); @@ -46,7 +46,7 @@ public: std::mutex mutex; std::condition_variable producer_condition; std::chrono::nanoseconds pow_rate_limiter; - std::function (nano::uint256_union const &, uint64_t)> opencl; + std::function (nano::uint256_union const &, uint64_t, std::atomic &)> opencl; nano::observer_set work_observers; }; diff --git a/nano/nano_node/daemon.cpp b/nano/nano_node/daemon.cpp index ac4879a2..1a4e9725 100644 --- a/nano/nano_node/daemon.cpp +++ b/nano/nano_node/daemon.cpp @@ -49,10 +49,10 @@ void nano_daemon::daemon::run (boost::filesystem::path const & data_path, nano:: nano::logger_mt logger{ config.node.logging.min_time_between_log_output }; boost::asio::io_context io_ctx; auto opencl (nano::opencl_work::create (config.opencl_enable, config.opencl, logger)); - nano::work_pool opencl_work (config.node.work_threads, config.node.pow_sleep_interval, opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a) { - return opencl->generate_work (root_a, difficulty_a); + nano::work_pool opencl_work (config.node.work_threads, config.node.pow_sleep_interval, opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a, std::atomic & ticket_a) { + return opencl->generate_work (root_a, difficulty_a, ticket_a); } - : std::function (nano::uint256_union const &, uint64_t)> (nullptr)); + : std::function (nano::uint256_union const &, uint64_t, std::atomic &)> (nullptr)); nano::alarm alarm (io_ctx); nano::node_init init; try diff --git a/nano/nano_node/entry.cpp b/nano/nano_node/entry.cpp index dc24f2d5..aa14d4d4 100644 --- a/nano/nano_node/entry.cpp +++ b/nano/nano_node/entry.cpp @@ -440,10 +440,10 @@ int main (int argc, char * const * argv) { nano::logger_mt logger; auto opencl (nano::opencl_work::create (true, { platform, device, threads }, logger)); - nano::work_pool work_pool (std::numeric_limits::max (), std::chrono::nanoseconds (0), opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a) { + nano::work_pool work_pool (std::numeric_limits::max (), std::chrono::nanoseconds (0), opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a, std::atomic &) { return opencl->generate_work (root_a, difficulty_a); } - : std::function (nano::uint256_union const &, uint64_t)> (nullptr)); + : std::function (nano::uint256_union const &, uint64_t, std::atomic &)> (nullptr)); nano::change_block block (0, 0, nano::keypair ().prv, 0, 0); std::cerr << boost::str (boost::format ("Starting OpenCL generation profiling. Platform: %1%. Device: %2%. Threads: %3%. Difficulty: %4$#x\n") % platform % device % threads % difficulty); for (uint64_t i (0); true; ++i) diff --git a/nano/nano_wallet/entry.cpp b/nano/nano_wallet/entry.cpp index c8a8f844..ae8cef4c 100644 --- a/nano/nano_wallet/entry.cpp +++ b/nano/nano_wallet/entry.cpp @@ -80,10 +80,10 @@ int run_wallet (QApplication & application, int argc, char * const * argv, boost std::shared_ptr gui; nano::set_application_icon (application); auto opencl (nano::opencl_work::create (config.opencl_enable, config.opencl, logger)); - nano::work_pool work (config.node.work_threads, config.node.pow_sleep_interval, opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a) { + nano::work_pool work (config.node.work_threads, config.node.pow_sleep_interval, opencl ? [&opencl](nano::uint256_union const & root_a, uint64_t difficulty_a, std::atomic &) { return opencl->generate_work (root_a, difficulty_a); } - : std::function (nano::uint256_union const &, uint64_t)> (nullptr)); + : std::function (nano::uint256_union const &, uint64_t, std::atomic &)> (nullptr)); nano::alarm alarm (io_ctx); nano::node_init init; nano::node_flags flags; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 87ae5dce..1e508184 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -365,7 +365,7 @@ startup_time (std::chrono::steady_clock::now ()) auto network_label = network_params.network.get_current_network_as_string (); logger.always_log ("Active network: ", network_label); - logger.always_log (boost::str (boost::format ("Work pool running %1% threads") % work.threads.size ())); + logger.always_log (boost::str (boost::format ("Work pool running %1% threads %2%") % work.threads.size () % (work.opencl ? "(1 for OpenCL)" : ""))); logger.always_log (boost::str (boost::format ("%1% work peers configured") % config.work_peers.size ())); if (config.work_peers.empty () && config.work_threads == 0 && !work.opencl) { diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 9c964015..2e53becc 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -87,7 +87,7 @@ public: static std::chrono::seconds constexpr keepalive_period = std::chrono::seconds (60); static std::chrono::seconds constexpr keepalive_cutoff = keepalive_period * 5; static std::chrono::minutes constexpr wallet_backup_interval = std::chrono::minutes (5); - size_t bandwidth_limit{ 5 * 1024 * 1024 }; // 5Mb/s + size_t bandwidth_limit{ 5 * 1024 * 1024 }; // 5MB/s std::chrono::milliseconds conf_height_processor_batch_min_time{ 50 }; bool backup_before_upgrade{ false }; std::chrono::seconds work_watcher_period{ std::chrono::seconds (5) }; diff --git a/nano/node/openclwork.cpp b/nano/node/openclwork.cpp index 6bcca364..bbf053fc 100644 --- a/nano/node/openclwork.cpp +++ b/nano/node/openclwork.cpp @@ -688,14 +688,21 @@ nano::opencl_work::~opencl_work () } boost::optional nano::opencl_work::generate_work (nano::uint256_union const & root_a, uint64_t const difficulty_a) +{ + std::atomic ticket_l{ 0 }; + return generate_work (root_a, difficulty_a, ticket_l); +} + +boost::optional nano::opencl_work::generate_work (nano::uint256_union const & root_a, uint64_t const difficulty_a, std::atomic & ticket_a) { std::lock_guard lock (mutex); bool error (false); + int ticket_l (ticket_a); uint64_t result (0); uint64_t computed_difficulty (0); unsigned thread_count (config.threads); size_t work_size[] = { thread_count, 0, 0 }; - while ((nano::work_validate (root_a, result, &computed_difficulty) || computed_difficulty < difficulty_a) && !error) + while ((nano::work_validate (root_a, result, &computed_difficulty) || computed_difficulty < difficulty_a) && !error && ticket_a == ticket_l) { result = rand.next (); cl_int write_error1 = clEnqueueWriteBuffer (queue, attempt_buffer, false, 0, sizeof (uint64_t), &result, 0, nullptr, nullptr); diff --git a/nano/node/openclwork.hpp b/nano/node/openclwork.hpp index e07b9c8d..eef0bbf2 100644 --- a/nano/node/openclwork.hpp +++ b/nano/node/openclwork.hpp @@ -44,6 +44,7 @@ public: opencl_work (bool &, nano::opencl_config const &, nano::opencl_environment &, nano::logger_mt &); ~opencl_work (); boost::optional generate_work (nano::uint256_union const &, uint64_t const); + boost::optional generate_work (nano::uint256_union const &, uint64_t const, std::atomic &); static std::unique_ptr create (bool, nano::opencl_config const &, nano::logger_mt &); nano::opencl_config const & config; std::mutex mutex;