Separating work generation from wallet code.

This commit is contained in:
clemahieu 2017-12-21 13:16:14 -06:00
commit d403da7d94
8 changed files with 290 additions and 268 deletions

View file

@ -227,6 +227,8 @@ add_library (node
rai/node/testing.cpp
rai/node/wallet.hpp
rai/node/wallet.cpp
rai/node/work.hpp
rai/node/work.cpp
rai/node/working.hpp
rai/node/xorshift.hpp)

View file

@ -1,6 +1,7 @@
#include <rai/node/common.hpp>
#include <rai/node/wallet.hpp>
#include <rai/node/work.hpp>
std::array <uint8_t, 2> constexpr rai::message::magic_number;
size_t constexpr rai::message::ipv4_only_position;

View file

@ -2,6 +2,7 @@
#include <rai/node/bootstrap.hpp>
#include <rai/node/wallet.hpp>
#include <rai/node/work.hpp>
#include <unordered_set>
#include <memory>

View file

@ -13,184 +13,6 @@
#include <ed25519-donna/ed25519.h>
rai::work_pool::work_pool (unsigned max_threads_a, std::unique_ptr <rai::opencl_work> opencl_a) :
ticket (0),
done (false),
opencl (std::move (opencl_a))
{
static_assert (ATOMIC_INT_LOCK_FREE == 2, "Atomic int needed");
auto count (rai::rai_network == rai::rai_networks::rai_test_network ? 1 : std::max (1u, std::min (max_threads_a, std::thread::hardware_concurrency ())));
for (auto i (0); i < count; ++i)
{
auto thread (std::thread ([this, i] ()
{
rai::work_thread_reprioritize ();
loop (i);
}));
threads.push_back (std::move (thread));
}
}
rai::work_pool::~work_pool ()
{
stop ();
for (auto &i: threads)
{
i.join ();
}
}
uint64_t rai::work_pool::work_value (rai::block_hash const & root_a, uint64_t work_a)
{
uint64_t result;
blake2b_state hash;
blake2b_init (&hash, sizeof (result));
blake2b_update (&hash, reinterpret_cast <uint8_t *> (&work_a), sizeof (work_a));
blake2b_update (&hash, root_a.bytes.data (), root_a.bytes.size ());
blake2b_final (&hash, reinterpret_cast <uint8_t *> (&result), sizeof (result));
return result;
}
void rai::work_pool::loop (uint64_t thread)
{
// Quick RNG for work attempts.
xorshift1024star rng;
rai::random_pool.GenerateBlock (reinterpret_cast <uint8_t *> (rng.s.data ()), rng.s.size () * sizeof (decltype (rng.s)::value_type));
uint64_t work;
uint64_t output;
blake2b_state hash;
blake2b_init (&hash, sizeof (output));
std::unique_lock <std::mutex> lock (mutex);
while (!done || !pending.empty())
{
auto empty (pending.empty ());
if (thread == 0)
{
// Only work thread 0 notifies work observers
work_observers (!empty);
}
if (!empty)
{
auto current_l (pending.front ());
int ticket_l (ticket);
lock.unlock ();
output = 0;
// ticket != ticket_l indicates a different thread found a solution and we should stop
while (ticket == ticket_l && output < rai::work_pool::publish_threshold)
{
// Don't query main memory every iteration in order to reduce memory bus traffic
// All operations here operate on stack memory
// Count iterations down to zero since comparing to zero is easier than comparing to another number
unsigned iteration (256);
while (iteration && output < rai::work_pool::publish_threshold)
{
work = rng.next ();
blake2b_update (&hash, reinterpret_cast <uint8_t *> (&work), sizeof (work));
blake2b_update (&hash, current_l.first.bytes.data (), current_l.first.bytes.size ());
blake2b_final (&hash, reinterpret_cast <uint8_t *> (&output), sizeof (output));
blake2b_init (&hash, sizeof (output));
iteration -= 1;
}
}
lock.lock ();
if (ticket == ticket_l)
{
// If the ticket matches what we started with, we're the ones that found the solution
assert (output >= rai::work_pool::publish_threshold);
assert (work_value (current_l.first, work) == output);
// Signal other threads to stop their work next time they check ticket
++ticket;
current_l.second (work);
pending.pop_front ();
}
else
{
// A different thread found a solution
}
}
else
{
// Wait for a work request
producer_condition.wait (lock);
}
}
}
void rai::work_pool::cancel (rai::uint256_union const & root_a)
{
std::lock_guard <std::mutex> lock (mutex);
if (!pending.empty ())
{
if (pending.front ().first == root_a)
{
++ticket;
}
}
pending.remove_if ([&root_a] (decltype (pending)::value_type const & item_a)
{
bool result;
if (item_a.first == root_a)
{
item_a.second (boost::none);
result = true;
}
else
{
result = false;
}
return result;
});
}
bool rai::work_pool::work_validate (rai::block_hash const & root_a, uint64_t work_a)
{
auto result (work_value (root_a, work_a) < rai::work_pool::publish_threshold);
return result;
}
bool rai::work_pool::work_validate (rai::block & block_a)
{
return work_validate (block_a.root (), block_a.block_work ());
}
void rai::work_pool::stop ()
{
std::lock_guard <std::mutex> lock (mutex);
done = true;
producer_condition.notify_all ();
}
void rai::work_pool::generate (rai::uint256_union const & root_a, std::function <void (boost::optional <uint64_t> const &)> callback_a)
{
assert (!root_a.is_zero ());
boost::optional <uint64_t> result;
if (opencl != nullptr)
{
result = opencl->generate_work (*this, root_a);
}
if (!result)
{
std::lock_guard <std::mutex> lock (mutex);
pending.push_back (std::make_pair (root_a, callback_a));
producer_condition.notify_all ();
}
else
{
callback_a (result);
}
}
uint64_t rai::work_pool::generate (rai::uint256_union const & hash_a)
{
std::promise <boost::optional <uint64_t>> work;
generate (hash_a, [&work] (boost::optional <uint64_t> work_a)
{
work.set_value (work_a);
});
auto result (work.get_future ().get ());
return result.value ();
}
rai::uint256_union rai::wallet_store::check (MDB_txn * transaction_a)
{
rai::wallet_value value (entry_get_raw (transaction_a, rai::wallet_store::check_special));

View file

@ -1,13 +1,9 @@
#pragma once
#include <boost/optional.hpp>
#include <rai/secure.hpp>
#include <rai/node/common.hpp>
#include <rai/node/openclwork.hpp>
#include <atomic>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
@ -15,40 +11,14 @@
namespace rai
{
class work_pool
{
public:
work_pool (unsigned, std::unique_ptr <rai::opencl_work>);
~work_pool ();
void loop (uint64_t);
void stop ();
void cancel (rai::uint256_union const &);
void generate (rai::uint256_union const &, std::function <void (boost::optional <uint64_t> const &)>);
uint64_t generate (rai::uint256_union const &);
uint64_t work_value (rai::block_hash const &, uint64_t);
bool work_validate (rai::block &);
bool work_validate (rai::block_hash const &, uint64_t);
std::atomic <int> ticket;
bool done;
std::vector <std::thread> threads;
std::list <std::pair <rai::uint256_union, std::function <void (boost::optional <uint64_t> const &)>>> pending;
std::mutex mutex;
std::condition_variable producer_condition;
std::unique_ptr <rai::opencl_work> opencl;
rai::observer_set <bool> work_observers;
// Local work threshold for rate-limiting publishing blocks. ~5 seconds of work.
static uint64_t const publish_test_threshold = 0xff00000000000000;
static uint64_t const publish_full_threshold = 0xffffffc000000000;
static uint64_t const publish_threshold = rai::rai_network == rai::rai_networks::rai_test_network ? publish_test_threshold : publish_full_threshold;
};
// The fan spreads a key out over the heap to decrease the likelyhood of it being recovered by memory inspection
class fan
{
public:
fan (rai::uint256_union const &, size_t);
void value (rai::raw_key &);
void value_set (rai::raw_key const &);
std::vector <std::unique_ptr <rai::uint256_union>> values;
fan (rai::uint256_union const &, size_t);
void value (rai::raw_key &);
void value_set (rai::raw_key const &);
std::vector <std::unique_ptr <rai::uint256_union>> values;
private:
std::mutex mutex;
void value_get (rai::raw_key &);
@ -80,15 +50,15 @@ enum class key_type
class wallet_store
{
public:
wallet_store (bool &, rai::kdf &, rai::transaction &, rai::account, unsigned, std::string const &);
wallet_store (bool &, rai::kdf &, rai::transaction &, rai::account, unsigned, std::string const &, std::string const &);
wallet_store (bool &, rai::kdf &, rai::transaction &, rai::account, unsigned, std::string const &);
wallet_store (bool &, rai::kdf &, rai::transaction &, rai::account, unsigned, std::string const &, std::string const &);
std::vector <rai::account> accounts (MDB_txn *);
void initialize (MDB_txn *, bool &, std::string const &);
rai::uint256_union check (MDB_txn *);
bool rekey (MDB_txn *, std::string const &);
bool valid_password (MDB_txn *);
bool attempt_password (MDB_txn *, std::string const &);
void wallet_key (rai::raw_key &, MDB_txn *);
void initialize (MDB_txn *, bool &, std::string const &);
rai::uint256_union check (MDB_txn *);
bool rekey (MDB_txn *, std::string const &);
bool valid_password (MDB_txn *);
bool attempt_password (MDB_txn *, std::string const &);
void wallet_key (rai::raw_key &, MDB_txn *);
void seed (rai::raw_key &, MDB_txn *);
void seed_set (MDB_txn *, rai::raw_key const &);
rai::key_type key_type (rai::wallet_value const &);
@ -97,25 +67,25 @@ public:
uint32_t deterministic_index_get (MDB_txn *);
void deterministic_index_set (MDB_txn *, uint32_t);
void deterministic_clear (MDB_txn *);
rai::uint256_union salt (MDB_txn *);
bool is_representative (MDB_txn *);
rai::account representative (MDB_txn *);
void representative_set (MDB_txn *, rai::account const &);
rai::public_key insert_adhoc (MDB_txn *, rai::raw_key const &);
void erase (MDB_txn *, rai::public_key const &);
rai::uint256_union salt (MDB_txn *);
bool is_representative (MDB_txn *);
rai::account representative (MDB_txn *);
void representative_set (MDB_txn *, rai::account const &);
rai::public_key insert_adhoc (MDB_txn *, rai::raw_key const &);
void erase (MDB_txn *, rai::public_key const &);
rai::wallet_value entry_get_raw (MDB_txn *, rai::public_key const &);
void entry_put_raw (MDB_txn *, rai::public_key const &, rai::wallet_value const &);
bool fetch (MDB_txn *, rai::public_key const &, rai::raw_key &);
bool exists (MDB_txn *, rai::public_key const &);
bool fetch (MDB_txn *, rai::public_key const &, rai::raw_key &);
bool exists (MDB_txn *, rai::public_key const &);
void destroy (MDB_txn *);
rai::store_iterator find (MDB_txn *, rai::uint256_union const &);
rai::store_iterator begin (MDB_txn *, rai::uint256_union const &);
rai::store_iterator begin (MDB_txn *);
rai::store_iterator end ();
void derive_key (rai::raw_key &, MDB_txn *, std::string const &);
void serialize_json (MDB_txn *, std::string &);
rai::store_iterator find (MDB_txn *, rai::uint256_union const &);
rai::store_iterator begin (MDB_txn *, rai::uint256_union const &);
rai::store_iterator begin (MDB_txn *);
rai::store_iterator end ();
void derive_key (rai::raw_key &, MDB_txn *, std::string const &);
void serialize_json (MDB_txn *, std::string &);
void write_backup (MDB_txn *, boost::filesystem::path const &);
bool move (MDB_txn *, rai::wallet_store &, std::vector <rai::public_key> const &);
bool move (MDB_txn *, rai::wallet_store &, std::vector <rai::public_key> const &);
bool import (MDB_txn *, rai::wallet_store &);
bool work_get (MDB_txn *, rai::public_key const &, uint64_t &);
void work_put (MDB_txn *, rai::public_key const &, uint64_t);
@ -123,25 +93,25 @@ public:
void version_put (MDB_txn *, unsigned);
void upgrade_v1_v2 ();
void upgrade_v2_v3 ();
rai::fan password;
static unsigned const version_1;
static unsigned const version_2;
static unsigned const version_3;
static unsigned const version_current;
static rai::uint256_union const version_special;
static rai::uint256_union const wallet_key_special;
static rai::uint256_union const salt_special;
static rai::uint256_union const check_special;
static rai::uint256_union const representative_special;
rai::fan password;
static unsigned const version_1;
static unsigned const version_2;
static unsigned const version_3;
static unsigned const version_current;
static rai::uint256_union const version_special;
static rai::uint256_union const wallet_key_special;
static rai::uint256_union const salt_special;
static rai::uint256_union const check_special;
static rai::uint256_union const representative_special;
static rai::uint256_union const seed_special;
static rai::uint256_union const deterministic_index_special;
static int const special_count;
static unsigned const kdf_full_work = 64 * 1024;
static unsigned const kdf_test_work = 8;
static unsigned const kdf_work = rai::rai_network == rai::rai_networks::rai_test_network ? kdf_test_work : kdf_full_work;
static int const special_count;
static unsigned const kdf_full_work = 64 * 1024;
static unsigned const kdf_test_work = 8;
static unsigned const kdf_work = rai::rai_network == rai::rai_networks::rai_test_network ? kdf_test_work : kdf_full_work;
rai::kdf & kdf;
rai::mdb_env & environment;
MDB_dbi handle;
MDB_dbi handle;
};
class node;
// A wallet is a set of account keys encrypted by a common encryption key

184
rai/node/work.cpp Normal file
View file

@ -0,0 +1,184 @@
#include <rai/node/work.hpp>
#include <rai/node/xorshift.hpp>
#include <rai/node/openclwork.hpp>
#include <future>
rai::work_pool::work_pool (unsigned max_threads_a, std::unique_ptr <rai::opencl_work> opencl_a) :
ticket (0),
done (false),
opencl (std::move (opencl_a))
{
static_assert (ATOMIC_INT_LOCK_FREE == 2, "Atomic int needed");
auto count (rai::rai_network == rai::rai_networks::rai_test_network ? 1 : std::max (1u, std::min (max_threads_a, std::thread::hardware_concurrency ())));
for (auto i (0); i < count; ++i)
{
auto thread (std::thread ([this, i] ()
{
rai::work_thread_reprioritize ();
loop (i);
}));
threads.push_back (std::move (thread));
}
}
rai::work_pool::~work_pool ()
{
stop ();
for (auto &i: threads)
{
i.join ();
}
}
uint64_t rai::work_pool::work_value (rai::block_hash const & root_a, uint64_t work_a)
{
uint64_t result;
blake2b_state hash;
blake2b_init (&hash, sizeof (result));
blake2b_update (&hash, reinterpret_cast <uint8_t *> (&work_a), sizeof (work_a));
blake2b_update (&hash, root_a.bytes.data (), root_a.bytes.size ());
blake2b_final (&hash, reinterpret_cast <uint8_t *> (&result), sizeof (result));
return result;
}
void rai::work_pool::loop (uint64_t thread)
{
// Quick RNG for work attempts.
xorshift1024star rng;
rai::random_pool.GenerateBlock (reinterpret_cast <uint8_t *> (rng.s.data ()), rng.s.size () * sizeof (decltype (rng.s)::value_type));
uint64_t work;
uint64_t output;
blake2b_state hash;
blake2b_init (&hash, sizeof (output));
std::unique_lock <std::mutex> lock (mutex);
while (!done || !pending.empty())
{
auto empty (pending.empty ());
if (thread == 0)
{
// Only work thread 0 notifies work observers
work_observers (!empty);
}
if (!empty)
{
auto current_l (pending.front ());
int ticket_l (ticket);
lock.unlock ();
output = 0;
// ticket != ticket_l indicates a different thread found a solution and we should stop
while (ticket == ticket_l && output < rai::work_pool::publish_threshold)
{
// Don't query main memory every iteration in order to reduce memory bus traffic
// All operations here operate on stack memory
// Count iterations down to zero since comparing to zero is easier than comparing to another number
unsigned iteration (256);
while (iteration && output < rai::work_pool::publish_threshold)
{
work = rng.next ();
blake2b_update (&hash, reinterpret_cast <uint8_t *> (&work), sizeof (work));
blake2b_update (&hash, current_l.first.bytes.data (), current_l.first.bytes.size ());
blake2b_final (&hash, reinterpret_cast <uint8_t *> (&output), sizeof (output));
blake2b_init (&hash, sizeof (output));
iteration -= 1;
}
}
lock.lock ();
if (ticket == ticket_l)
{
// If the ticket matches what we started with, we're the ones that found the solution
assert (output >= rai::work_pool::publish_threshold);
assert (work_value (current_l.first, work) == output);
// Signal other threads to stop their work next time they check ticket
++ticket;
current_l.second (work);
pending.pop_front ();
}
else
{
// A different thread found a solution
}
}
else
{
// Wait for a work request
producer_condition.wait (lock);
}
}
}
void rai::work_pool::cancel (rai::uint256_union const & root_a)
{
std::lock_guard <std::mutex> lock (mutex);
if (!pending.empty ())
{
if (pending.front ().first == root_a)
{
++ticket;
}
}
pending.remove_if ([&root_a] (decltype (pending)::value_type const & item_a)
{
bool result;
if (item_a.first == root_a)
{
item_a.second (boost::none);
result = true;
}
else
{
result = false;
}
return result;
});
}
bool rai::work_pool::work_validate (rai::block_hash const & root_a, uint64_t work_a)
{
auto result (work_value (root_a, work_a) < rai::work_pool::publish_threshold);
return result;
}
bool rai::work_pool::work_validate (rai::block & block_a)
{
return work_validate (block_a.root (), block_a.block_work ());
}
void rai::work_pool::stop ()
{
std::lock_guard <std::mutex> lock (mutex);
done = true;
producer_condition.notify_all ();
}
void rai::work_pool::generate (rai::uint256_union const & root_a, std::function <void (boost::optional <uint64_t> const &)> callback_a)
{
assert (!root_a.is_zero ());
boost::optional <uint64_t> result;
if (opencl != nullptr)
{
result = opencl->generate_work (*this, root_a);
}
if (!result)
{
std::lock_guard <std::mutex> lock (mutex);
pending.push_back (std::make_pair (root_a, callback_a));
producer_condition.notify_all ();
}
else
{
callback_a (result);
}
}
uint64_t rai::work_pool::generate (rai::uint256_union const & hash_a)
{
std::promise <boost::optional <uint64_t>> work;
generate (hash_a, [&work] (boost::optional <uint64_t> work_a)
{
work.set_value (work_a);
});
auto result (work.get_future ().get ());
return result.value ();
}

42
rai/node/work.hpp Normal file
View file

@ -0,0 +1,42 @@
#pragma once
#include <boost/optional.hpp>
#include <rai/node/common.hpp>
#include <rai/numbers.hpp>
#include <atomic>
#include <memory>
#include <thread>
namespace rai
{
class block;
class opencl_work;
class work_pool
{
public:
work_pool (unsigned, std::unique_ptr <rai::opencl_work>);
~work_pool ();
void loop (uint64_t);
void stop ();
void cancel (rai::uint256_union const &);
void generate (rai::uint256_union const &, std::function <void (boost::optional <uint64_t> const &)>);
uint64_t generate (rai::uint256_union const &);
uint64_t work_value (rai::block_hash const &, uint64_t);
bool work_validate (rai::block &);
bool work_validate (rai::block_hash const &, uint64_t);
std::atomic <int> ticket;
bool done;
std::vector <std::thread> threads;
std::list <std::pair <rai::uint256_union, std::function <void (boost::optional <uint64_t> const &)>>> pending;
std::mutex mutex;
std::condition_variable producer_condition;
std::unique_ptr <rai::opencl_work> opencl;
rai::observer_set <bool> work_observers;
// Local work threshold for rate-limiting publishing blocks. ~5 seconds of work.
static uint64_t const publish_test_threshold = 0xff00000000000000;
static uint64_t const publish_full_threshold = 0xffffffc000000000;
static uint64_t const publish_threshold = rai::rai_network == rai::rai_networks::rai_test_network ? publish_test_threshold : publish_full_threshold;
};
}

View file

@ -6,23 +6,23 @@ namespace rai
class xorshift1024star
{
public:
xorshift1024star ():
p (0)
{
}
std::array <uint64_t, 16> s;
unsigned p;
uint64_t next ()
{
auto p_l (p);
auto pn ((p_l + 1) & 15);
p = pn;
uint64_t s0 = s[ p_l ];
uint64_t s1 = s[ pn ];
s1 ^= s1 << 31; // a
s1 ^= s1 >> 11; // b
s0 ^= s0 >> 30; // c
return ( s[ pn ] = s0 ^ s1 ) * 1181783497276652981LL;
}
xorshift1024star ():
p (0)
{
}
std::array <uint64_t, 16> s;
unsigned p;
uint64_t next ()
{
auto p_l (p);
auto pn ((p_l + 1) & 15);
p = pn;
uint64_t s0 = s[ p_l ];
uint64_t s1 = s[ pn ];
s1 ^= s1 << 31; // a
s1 ^= s1 >> 11; // b
s0 ^= s0 >> 30; // c
return ( s[ pn ] = s0 ^ s1 ) * 1181783497276652981LL;
}
};
}