From d403da7d94986ebe0a4681fcbcb00270aac8489f Mon Sep 17 00:00:00 2001 From: clemahieu Date: Thu, 21 Dec 2017 13:16:14 -0600 Subject: [PATCH] Separating work generation from wallet code. --- CMakeLists.txt | 2 + rai/node/common.cpp | 1 + rai/node/node.hpp | 1 + rai/node/wallet.cpp | 178 ---------------------------------------- rai/node/wallet.hpp | 114 ++++++++++---------------- rai/node/work.cpp | 184 ++++++++++++++++++++++++++++++++++++++++++ rai/node/work.hpp | 42 ++++++++++ rai/node/xorshift.hpp | 36 ++++----- 8 files changed, 290 insertions(+), 268 deletions(-) create mode 100644 rai/node/work.cpp create mode 100644 rai/node/work.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index d84a8eb7..20dd754e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -227,6 +227,8 @@ add_library (node rai/node/testing.cpp rai/node/wallet.hpp rai/node/wallet.cpp + rai/node/work.hpp + rai/node/work.cpp rai/node/working.hpp rai/node/xorshift.hpp) diff --git a/rai/node/common.cpp b/rai/node/common.cpp index 752e104d..45051b0a 100644 --- a/rai/node/common.cpp +++ b/rai/node/common.cpp @@ -1,6 +1,7 @@ #include #include +#include std::array constexpr rai::message::magic_number; size_t constexpr rai::message::ipv4_only_position; diff --git a/rai/node/node.hpp b/rai/node/node.hpp index 1f671d21..b440db86 100644 --- a/rai/node/node.hpp +++ b/rai/node/node.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include diff --git a/rai/node/wallet.cpp b/rai/node/wallet.cpp index d0181c1c..539227d7 100644 --- a/rai/node/wallet.cpp +++ b/rai/node/wallet.cpp @@ -13,184 +13,6 @@ #include -rai::work_pool::work_pool (unsigned max_threads_a, std::unique_ptr opencl_a) : -ticket (0), -done (false), -opencl (std::move (opencl_a)) -{ - static_assert (ATOMIC_INT_LOCK_FREE == 2, "Atomic int needed"); - auto count (rai::rai_network == rai::rai_networks::rai_test_network ? 1 : std::max (1u, std::min (max_threads_a, std::thread::hardware_concurrency ()))); - for (auto i (0); i < count; ++i) - { - auto thread (std::thread ([this, i] () - { - rai::work_thread_reprioritize (); - loop (i); - })); - threads.push_back (std::move (thread)); - } -} - -rai::work_pool::~work_pool () -{ - stop (); - for (auto &i: threads) - { - i.join (); - } -} - -uint64_t rai::work_pool::work_value (rai::block_hash const & root_a, uint64_t work_a) -{ - uint64_t result; - blake2b_state hash; - blake2b_init (&hash, sizeof (result)); - blake2b_update (&hash, reinterpret_cast (&work_a), sizeof (work_a)); - blake2b_update (&hash, root_a.bytes.data (), root_a.bytes.size ()); - blake2b_final (&hash, reinterpret_cast (&result), sizeof (result)); - return result; -} - -void rai::work_pool::loop (uint64_t thread) -{ - // Quick RNG for work attempts. - xorshift1024star rng; - rai::random_pool.GenerateBlock (reinterpret_cast (rng.s.data ()), rng.s.size () * sizeof (decltype (rng.s)::value_type)); - uint64_t work; - uint64_t output; - blake2b_state hash; - blake2b_init (&hash, sizeof (output)); - std::unique_lock lock (mutex); - while (!done || !pending.empty()) - { - auto empty (pending.empty ()); - if (thread == 0) - { - // Only work thread 0 notifies work observers - work_observers (!empty); - } - if (!empty) - { - auto current_l (pending.front ()); - int ticket_l (ticket); - lock.unlock (); - output = 0; - // ticket != ticket_l indicates a different thread found a solution and we should stop - while (ticket == ticket_l && output < rai::work_pool::publish_threshold) - { - // Don't query main memory every iteration in order to reduce memory bus traffic - // All operations here operate on stack memory - // Count iterations down to zero since comparing to zero is easier than comparing to another number - unsigned iteration (256); - while (iteration && output < rai::work_pool::publish_threshold) - { - work = rng.next (); - blake2b_update (&hash, reinterpret_cast (&work), sizeof (work)); - blake2b_update (&hash, current_l.first.bytes.data (), current_l.first.bytes.size ()); - blake2b_final (&hash, reinterpret_cast (&output), sizeof (output)); - blake2b_init (&hash, sizeof (output)); - iteration -= 1; - } - } - lock.lock (); - if (ticket == ticket_l) - { - // If the ticket matches what we started with, we're the ones that found the solution - assert (output >= rai::work_pool::publish_threshold); - assert (work_value (current_l.first, work) == output); - // Signal other threads to stop their work next time they check ticket - ++ticket; - current_l.second (work); - pending.pop_front (); - } - else - { - // A different thread found a solution - } - } - else - { - // Wait for a work request - producer_condition.wait (lock); - } - } -} - -void rai::work_pool::cancel (rai::uint256_union const & root_a) -{ - std::lock_guard lock (mutex); - if (!pending.empty ()) - { - if (pending.front ().first == root_a) - { - ++ticket; - } - } - pending.remove_if ([&root_a] (decltype (pending)::value_type const & item_a) - { - bool result; - if (item_a.first == root_a) - { - item_a.second (boost::none); - result = true; - } - else - { - result = false; - } - return result; - }); -} - -bool rai::work_pool::work_validate (rai::block_hash const & root_a, uint64_t work_a) -{ - auto result (work_value (root_a, work_a) < rai::work_pool::publish_threshold); - return result; -} - -bool rai::work_pool::work_validate (rai::block & block_a) -{ - return work_validate (block_a.root (), block_a.block_work ()); -} - -void rai::work_pool::stop () -{ - std::lock_guard lock (mutex); - done = true; - producer_condition.notify_all (); -} - -void rai::work_pool::generate (rai::uint256_union const & root_a, std::function const &)> callback_a) -{ - assert (!root_a.is_zero ()); - boost::optional result; - if (opencl != nullptr) - { - result = opencl->generate_work (*this, root_a); - } - if (!result) - { - std::lock_guard lock (mutex); - pending.push_back (std::make_pair (root_a, callback_a)); - producer_condition.notify_all (); - } - else - { - callback_a (result); - } -} - -uint64_t rai::work_pool::generate (rai::uint256_union const & hash_a) -{ - std::promise > work; - generate (hash_a, [&work] (boost::optional work_a) - { - work.set_value (work_a); - }); - auto result (work.get_future ().get ()); - return result.value (); -} - rai::uint256_union rai::wallet_store::check (MDB_txn * transaction_a) { rai::wallet_value value (entry_get_raw (transaction_a, rai::wallet_store::check_special)); diff --git a/rai/node/wallet.hpp b/rai/node/wallet.hpp index e7db2322..7c9bca06 100644 --- a/rai/node/wallet.hpp +++ b/rai/node/wallet.hpp @@ -1,13 +1,9 @@ #pragma once -#include - #include #include #include -#include -#include #include #include #include @@ -15,40 +11,14 @@ namespace rai { -class work_pool -{ -public: - work_pool (unsigned, std::unique_ptr ); - ~work_pool (); - void loop (uint64_t); - void stop (); - void cancel (rai::uint256_union const &); - void generate (rai::uint256_union const &, std::function const &)>); - uint64_t generate (rai::uint256_union const &); - uint64_t work_value (rai::block_hash const &, uint64_t); - bool work_validate (rai::block &); - bool work_validate (rai::block_hash const &, uint64_t); - std::atomic ticket; - bool done; - std::vector threads; - std::list const &)>>> pending; - std::mutex mutex; - std::condition_variable producer_condition; - std::unique_ptr opencl; - rai::observer_set work_observers; - // Local work threshold for rate-limiting publishing blocks. ~5 seconds of work. - static uint64_t const publish_test_threshold = 0xff00000000000000; - static uint64_t const publish_full_threshold = 0xffffffc000000000; - static uint64_t const publish_threshold = rai::rai_network == rai::rai_networks::rai_test_network ? publish_test_threshold : publish_full_threshold; -}; // The fan spreads a key out over the heap to decrease the likelyhood of it being recovered by memory inspection class fan { public: - fan (rai::uint256_union const &, size_t); - void value (rai::raw_key &); - void value_set (rai::raw_key const &); - std::vector > values; + fan (rai::uint256_union const &, size_t); + void value (rai::raw_key &); + void value_set (rai::raw_key const &); + std::vector > values; private: std::mutex mutex; void value_get (rai::raw_key &); @@ -80,15 +50,15 @@ enum class key_type class wallet_store { public: - wallet_store (bool &, rai::kdf &, rai::transaction &, rai::account, unsigned, std::string const &); - wallet_store (bool &, rai::kdf &, rai::transaction &, rai::account, unsigned, std::string const &, std::string const &); + wallet_store (bool &, rai::kdf &, rai::transaction &, rai::account, unsigned, std::string const &); + wallet_store (bool &, rai::kdf &, rai::transaction &, rai::account, unsigned, std::string const &, std::string const &); std::vector accounts (MDB_txn *); - void initialize (MDB_txn *, bool &, std::string const &); - rai::uint256_union check (MDB_txn *); - bool rekey (MDB_txn *, std::string const &); - bool valid_password (MDB_txn *); - bool attempt_password (MDB_txn *, std::string const &); - void wallet_key (rai::raw_key &, MDB_txn *); + void initialize (MDB_txn *, bool &, std::string const &); + rai::uint256_union check (MDB_txn *); + bool rekey (MDB_txn *, std::string const &); + bool valid_password (MDB_txn *); + bool attempt_password (MDB_txn *, std::string const &); + void wallet_key (rai::raw_key &, MDB_txn *); void seed (rai::raw_key &, MDB_txn *); void seed_set (MDB_txn *, rai::raw_key const &); rai::key_type key_type (rai::wallet_value const &); @@ -97,25 +67,25 @@ public: uint32_t deterministic_index_get (MDB_txn *); void deterministic_index_set (MDB_txn *, uint32_t); void deterministic_clear (MDB_txn *); - rai::uint256_union salt (MDB_txn *); - bool is_representative (MDB_txn *); - rai::account representative (MDB_txn *); - void representative_set (MDB_txn *, rai::account const &); - rai::public_key insert_adhoc (MDB_txn *, rai::raw_key const &); - void erase (MDB_txn *, rai::public_key const &); + rai::uint256_union salt (MDB_txn *); + bool is_representative (MDB_txn *); + rai::account representative (MDB_txn *); + void representative_set (MDB_txn *, rai::account const &); + rai::public_key insert_adhoc (MDB_txn *, rai::raw_key const &); + void erase (MDB_txn *, rai::public_key const &); rai::wallet_value entry_get_raw (MDB_txn *, rai::public_key const &); void entry_put_raw (MDB_txn *, rai::public_key const &, rai::wallet_value const &); - bool fetch (MDB_txn *, rai::public_key const &, rai::raw_key &); - bool exists (MDB_txn *, rai::public_key const &); + bool fetch (MDB_txn *, rai::public_key const &, rai::raw_key &); + bool exists (MDB_txn *, rai::public_key const &); void destroy (MDB_txn *); - rai::store_iterator find (MDB_txn *, rai::uint256_union const &); - rai::store_iterator begin (MDB_txn *, rai::uint256_union const &); - rai::store_iterator begin (MDB_txn *); - rai::store_iterator end (); - void derive_key (rai::raw_key &, MDB_txn *, std::string const &); - void serialize_json (MDB_txn *, std::string &); + rai::store_iterator find (MDB_txn *, rai::uint256_union const &); + rai::store_iterator begin (MDB_txn *, rai::uint256_union const &); + rai::store_iterator begin (MDB_txn *); + rai::store_iterator end (); + void derive_key (rai::raw_key &, MDB_txn *, std::string const &); + void serialize_json (MDB_txn *, std::string &); void write_backup (MDB_txn *, boost::filesystem::path const &); - bool move (MDB_txn *, rai::wallet_store &, std::vector const &); + bool move (MDB_txn *, rai::wallet_store &, std::vector const &); bool import (MDB_txn *, rai::wallet_store &); bool work_get (MDB_txn *, rai::public_key const &, uint64_t &); void work_put (MDB_txn *, rai::public_key const &, uint64_t); @@ -123,25 +93,25 @@ public: void version_put (MDB_txn *, unsigned); void upgrade_v1_v2 (); void upgrade_v2_v3 (); - rai::fan password; - static unsigned const version_1; - static unsigned const version_2; - static unsigned const version_3; - static unsigned const version_current; - static rai::uint256_union const version_special; - static rai::uint256_union const wallet_key_special; - static rai::uint256_union const salt_special; - static rai::uint256_union const check_special; - static rai::uint256_union const representative_special; + rai::fan password; + static unsigned const version_1; + static unsigned const version_2; + static unsigned const version_3; + static unsigned const version_current; + static rai::uint256_union const version_special; + static rai::uint256_union const wallet_key_special; + static rai::uint256_union const salt_special; + static rai::uint256_union const check_special; + static rai::uint256_union const representative_special; static rai::uint256_union const seed_special; static rai::uint256_union const deterministic_index_special; - static int const special_count; - static unsigned const kdf_full_work = 64 * 1024; - static unsigned const kdf_test_work = 8; - static unsigned const kdf_work = rai::rai_network == rai::rai_networks::rai_test_network ? kdf_test_work : kdf_full_work; + static int const special_count; + static unsigned const kdf_full_work = 64 * 1024; + static unsigned const kdf_test_work = 8; + static unsigned const kdf_work = rai::rai_network == rai::rai_networks::rai_test_network ? kdf_test_work : kdf_full_work; rai::kdf & kdf; rai::mdb_env & environment; - MDB_dbi handle; + MDB_dbi handle; }; class node; // A wallet is a set of account keys encrypted by a common encryption key diff --git a/rai/node/work.cpp b/rai/node/work.cpp new file mode 100644 index 00000000..1fb45064 --- /dev/null +++ b/rai/node/work.cpp @@ -0,0 +1,184 @@ +#include + +#include +#include + +#include + +rai::work_pool::work_pool (unsigned max_threads_a, std::unique_ptr opencl_a) : +ticket (0), +done (false), +opencl (std::move (opencl_a)) +{ + static_assert (ATOMIC_INT_LOCK_FREE == 2, "Atomic int needed"); + auto count (rai::rai_network == rai::rai_networks::rai_test_network ? 1 : std::max (1u, std::min (max_threads_a, std::thread::hardware_concurrency ()))); + for (auto i (0); i < count; ++i) + { + auto thread (std::thread ([this, i] () + { + rai::work_thread_reprioritize (); + loop (i); + })); + threads.push_back (std::move (thread)); + } +} + +rai::work_pool::~work_pool () +{ + stop (); + for (auto &i: threads) + { + i.join (); + } +} + +uint64_t rai::work_pool::work_value (rai::block_hash const & root_a, uint64_t work_a) +{ + uint64_t result; + blake2b_state hash; + blake2b_init (&hash, sizeof (result)); + blake2b_update (&hash, reinterpret_cast (&work_a), sizeof (work_a)); + blake2b_update (&hash, root_a.bytes.data (), root_a.bytes.size ()); + blake2b_final (&hash, reinterpret_cast (&result), sizeof (result)); + return result; +} + +void rai::work_pool::loop (uint64_t thread) +{ + // Quick RNG for work attempts. + xorshift1024star rng; + rai::random_pool.GenerateBlock (reinterpret_cast (rng.s.data ()), rng.s.size () * sizeof (decltype (rng.s)::value_type)); + uint64_t work; + uint64_t output; + blake2b_state hash; + blake2b_init (&hash, sizeof (output)); + std::unique_lock lock (mutex); + while (!done || !pending.empty()) + { + auto empty (pending.empty ()); + if (thread == 0) + { + // Only work thread 0 notifies work observers + work_observers (!empty); + } + if (!empty) + { + auto current_l (pending.front ()); + int ticket_l (ticket); + lock.unlock (); + output = 0; + // ticket != ticket_l indicates a different thread found a solution and we should stop + while (ticket == ticket_l && output < rai::work_pool::publish_threshold) + { + // Don't query main memory every iteration in order to reduce memory bus traffic + // All operations here operate on stack memory + // Count iterations down to zero since comparing to zero is easier than comparing to another number + unsigned iteration (256); + while (iteration && output < rai::work_pool::publish_threshold) + { + work = rng.next (); + blake2b_update (&hash, reinterpret_cast (&work), sizeof (work)); + blake2b_update (&hash, current_l.first.bytes.data (), current_l.first.bytes.size ()); + blake2b_final (&hash, reinterpret_cast (&output), sizeof (output)); + blake2b_init (&hash, sizeof (output)); + iteration -= 1; + } + } + lock.lock (); + if (ticket == ticket_l) + { + // If the ticket matches what we started with, we're the ones that found the solution + assert (output >= rai::work_pool::publish_threshold); + assert (work_value (current_l.first, work) == output); + // Signal other threads to stop their work next time they check ticket + ++ticket; + current_l.second (work); + pending.pop_front (); + } + else + { + // A different thread found a solution + } + } + else + { + // Wait for a work request + producer_condition.wait (lock); + } + } +} + +void rai::work_pool::cancel (rai::uint256_union const & root_a) +{ + std::lock_guard lock (mutex); + if (!pending.empty ()) + { + if (pending.front ().first == root_a) + { + ++ticket; + } + } + pending.remove_if ([&root_a] (decltype (pending)::value_type const & item_a) + { + bool result; + if (item_a.first == root_a) + { + item_a.second (boost::none); + result = true; + } + else + { + result = false; + } + return result; + }); +} + +bool rai::work_pool::work_validate (rai::block_hash const & root_a, uint64_t work_a) +{ + auto result (work_value (root_a, work_a) < rai::work_pool::publish_threshold); + return result; +} + +bool rai::work_pool::work_validate (rai::block & block_a) +{ + return work_validate (block_a.root (), block_a.block_work ()); +} + +void rai::work_pool::stop () +{ + std::lock_guard lock (mutex); + done = true; + producer_condition.notify_all (); +} + +void rai::work_pool::generate (rai::uint256_union const & root_a, std::function const &)> callback_a) +{ + assert (!root_a.is_zero ()); + boost::optional result; + if (opencl != nullptr) + { + result = opencl->generate_work (*this, root_a); + } + if (!result) + { + std::lock_guard lock (mutex); + pending.push_back (std::make_pair (root_a, callback_a)); + producer_condition.notify_all (); + } + else + { + callback_a (result); + } +} + +uint64_t rai::work_pool::generate (rai::uint256_union const & hash_a) +{ + std::promise > work; + generate (hash_a, [&work] (boost::optional work_a) + { + work.set_value (work_a); + }); + auto result (work.get_future ().get ()); + return result.value (); +} diff --git a/rai/node/work.hpp b/rai/node/work.hpp new file mode 100644 index 00000000..c6a97412 --- /dev/null +++ b/rai/node/work.hpp @@ -0,0 +1,42 @@ +#pragma once + +#include +#include +#include + +#include +#include +#include + +namespace rai +{ +class block; +class opencl_work; +class work_pool +{ +public: + work_pool (unsigned, std::unique_ptr ); + ~work_pool (); + void loop (uint64_t); + void stop (); + void cancel (rai::uint256_union const &); + void generate (rai::uint256_union const &, std::function const &)>); + uint64_t generate (rai::uint256_union const &); + uint64_t work_value (rai::block_hash const &, uint64_t); + bool work_validate (rai::block &); + bool work_validate (rai::block_hash const &, uint64_t); + std::atomic ticket; + bool done; + std::vector threads; + std::list const &)>>> pending; + std::mutex mutex; + std::condition_variable producer_condition; + std::unique_ptr opencl; + rai::observer_set work_observers; + // Local work threshold for rate-limiting publishing blocks. ~5 seconds of work. + static uint64_t const publish_test_threshold = 0xff00000000000000; + static uint64_t const publish_full_threshold = 0xffffffc000000000; + static uint64_t const publish_threshold = rai::rai_network == rai::rai_networks::rai_test_network ? publish_test_threshold : publish_full_threshold; +}; +} + diff --git a/rai/node/xorshift.hpp b/rai/node/xorshift.hpp index 0beff25b..84217140 100644 --- a/rai/node/xorshift.hpp +++ b/rai/node/xorshift.hpp @@ -6,23 +6,23 @@ namespace rai class xorshift1024star { public: - xorshift1024star (): - p (0) - { - } - std::array s; - unsigned p; - uint64_t next () - { - auto p_l (p); - auto pn ((p_l + 1) & 15); - p = pn; - uint64_t s0 = s[ p_l ]; - uint64_t s1 = s[ pn ]; - s1 ^= s1 << 31; // a - s1 ^= s1 >> 11; // b - s0 ^= s0 >> 30; // c - return ( s[ pn ] = s0 ^ s1 ) * 1181783497276652981LL; - } + xorshift1024star (): + p (0) + { + } + std::array s; + unsigned p; + uint64_t next () + { + auto p_l (p); + auto pn ((p_l + 1) & 15); + p = pn; + uint64_t s0 = s[ p_l ]; + uint64_t s1 = s[ pn ]; + s1 ^= s1 << 31; // a + s1 ^= s1 >> 11; // b + s0 ^= s0 >> 30; // c + return ( s[ pn ] = s0 ^ s1 ) * 1181783497276652981LL; + } }; }