Long-term online rep weight tracking (#1648)

* Adding online_weight table to trend online weight count.

* Long term trending of online vote weight.

* Update online_reps test to new semantics.

* Add review comments and fix rebase

* Lambda formatting
This commit is contained in:
clemahieu 2019-01-31 00:21:12 +07:00 committed by Wesley Shillingford
commit 0788da3ed2
9 changed files with 179 additions and 83 deletions

View file

@ -1488,3 +1488,23 @@ TEST (block_store, endpoint_key_byte_order)
// This should be in host byte order
ASSERT_EQ (port, endpoint_key1.port ());
}
TEST (block_store, online_weight)
{
nano::logging logging;
bool error (false);
nano::mdb_store store (error, logging, nano::unique_path ());
ASSERT_FALSE (error);
auto transaction (store.tx_begin (true));
ASSERT_EQ (0, store.online_weight_count (transaction));
ASSERT_EQ (store.online_weight_end (), store.online_weight_begin (transaction));
store.online_weight_put (transaction, 1, 2);
ASSERT_EQ (1, store.online_weight_count (transaction));
auto item (store.online_weight_begin (transaction));
ASSERT_NE (store.online_weight_end (), item);
ASSERT_EQ (1, item->first);
ASSERT_EQ (2, item->second.number ());
store.online_weight_del (transaction, 1);
ASSERT_EQ (0, store.online_weight_count (transaction));
ASSERT_EQ (store.online_weight_end (), store.online_weight_begin (transaction));
}

View file

@ -1687,15 +1687,17 @@ TEST (node, stat_counting)
TEST (node, online_reps)
{
nano::system system (24000, 2);
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
ASSERT_EQ (system.nodes[1]->config.online_weight_minimum.number (), system.nodes[1]->online_reps.online_stake ());
system.wallet (0)->send_action (nano::test_genesis_key.pub, nano::test_genesis_key.pub, nano::Gxrb_ratio);
system.deadline_set (10s);
while (system.nodes[1]->online_reps.online_stake () == system.nodes[1]->config.online_weight_minimum.number ())
{
ASSERT_NO_ERROR (system.poll ());
}
nano::system system (24000, 1);
// 1 sample of minimum weight
ASSERT_EQ (system.nodes[0]->config.online_weight_minimum, system.nodes[0]->online_reps.online_stake ());
auto vote (std::make_shared<nano::vote> ());
system.nodes[0]->online_reps.observe (nano::test_genesis_key.pub);
// 1 minimum, 1 maximum
system.nodes[0]->online_reps.sample ();
ASSERT_EQ (nano::genesis_amount, system.nodes[0]->online_reps.online_stake ());
// 2 minimum, 1 maximum
system.nodes[0]->online_reps.sample ();
ASSERT_EQ (system.nodes[0]->config.online_weight_minimum, system.nodes[0]->online_reps.online_stake ());
}
TEST (node, block_confirm)

View file

@ -3924,7 +3924,7 @@ TEST (rpc, online_reps)
auto send_block (system.wallet (0)->send_action (nano::test_genesis_key.pub, nano::test_genesis_key.pub, nano::Gxrb_ratio));
ASSERT_NE (nullptr, send_block);
system.deadline_set (10s);
while (system.nodes[1]->online_reps.online_stake () == system.nodes[1]->config.online_weight_minimum.number ())
while (system.nodes[1]->online_reps.list ().empty ())
{
ASSERT_NO_ERROR (system.poll ());
}

View file

@ -4,6 +4,7 @@
#include <nano/node/common.hpp>
#include <nano/secure/versioning.hpp>
#include <boost/endian/conversion.hpp>
#include <boost/polymorphic_cast.hpp>
#include <queue>
@ -140,6 +141,17 @@ buffer (std::make_shared<std::vector<uint8_t>> ())
value = { buffer->size (), const_cast<uint8_t *> (buffer->data ()) };
}
nano::mdb_val::mdb_val (uint64_t val_a) :
buffer (std::make_shared<std::vector<uint8_t>> ())
{
{
boost::endian::native_to_big_inplace (val_a);
nano::vectorstream stream (*buffer);
nano::write (stream, val_a);
}
value = { buffer->size (), const_cast<uint8_t *> (buffer->data ()) };
}
void * nano::mdb_val::data () const
{
return value.mv_data;
@ -297,6 +309,7 @@ nano::mdb_val::operator uint64_t () const
nano::bufferstream stream (reinterpret_cast<uint8_t const *> (value.mv_data), value.mv_size);
auto error (nano::read (stream, result));
assert (!error);
boost::endian::big_to_native_inplace (result);
return result;
}
@ -752,6 +765,7 @@ env (error_a, path_a, lmdb_max_dbs)
error_a |= mdb_dbi_open (env.tx (transaction), "representation", MDB_CREATE, &representation) != 0;
error_a |= mdb_dbi_open (env.tx (transaction), "unchecked", MDB_CREATE, &unchecked) != 0;
error_a |= mdb_dbi_open (env.tx (transaction), "vote", MDB_CREATE, &vote) != 0;
error_a |= mdb_dbi_open (env.tx (transaction), "online_weight", MDB_CREATE, &online_weight) != 0;
error_a |= mdb_dbi_open (env.tx (transaction), "meta", MDB_CREATE, &meta) != 0;
error_a |= mdb_dbi_open (env.tx (transaction), "peers", MDB_CREATE, &peers) != 0;
if (!full_sideband (transaction))
@ -2136,6 +2150,36 @@ size_t nano::mdb_store::unchecked_count (nano::transaction const & transaction_a
return result;
}
void nano::mdb_store::online_weight_put (nano::transaction const & transaction_a, uint64_t time_a, nano::amount const & amount_a)
{
auto status (mdb_put (env.tx (transaction_a), online_weight, nano::mdb_val (time_a), nano::mdb_val (amount_a), 0));
release_assert (status == 0);
}
void nano::mdb_store::online_weight_del (nano::transaction const & transaction_a, uint64_t time_a)
{
auto status (mdb_del (env.tx (transaction_a), online_weight, nano::mdb_val (time_a), nullptr));
release_assert (status == 0);
}
nano::store_iterator<uint64_t, nano::amount> nano::mdb_store::online_weight_begin (nano::transaction const & transaction_a)
{
return nano::store_iterator<uint64_t, nano::amount> (std::make_unique<nano::mdb_iterator<uint64_t, nano::amount>> (transaction_a, online_weight));
}
nano::store_iterator<uint64_t, nano::amount> nano::mdb_store::online_weight_end ()
{
return nano::store_iterator<uint64_t, nano::amount> (nullptr);
}
size_t nano::mdb_store::online_weight_count (nano::transaction const & transaction_a) const
{
MDB_stat online_weight_stats;
auto status1 (mdb_stat (env.tx (transaction_a), online_weight, &online_weight_stats));
release_assert (status1 == 0);
return online_weight_stats.ms_entries;
}
void nano::mdb_store::flush (nano::transaction const & transaction_a)
{
{

View file

@ -59,6 +59,7 @@ public:
mdb_val (nano::endpoint_key const &);
mdb_val (std::shared_ptr<nano::block> const &);
mdb_val (std::shared_ptr<nano::vote> const &);
mdb_val (uint64_t);
void * data () const;
size_t size () const;
explicit operator nano::account_info () const;
@ -237,6 +238,13 @@ public:
void flush (nano::transaction const &) override;
nano::store_iterator<nano::account, std::shared_ptr<nano::vote>> vote_begin (nano::transaction const &) override;
nano::store_iterator<nano::account, std::shared_ptr<nano::vote>> vote_end () override;
void online_weight_put (nano::transaction const &, uint64_t, nano::amount const &) override;
void online_weight_del (nano::transaction const &, uint64_t) override;
nano::store_iterator<uint64_t, nano::amount> online_weight_begin (nano::transaction const &) override;
nano::store_iterator<uint64_t, nano::amount> online_weight_end () override;
size_t online_weight_count (nano::transaction const &) const override;
std::mutex cache_mutex;
std::unordered_map<nano::account, std::shared_ptr<nano::vote>> vote_cache_l1;
std::unordered_map<nano::account, std::shared_ptr<nano::vote>> vote_cache_l2;
@ -370,6 +378,12 @@ public:
*/
MDB_dbi vote{ 0 };
/**
* Samples of online vote weight
* uint64_t -> nano::amount
*/
MDB_dbi online_weight{ 0 };
/**
* Meta information about block store, such as versions.
* nano::uint256_union (arbitrary key) -> blob

View file

@ -31,6 +31,8 @@ unsigned constexpr nano::active_transactions::request_interval_ms;
size_t constexpr nano::active_transactions::max_broadcast_queue;
size_t constexpr nano::block_arrival::arrival_size_min;
std::chrono::seconds constexpr nano::block_arrival::arrival_time_min;
uint64_t constexpr nano::online_reps::weight_period;
uint64_t constexpr nano::online_reps::weight_samples;
namespace nano
{
@ -2080,7 +2082,7 @@ block_processor_thread ([this]() {
nano::thread_role::set (nano::thread_role::name::block_processing);
this->block_processor.process_blocks ();
}),
online_reps (*this),
online_reps (ledger, config.online_weight_minimum.number ()),
stats (config.stat_config),
vote_uniquer (block_uniquer),
startup_time (std::chrono::steady_clock::now ())
@ -2145,7 +2147,7 @@ startup_time (std::chrono::steady_clock::now ())
observers.vote.add ([this](nano::transaction const & transaction, std::shared_ptr<nano::vote> vote_a, nano::endpoint const & endpoint_a) {
assert (endpoint_a.address ().is_v6 ());
this->gap_cache.vote (vote_a);
this->online_reps.vote (vote_a);
this->online_reps.observe (vote_a->account);
nano::uint128_t rep_weight;
nano::uint128_t min_rep_weight;
{
@ -2554,6 +2556,7 @@ void nano::node::start ()
ongoing_rep_crawl ();
ongoing_rep_calculation ();
ongoing_peer_store ();
ongoing_online_weight_calculation_queue ();
if (!flags.disable_bootstrap_listener)
{
bootstrap.start ();
@ -2571,7 +2574,6 @@ void nano::node::start ()
this_l->bootstrap_wallet ();
});
}
online_reps.recalculate_stake ();
port_mapping.start ();
if (!flags.disable_unchecked_cleaning)
{
@ -3206,6 +3208,23 @@ nano::uint128_t nano::node::delta ()
return result;
}
void nano::node::ongoing_online_weight_calculation_queue ()
{
std::weak_ptr<nano::node> node_w (shared_from_this ());
alarm.add (std::chrono::steady_clock::now () + (std::chrono::seconds (nano::online_reps::weight_period)), [node_w]() {
if (auto node_l = node_w.lock ())
{
node_l->ongoing_online_weight_calculation ();
}
});
}
void nano::node::ongoing_online_weight_calculation ()
{
online_reps.sample ();
ongoing_online_weight_calculation_queue ();
}
namespace
{
class confirmed_visitor : public nano::block_visitor
@ -3375,80 +3394,76 @@ std::unique_ptr<seq_con_info_component> collect_seq_con_info (block_arrival & bl
}
}
nano::online_reps::online_reps (nano::node & node) :
node (node)
nano::online_reps::online_reps (nano::ledger & ledger_a, nano::uint128_t minimum_a) :
ledger (ledger_a),
minimum (minimum_a)
{
auto transaction (ledger_a.store.tx_begin_read ());
online = trend (transaction);
}
void nano::online_reps::vote (std::shared_ptr<nano::vote> const & vote_a)
void nano::online_reps::observe (nano::account const & rep_a)
{
auto rep (vote_a->account);
std::lock_guard<std::mutex> lock (mutex);
auto now (std::chrono::steady_clock::now ());
auto transaction (node.store.tx_begin_read ());
auto current (reps.begin ());
while (current != reps.end () && current->last_heard + std::chrono::seconds (nano::node::cutoff) < now)
auto transaction (ledger.store.tx_begin_read ());
if (ledger.weight (transaction, rep_a) > nano::Gxrb_ratio)
{
auto old_stake (online_stake_total);
online_stake_total -= node.ledger.weight (transaction, current->representative);
if (online_stake_total > old_stake)
{
// underflow
online_stake_total = 0;
}
current = reps.erase (current);
}
auto rep_it (reps.get<1> ().find (rep));
auto info (nano::rep_last_heard_info{ now, rep });
if (rep_it == reps.get<1> ().end ())
{
auto old_stake (online_stake_total);
online_stake_total += node.ledger.weight (transaction, rep);
if (online_stake_total < old_stake)
{
// overflow
online_stake_total = std::numeric_limits<nano::uint128_t>::max ();
}
reps.insert (info);
}
else
{
reps.get<1> ().replace (rep_it, info);
std::lock_guard<std::mutex> lock (mutex);
reps.insert (rep_a);
}
}
void nano::online_reps::recalculate_stake ()
void nano::online_reps::sample ()
{
std::lock_guard<std::mutex> lock (mutex);
online_stake_total = 0;
auto transaction (node.store.tx_begin_read ());
for (auto it : reps)
auto transaction (ledger.store.tx_begin_write ());
// Discard oldest entries
while (ledger.store.online_weight_count (transaction) >= weight_samples)
{
online_stake_total += node.ledger.weight (transaction, it.representative);
auto oldest (ledger.store.online_weight_begin (transaction));
assert (oldest != ledger.store.online_weight_end ());
ledger.store.online_weight_del (transaction, oldest->first);
}
auto now (std::chrono::steady_clock::now ());
std::weak_ptr<nano::node> node_w (node.shared ());
node.alarm.add (now + std::chrono::minutes (5), [node_w]() {
if (auto node_l = node_w.lock ())
{
node_l->online_reps.recalculate_stake ();
}
});
// Calculate current active rep weight
nano::uint128_t current;
for (auto & i : reps)
{
current += ledger.weight (transaction, i);
}
reps.clear ();
ledger.store.online_weight_put (transaction, std::chrono::system_clock::now ().time_since_epoch ().count (), current);
auto trend_l (trend (transaction));
std::lock_guard<std::mutex> lock (mutex);
online = trend_l;
}
nano::uint128_t nano::online_reps::trend (nano::transaction & transaction_a)
{
std::vector<nano::uint128_t> items;
items.reserve (weight_samples + 1);
items.push_back (minimum);
for (auto i (ledger.store.online_weight_begin (transaction_a)), n (ledger.store.online_weight_end ()); i != n; ++i)
{
items.push_back (i->second.number ());
}
// Pick median value for our target vote weight
auto median_idx = items.size () / 2;
nth_element (items.begin (), items.begin () + median_idx, items.end ());
return nano::uint128_t{ items[median_idx] };
}
nano::uint128_t nano::online_reps::online_stake ()
{
std::lock_guard<std::mutex> lock (mutex);
return std::max (online_stake_total, node.config.online_weight_minimum.number ());
return std::max (online, minimum);
}
std::vector<nano::account> nano::online_reps::list ()
{
std::vector<nano::account> result;
std::lock_guard<std::mutex> lock (mutex);
for (auto i (reps.begin ()), n (reps.end ()); i != n; ++i)
for (auto & i : reps)
{
result.push_back (i->representative);
result.push_back (i);
}
return result;
}

View file

@ -234,32 +234,25 @@ public:
std::unique_ptr<seq_con_info_component> collect_seq_con_info (block_arrival & block_arrival, const std::string & name);
class rep_last_heard_info
{
public:
std::chrono::steady_clock::time_point last_heard;
nano::account representative;
};
class online_reps
{
public:
online_reps (nano::node &);
void vote (std::shared_ptr<nano::vote> const &);
void recalculate_stake ();
online_reps (nano::ledger &, nano::uint128_t);
void observe (nano::account const &);
void sample ();
nano::uint128_t online_stake ();
nano::uint128_t online_stake_total;
std::vector<nano::account> list ();
static uint64_t constexpr weight_period = 5 * 60; // 5 minutes
// The maximum amount of samples for a 2 week period on live or 3 days on beta
static uint64_t constexpr weight_samples = (nano::nano_network == nano::nano_networks::nano_live_network) ? 4032 : 864;
private:
boost::multi_index_container<
nano::rep_last_heard_info,
boost::multi_index::indexed_by<
boost::multi_index::ordered_non_unique<boost::multi_index::member<nano::rep_last_heard_info, std::chrono::steady_clock::time_point, &nano::rep_last_heard_info::last_heard>>,
boost::multi_index::hashed_unique<boost::multi_index::member<nano::rep_last_heard_info, nano::account, &nano::rep_last_heard_info::representative>>>>
reps;
nano::uint128_t trend (nano::transaction &);
std::mutex mutex;
nano::node & node;
nano::ledger & ledger;
std::unordered_set<nano::account> reps;
nano::uint128_t online;
nano::uint128_t minimum;
friend std::unique_ptr<seq_con_info_component> collect_seq_con_info (online_reps & online_reps, const std::string & name);
};
@ -579,6 +572,8 @@ public:
bool validate_block_by_previous (nano::transaction const &, std::shared_ptr<nano::block>);
void do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const &, uint16_t, std::shared_ptr<std::string>, std::shared_ptr<std::string>, std::shared_ptr<boost::asio::ip::tcp::resolver>);
nano::uint128_t delta ();
void ongoing_online_weight_calculation ();
void ongoing_online_weight_calculation_queue ();
boost::asio::io_context & io_ctx;
nano::node_config config;
nano::node_flags flags;

View file

@ -1614,7 +1614,7 @@ void nano::rpc_handler::confirmation_quorum ()
response_l.put ("quorum_delta", node.delta ().convert_to<std::string> ());
response_l.put ("online_weight_quorum_percent", std::to_string (node.config.online_weight_quorum));
response_l.put ("online_weight_minimum", node.config.online_weight_minimum.to_string_dec ());
response_l.put ("online_stake_total", node.online_reps.online_stake_total.convert_to<std::string> ());
response_l.put ("online_stake_total", node.online_reps.online_stake ().convert_to<std::string> ());
response_l.put ("peers_stake_total", node.peers.total_weight ().convert_to<std::string> ());
if (request.get<bool> ("peer_details", false))
{

View file

@ -288,6 +288,12 @@ public:
virtual nano::store_iterator<nano::account, std::shared_ptr<nano::vote>> vote_begin (nano::transaction const &) = 0;
virtual nano::store_iterator<nano::account, std::shared_ptr<nano::vote>> vote_end () = 0;
virtual void online_weight_put (nano::transaction const &, uint64_t, nano::amount const &) = 0;
virtual void online_weight_del (nano::transaction const &, uint64_t) = 0;
virtual nano::store_iterator<uint64_t, nano::amount> online_weight_begin (nano::transaction const &) = 0;
virtual nano::store_iterator<uint64_t, nano::amount> online_weight_end () = 0;
virtual size_t online_weight_count (nano::transaction const &) const = 0;
virtual void version_put (nano::transaction const &, int) = 0;
virtual int version_get (nano::transaction const &) = 0;