Adding online rep tracking.

This commit is contained in:
clemahieu 2018-04-02 00:05:19 -05:00
commit facbab4d69
3 changed files with 109 additions and 2 deletions

View file

@ -1460,3 +1460,18 @@ TEST (node, bootstrap_connection_scaling)
ASSERT_EQ (1, attempt->target_connections (0));
ASSERT_EQ (1, attempt->target_connections (50000));
}
TEST (node, online_reps)
{
rai::system system (24000, 2);
system.wallet (0)->insert_adhoc (rai::test_genesis_key.prv);
ASSERT_TRUE (system.nodes[1]->online_reps.online_stake ().is_zero ());
system.wallet (0)->send_action (rai::test_genesis_key.pub, rai::test_genesis_key.pub, rai::Gxrb_ratio);
auto iterations (0);
while (system.nodes[1]->online_reps.online_stake ().is_zero ())
{
system.poll ();
++iterations;
ASSERT_LT (iterations, 200);
}
}

View file

@ -1432,17 +1432,18 @@ store (init_a.block_store_init, application_path_a / "data.ldb", config_a.lmdb_m
gap_cache (*this),
ledger (store, config_a.inactive_supply.number (), config.state_block_parse_canary, config.state_block_generate_canary),
active (*this),
wallets (init_a.block_store_init, *this),
network (*this, config.peering_port),
bootstrap_initiator (*this),
bootstrap (service_a, config.peering_port, *this),
peers (network.endpoint ()),
application_path (application_path_a),
wallets (init_a.block_store_init, *this),
port_mapping (*this),
vote_processor (*this),
warmed_up (0),
block_processor (*this),
block_processor_thread ([this]() { this->block_processor.process_blocks (); })
block_processor_thread ([this]() { this->block_processor.process_blocks (); }),
online_reps (*this)
{
wallets.observer = [this](bool active) {
observers.wallet (active);
@ -1571,6 +1572,9 @@ block_processor_thread ([this]() { this->block_processor.process_blocks (); })
observers.vote.add ([this](std::shared_ptr<rai::vote> vote_a, rai::endpoint const &) {
this->gap_cache.vote (vote_a);
});
observers.vote.add ([this](std::shared_ptr<rai::vote> vote_a, rai::endpoint const &) {
this->online_reps.vote (vote_a);
});
observers.vote.add ([this](std::shared_ptr<rai::vote> vote_a, rai::endpoint const & endpoint_a) {
if (this->rep_crawler.exists (vote_a->block->hash ()))
{
@ -2440,6 +2444,68 @@ bool rai::block_arrival::recent (rai::block_hash const & hash_a)
return arrival.get<1> ().find (hash_a) != arrival.get<1> ().end ();
}
rai::online_reps::online_reps (rai::node & node) :
node (node)
{
}
void rai::online_reps::vote (std::shared_ptr<rai::vote> const & vote_a)
{
auto rep (vote_a->account);
std::lock_guard<std::mutex> lock (mutex);
auto now (std::chrono::steady_clock::now ());
rai::transaction transaction (node.store.environment, nullptr, false);
auto current (reps.begin ());
while (current != reps.end () && current->last_heard + std::chrono::seconds (rai::node::cutoff) < now)
{
auto old_stake (online_stake_total);
online_stake_total -= node.ledger.weight (transaction, current->representative);
if (online_stake_total > old_stake)
{
// underflow
online_stake_total = 0;
}
current = reps.erase (current);
}
auto rep_it (reps.get<1> ().find (rep));
auto info (rai::rep_last_heard_info{ now, rep });
if (rep_it == reps.get<1> ().end ())
{
auto old_stake (online_stake_total);
online_stake_total += node.ledger.weight (transaction, rep);
if (online_stake_total < old_stake)
{
// overflow
online_stake_total = std::numeric_limits<rai::uint128_t>::max ();
}
reps.insert (info);
}
else
{
reps.get<1> ().replace (rep_it, info);
}
}
void rai::online_reps::recalculate_stake ()
{
std::lock_guard<std::mutex> lock (mutex);
online_stake_total = 0;
rai::transaction transaction (node.store.environment, nullptr, false);
for (auto it : reps)
{
online_stake_total += node.ledger.weight (transaction, it.representative);
}
auto now (std::chrono::steady_clock::now ());
auto node_l (node.shared ());
node.alarm.add (now + std::chrono::minutes (5), [node_l]() { node_l->online_reps.recalculate_stake (); });
}
rai::uint128_t rai::online_reps::online_stake ()
{
std::lock_guard<std::mutex> lock (mutex);
return online_stake_total;
}
std::unordered_set<rai::endpoint> rai::peer_container::random_set (size_t count_a)
{
std::unordered_set<rai::endpoint> result;

View file

@ -297,6 +297,31 @@ public:
arrival;
std::mutex mutex;
};
class rep_last_heard_info
{
public:
std::chrono::steady_clock::time_point last_heard;
rai::account representative;
};
class online_reps
{
public:
online_reps (rai::node &);
void vote (std::shared_ptr<rai::vote> const &);
void recalculate_stake ();
rai::uint128_t online_stake ();
boost::multi_index_container<
rai::rep_last_heard_info,
boost::multi_index::indexed_by<
boost::multi_index::ordered_non_unique<boost::multi_index::member<rai::rep_last_heard_info, std::chrono::steady_clock::time_point, &rai::rep_last_heard_info::last_heard>>,
boost::multi_index::hashed_unique<boost::multi_index::member<rai::rep_last_heard_info, rai::account, &rai::rep_last_heard_info::representative>>>>
reps;
private:
rai::uint128_t online_stake_total;
std::mutex mutex;
rai::node & node;
};
class network
{
public:
@ -536,6 +561,7 @@ public:
rai::block_processor block_processor;
std::thread block_processor_thread;
rai::block_arrival block_arrival;
rai::online_reps online_reps;
static double constexpr price_max = 16.0;
static double constexpr free_cutoff = 1024.0;
static std::chrono::seconds constexpr period = std::chrono::seconds (60);