From 0788da3ed21d4cc6a3390220fbdb8228e16ffaed Mon Sep 17 00:00:00 2001 From: clemahieu Date: Thu, 31 Jan 2019 00:21:12 +0700 Subject: [PATCH] Long-term online rep weight tracking (#1648) * Adding online_weight table to trend online weight count. * Long term trending of online vote weight. * Update online_reps test to new semantics. * Add review comments and fix rebase * Lambda formatting --- nano/core_test/block_store.cpp | 20 ++++++ nano/core_test/node.cpp | 20 +++--- nano/core_test/rpc.cpp | 2 +- nano/node/lmdb.cpp | 44 ++++++++++++ nano/node/lmdb.hpp | 14 ++++ nano/node/node.cpp | 123 ++++++++++++++++++--------------- nano/node/node.hpp | 31 ++++----- nano/node/rpc.cpp | 2 +- nano/secure/blockstore.hpp | 6 ++ 9 files changed, 179 insertions(+), 83 deletions(-) diff --git a/nano/core_test/block_store.cpp b/nano/core_test/block_store.cpp index 1a9c6c18..502f435f 100644 --- a/nano/core_test/block_store.cpp +++ b/nano/core_test/block_store.cpp @@ -1488,3 +1488,23 @@ TEST (block_store, endpoint_key_byte_order) // This should be in host byte order ASSERT_EQ (port, endpoint_key1.port ()); } + +TEST (block_store, online_weight) +{ + nano::logging logging; + bool error (false); + nano::mdb_store store (error, logging, nano::unique_path ()); + ASSERT_FALSE (error); + auto transaction (store.tx_begin (true)); + ASSERT_EQ (0, store.online_weight_count (transaction)); + ASSERT_EQ (store.online_weight_end (), store.online_weight_begin (transaction)); + store.online_weight_put (transaction, 1, 2); + ASSERT_EQ (1, store.online_weight_count (transaction)); + auto item (store.online_weight_begin (transaction)); + ASSERT_NE (store.online_weight_end (), item); + ASSERT_EQ (1, item->first); + ASSERT_EQ (2, item->second.number ()); + store.online_weight_del (transaction, 1); + ASSERT_EQ (0, store.online_weight_count (transaction)); + ASSERT_EQ (store.online_weight_end (), store.online_weight_begin (transaction)); +} diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index ad104cbb..d1f5a983 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -1687,15 +1687,17 @@ TEST (node, stat_counting) TEST (node, online_reps) { - nano::system system (24000, 2); - system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); - ASSERT_EQ (system.nodes[1]->config.online_weight_minimum.number (), system.nodes[1]->online_reps.online_stake ()); - system.wallet (0)->send_action (nano::test_genesis_key.pub, nano::test_genesis_key.pub, nano::Gxrb_ratio); - system.deadline_set (10s); - while (system.nodes[1]->online_reps.online_stake () == system.nodes[1]->config.online_weight_minimum.number ()) - { - ASSERT_NO_ERROR (system.poll ()); - } + nano::system system (24000, 1); + // 1 sample of minimum weight + ASSERT_EQ (system.nodes[0]->config.online_weight_minimum, system.nodes[0]->online_reps.online_stake ()); + auto vote (std::make_shared ()); + system.nodes[0]->online_reps.observe (nano::test_genesis_key.pub); + // 1 minimum, 1 maximum + system.nodes[0]->online_reps.sample (); + ASSERT_EQ (nano::genesis_amount, system.nodes[0]->online_reps.online_stake ()); + // 2 minimum, 1 maximum + system.nodes[0]->online_reps.sample (); + ASSERT_EQ (system.nodes[0]->config.online_weight_minimum, system.nodes[0]->online_reps.online_stake ()); } TEST (node, block_confirm) diff --git a/nano/core_test/rpc.cpp b/nano/core_test/rpc.cpp index 0d6ca54d..e31c875d 100644 --- a/nano/core_test/rpc.cpp +++ b/nano/core_test/rpc.cpp @@ -3924,7 +3924,7 @@ TEST (rpc, online_reps) auto send_block (system.wallet (0)->send_action (nano::test_genesis_key.pub, nano::test_genesis_key.pub, nano::Gxrb_ratio)); ASSERT_NE (nullptr, send_block); system.deadline_set (10s); - while (system.nodes[1]->online_reps.online_stake () == system.nodes[1]->config.online_weight_minimum.number ()) + while (system.nodes[1]->online_reps.list ().empty ()) { ASSERT_NO_ERROR (system.poll ()); } diff --git a/nano/node/lmdb.cpp b/nano/node/lmdb.cpp index 61efff71..8ae78273 100644 --- a/nano/node/lmdb.cpp +++ b/nano/node/lmdb.cpp @@ -4,6 +4,7 @@ #include #include +#include #include #include @@ -140,6 +141,17 @@ buffer (std::make_shared> ()) value = { buffer->size (), const_cast (buffer->data ()) }; } +nano::mdb_val::mdb_val (uint64_t val_a) : +buffer (std::make_shared> ()) +{ + { + boost::endian::native_to_big_inplace (val_a); + nano::vectorstream stream (*buffer); + nano::write (stream, val_a); + } + value = { buffer->size (), const_cast (buffer->data ()) }; +} + void * nano::mdb_val::data () const { return value.mv_data; @@ -297,6 +309,7 @@ nano::mdb_val::operator uint64_t () const nano::bufferstream stream (reinterpret_cast (value.mv_data), value.mv_size); auto error (nano::read (stream, result)); assert (!error); + boost::endian::big_to_native_inplace (result); return result; } @@ -752,6 +765,7 @@ env (error_a, path_a, lmdb_max_dbs) error_a |= mdb_dbi_open (env.tx (transaction), "representation", MDB_CREATE, &representation) != 0; error_a |= mdb_dbi_open (env.tx (transaction), "unchecked", MDB_CREATE, &unchecked) != 0; error_a |= mdb_dbi_open (env.tx (transaction), "vote", MDB_CREATE, &vote) != 0; + error_a |= mdb_dbi_open (env.tx (transaction), "online_weight", MDB_CREATE, &online_weight) != 0; error_a |= mdb_dbi_open (env.tx (transaction), "meta", MDB_CREATE, &meta) != 0; error_a |= mdb_dbi_open (env.tx (transaction), "peers", MDB_CREATE, &peers) != 0; if (!full_sideband (transaction)) @@ -2136,6 +2150,36 @@ size_t nano::mdb_store::unchecked_count (nano::transaction const & transaction_a return result; } +void nano::mdb_store::online_weight_put (nano::transaction const & transaction_a, uint64_t time_a, nano::amount const & amount_a) +{ + auto status (mdb_put (env.tx (transaction_a), online_weight, nano::mdb_val (time_a), nano::mdb_val (amount_a), 0)); + release_assert (status == 0); +} + +void nano::mdb_store::online_weight_del (nano::transaction const & transaction_a, uint64_t time_a) +{ + auto status (mdb_del (env.tx (transaction_a), online_weight, nano::mdb_val (time_a), nullptr)); + release_assert (status == 0); +} + +nano::store_iterator nano::mdb_store::online_weight_begin (nano::transaction const & transaction_a) +{ + return nano::store_iterator (std::make_unique> (transaction_a, online_weight)); +} + +nano::store_iterator nano::mdb_store::online_weight_end () +{ + return nano::store_iterator (nullptr); +} + +size_t nano::mdb_store::online_weight_count (nano::transaction const & transaction_a) const +{ + MDB_stat online_weight_stats; + auto status1 (mdb_stat (env.tx (transaction_a), online_weight, &online_weight_stats)); + release_assert (status1 == 0); + return online_weight_stats.ms_entries; +} + void nano::mdb_store::flush (nano::transaction const & transaction_a) { { diff --git a/nano/node/lmdb.hpp b/nano/node/lmdb.hpp index f04002ad..afee3c2e 100644 --- a/nano/node/lmdb.hpp +++ b/nano/node/lmdb.hpp @@ -59,6 +59,7 @@ public: mdb_val (nano::endpoint_key const &); mdb_val (std::shared_ptr const &); mdb_val (std::shared_ptr const &); + mdb_val (uint64_t); void * data () const; size_t size () const; explicit operator nano::account_info () const; @@ -237,6 +238,13 @@ public: void flush (nano::transaction const &) override; nano::store_iterator> vote_begin (nano::transaction const &) override; nano::store_iterator> vote_end () override; + + void online_weight_put (nano::transaction const &, uint64_t, nano::amount const &) override; + void online_weight_del (nano::transaction const &, uint64_t) override; + nano::store_iterator online_weight_begin (nano::transaction const &) override; + nano::store_iterator online_weight_end () override; + size_t online_weight_count (nano::transaction const &) const override; + std::mutex cache_mutex; std::unordered_map> vote_cache_l1; std::unordered_map> vote_cache_l2; @@ -370,6 +378,12 @@ public: */ MDB_dbi vote{ 0 }; + /** + * Samples of online vote weight + * uint64_t -> nano::amount + */ + MDB_dbi online_weight{ 0 }; + /** * Meta information about block store, such as versions. * nano::uint256_union (arbitrary key) -> blob diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 2e9e71a7..8ad33d0b 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -31,6 +31,8 @@ unsigned constexpr nano::active_transactions::request_interval_ms; size_t constexpr nano::active_transactions::max_broadcast_queue; size_t constexpr nano::block_arrival::arrival_size_min; std::chrono::seconds constexpr nano::block_arrival::arrival_time_min; +uint64_t constexpr nano::online_reps::weight_period; +uint64_t constexpr nano::online_reps::weight_samples; namespace nano { @@ -2080,7 +2082,7 @@ block_processor_thread ([this]() { nano::thread_role::set (nano::thread_role::name::block_processing); this->block_processor.process_blocks (); }), -online_reps (*this), +online_reps (ledger, config.online_weight_minimum.number ()), stats (config.stat_config), vote_uniquer (block_uniquer), startup_time (std::chrono::steady_clock::now ()) @@ -2145,7 +2147,7 @@ startup_time (std::chrono::steady_clock::now ()) observers.vote.add ([this](nano::transaction const & transaction, std::shared_ptr vote_a, nano::endpoint const & endpoint_a) { assert (endpoint_a.address ().is_v6 ()); this->gap_cache.vote (vote_a); - this->online_reps.vote (vote_a); + this->online_reps.observe (vote_a->account); nano::uint128_t rep_weight; nano::uint128_t min_rep_weight; { @@ -2554,6 +2556,7 @@ void nano::node::start () ongoing_rep_crawl (); ongoing_rep_calculation (); ongoing_peer_store (); + ongoing_online_weight_calculation_queue (); if (!flags.disable_bootstrap_listener) { bootstrap.start (); @@ -2571,7 +2574,6 @@ void nano::node::start () this_l->bootstrap_wallet (); }); } - online_reps.recalculate_stake (); port_mapping.start (); if (!flags.disable_unchecked_cleaning) { @@ -3206,6 +3208,23 @@ nano::uint128_t nano::node::delta () return result; } +void nano::node::ongoing_online_weight_calculation_queue () +{ + std::weak_ptr node_w (shared_from_this ()); + alarm.add (std::chrono::steady_clock::now () + (std::chrono::seconds (nano::online_reps::weight_period)), [node_w]() { + if (auto node_l = node_w.lock ()) + { + node_l->ongoing_online_weight_calculation (); + } + }); +} + +void nano::node::ongoing_online_weight_calculation () +{ + online_reps.sample (); + ongoing_online_weight_calculation_queue (); +} + namespace { class confirmed_visitor : public nano::block_visitor @@ -3375,80 +3394,76 @@ std::unique_ptr collect_seq_con_info (block_arrival & bl } } -nano::online_reps::online_reps (nano::node & node) : -node (node) +nano::online_reps::online_reps (nano::ledger & ledger_a, nano::uint128_t minimum_a) : +ledger (ledger_a), +minimum (minimum_a) { + auto transaction (ledger_a.store.tx_begin_read ()); + online = trend (transaction); } -void nano::online_reps::vote (std::shared_ptr const & vote_a) +void nano::online_reps::observe (nano::account const & rep_a) { - auto rep (vote_a->account); - std::lock_guard lock (mutex); - auto now (std::chrono::steady_clock::now ()); - auto transaction (node.store.tx_begin_read ()); - auto current (reps.begin ()); - while (current != reps.end () && current->last_heard + std::chrono::seconds (nano::node::cutoff) < now) + auto transaction (ledger.store.tx_begin_read ()); + if (ledger.weight (transaction, rep_a) > nano::Gxrb_ratio) { - auto old_stake (online_stake_total); - online_stake_total -= node.ledger.weight (transaction, current->representative); - if (online_stake_total > old_stake) - { - // underflow - online_stake_total = 0; - } - current = reps.erase (current); - } - auto rep_it (reps.get<1> ().find (rep)); - auto info (nano::rep_last_heard_info{ now, rep }); - if (rep_it == reps.get<1> ().end ()) - { - auto old_stake (online_stake_total); - online_stake_total += node.ledger.weight (transaction, rep); - if (online_stake_total < old_stake) - { - // overflow - online_stake_total = std::numeric_limits::max (); - } - reps.insert (info); - } - else - { - reps.get<1> ().replace (rep_it, info); + std::lock_guard lock (mutex); + reps.insert (rep_a); } } -void nano::online_reps::recalculate_stake () +void nano::online_reps::sample () { - std::lock_guard lock (mutex); - online_stake_total = 0; - auto transaction (node.store.tx_begin_read ()); - for (auto it : reps) + auto transaction (ledger.store.tx_begin_write ()); + // Discard oldest entries + while (ledger.store.online_weight_count (transaction) >= weight_samples) { - online_stake_total += node.ledger.weight (transaction, it.representative); + auto oldest (ledger.store.online_weight_begin (transaction)); + assert (oldest != ledger.store.online_weight_end ()); + ledger.store.online_weight_del (transaction, oldest->first); } - auto now (std::chrono::steady_clock::now ()); - std::weak_ptr node_w (node.shared ()); - node.alarm.add (now + std::chrono::minutes (5), [node_w]() { - if (auto node_l = node_w.lock ()) - { - node_l->online_reps.recalculate_stake (); - } - }); + // Calculate current active rep weight + nano::uint128_t current; + for (auto & i : reps) + { + current += ledger.weight (transaction, i); + } + reps.clear (); + ledger.store.online_weight_put (transaction, std::chrono::system_clock::now ().time_since_epoch ().count (), current); + auto trend_l (trend (transaction)); + std::lock_guard lock (mutex); + online = trend_l; +} + +nano::uint128_t nano::online_reps::trend (nano::transaction & transaction_a) +{ + std::vector items; + items.reserve (weight_samples + 1); + items.push_back (minimum); + for (auto i (ledger.store.online_weight_begin (transaction_a)), n (ledger.store.online_weight_end ()); i != n; ++i) + { + items.push_back (i->second.number ()); + } + + // Pick median value for our target vote weight + auto median_idx = items.size () / 2; + nth_element (items.begin (), items.begin () + median_idx, items.end ()); + return nano::uint128_t{ items[median_idx] }; } nano::uint128_t nano::online_reps::online_stake () { std::lock_guard lock (mutex); - return std::max (online_stake_total, node.config.online_weight_minimum.number ()); + return std::max (online, minimum); } std::vector nano::online_reps::list () { std::vector result; std::lock_guard lock (mutex); - for (auto i (reps.begin ()), n (reps.end ()); i != n; ++i) + for (auto & i : reps) { - result.push_back (i->representative); + result.push_back (i); } return result; } diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 1b298453..e39b467f 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -234,32 +234,25 @@ public: std::unique_ptr collect_seq_con_info (block_arrival & block_arrival, const std::string & name); -class rep_last_heard_info -{ -public: - std::chrono::steady_clock::time_point last_heard; - nano::account representative; -}; class online_reps { public: - online_reps (nano::node &); - void vote (std::shared_ptr const &); - void recalculate_stake (); + online_reps (nano::ledger &, nano::uint128_t); + void observe (nano::account const &); + void sample (); nano::uint128_t online_stake (); - nano::uint128_t online_stake_total; std::vector list (); + static uint64_t constexpr weight_period = 5 * 60; // 5 minutes + // The maximum amount of samples for a 2 week period on live or 3 days on beta + static uint64_t constexpr weight_samples = (nano::nano_network == nano::nano_networks::nano_live_network) ? 4032 : 864; private: - boost::multi_index_container< - nano::rep_last_heard_info, - boost::multi_index::indexed_by< - boost::multi_index::ordered_non_unique>, - boost::multi_index::hashed_unique>>> - reps; - + nano::uint128_t trend (nano::transaction &); std::mutex mutex; - nano::node & node; + nano::ledger & ledger; + std::unordered_set reps; + nano::uint128_t online; + nano::uint128_t minimum; friend std::unique_ptr collect_seq_con_info (online_reps & online_reps, const std::string & name); }; @@ -579,6 +572,8 @@ public: bool validate_block_by_previous (nano::transaction const &, std::shared_ptr); void do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const &, uint16_t, std::shared_ptr, std::shared_ptr, std::shared_ptr); nano::uint128_t delta (); + void ongoing_online_weight_calculation (); + void ongoing_online_weight_calculation_queue (); boost::asio::io_context & io_ctx; nano::node_config config; nano::node_flags flags; diff --git a/nano/node/rpc.cpp b/nano/node/rpc.cpp index 349c9d24..13fe6a12 100644 --- a/nano/node/rpc.cpp +++ b/nano/node/rpc.cpp @@ -1614,7 +1614,7 @@ void nano::rpc_handler::confirmation_quorum () response_l.put ("quorum_delta", node.delta ().convert_to ()); response_l.put ("online_weight_quorum_percent", std::to_string (node.config.online_weight_quorum)); response_l.put ("online_weight_minimum", node.config.online_weight_minimum.to_string_dec ()); - response_l.put ("online_stake_total", node.online_reps.online_stake_total.convert_to ()); + response_l.put ("online_stake_total", node.online_reps.online_stake ().convert_to ()); response_l.put ("peers_stake_total", node.peers.total_weight ().convert_to ()); if (request.get ("peer_details", false)) { diff --git a/nano/secure/blockstore.hpp b/nano/secure/blockstore.hpp index 27a4372b..166bc86a 100644 --- a/nano/secure/blockstore.hpp +++ b/nano/secure/blockstore.hpp @@ -288,6 +288,12 @@ public: virtual nano::store_iterator> vote_begin (nano::transaction const &) = 0; virtual nano::store_iterator> vote_end () = 0; + virtual void online_weight_put (nano::transaction const &, uint64_t, nano::amount const &) = 0; + virtual void online_weight_del (nano::transaction const &, uint64_t) = 0; + virtual nano::store_iterator online_weight_begin (nano::transaction const &) = 0; + virtual nano::store_iterator online_weight_end () = 0; + virtual size_t online_weight_count (nano::transaction const &) const = 0; + virtual void version_put (nano::transaction const &, int) = 0; virtual int version_get (nano::transaction const &) = 0;