diff --git a/rai/core_test/node.cpp b/rai/core_test/node.cpp index 5ad541bc..df4191b2 100644 --- a/rai/core_test/node.cpp +++ b/rai/core_test/node.cpp @@ -1460,3 +1460,18 @@ TEST (node, bootstrap_connection_scaling) ASSERT_EQ (1, attempt->target_connections (0)); ASSERT_EQ (1, attempt->target_connections (50000)); } + +TEST (node, online_reps) +{ + rai::system system (24000, 2); + system.wallet (0)->insert_adhoc (rai::test_genesis_key.prv); + ASSERT_TRUE (system.nodes[1]->online_reps.online_stake ().is_zero ()); + system.wallet (0)->send_action (rai::test_genesis_key.pub, rai::test_genesis_key.pub, rai::Gxrb_ratio); + auto iterations (0); + while (system.nodes[1]->online_reps.online_stake ().is_zero ()) + { + system.poll (); + ++iterations; + ASSERT_LT (iterations, 200); + } +} diff --git a/rai/node/node.cpp b/rai/node/node.cpp index d71acfc6..f474b09c 100644 --- a/rai/node/node.cpp +++ b/rai/node/node.cpp @@ -1432,17 +1432,18 @@ store (init_a.block_store_init, application_path_a / "data.ldb", config_a.lmdb_m gap_cache (*this), ledger (store, config_a.inactive_supply.number (), config.state_block_parse_canary, config.state_block_generate_canary), active (*this), -wallets (init_a.block_store_init, *this), network (*this, config.peering_port), bootstrap_initiator (*this), bootstrap (service_a, config.peering_port, *this), peers (network.endpoint ()), application_path (application_path_a), +wallets (init_a.block_store_init, *this), port_mapping (*this), vote_processor (*this), warmed_up (0), block_processor (*this), -block_processor_thread ([this]() { this->block_processor.process_blocks (); }) +block_processor_thread ([this]() { this->block_processor.process_blocks (); }), +online_reps (*this) { wallets.observer = [this](bool active) { observers.wallet (active); @@ -1571,6 +1572,9 @@ block_processor_thread ([this]() { this->block_processor.process_blocks (); }) observers.vote.add ([this](std::shared_ptr vote_a, rai::endpoint const &) { this->gap_cache.vote (vote_a); }); + observers.vote.add ([this](std::shared_ptr vote_a, rai::endpoint const &) { + this->online_reps.vote (vote_a); + }); observers.vote.add ([this](std::shared_ptr vote_a, rai::endpoint const & endpoint_a) { if (this->rep_crawler.exists (vote_a->block->hash ())) { @@ -2440,6 +2444,68 @@ bool rai::block_arrival::recent (rai::block_hash const & hash_a) return arrival.get<1> ().find (hash_a) != arrival.get<1> ().end (); } +rai::online_reps::online_reps (rai::node & node) : +node (node) +{ +} + +void rai::online_reps::vote (std::shared_ptr const & vote_a) +{ + auto rep (vote_a->account); + std::lock_guard lock (mutex); + auto now (std::chrono::steady_clock::now ()); + rai::transaction transaction (node.store.environment, nullptr, false); + auto current (reps.begin ()); + while (current != reps.end () && current->last_heard + std::chrono::seconds (rai::node::cutoff) < now) + { + auto old_stake (online_stake_total); + online_stake_total -= node.ledger.weight (transaction, current->representative); + if (online_stake_total > old_stake) + { + // underflow + online_stake_total = 0; + } + current = reps.erase (current); + } + auto rep_it (reps.get<1> ().find (rep)); + auto info (rai::rep_last_heard_info{ now, rep }); + if (rep_it == reps.get<1> ().end ()) + { + auto old_stake (online_stake_total); + online_stake_total += node.ledger.weight (transaction, rep); + if (online_stake_total < old_stake) + { + // overflow + online_stake_total = std::numeric_limits::max (); + } + reps.insert (info); + } + else + { + reps.get<1> ().replace (rep_it, info); + } +} + +void rai::online_reps::recalculate_stake () +{ + std::lock_guard lock (mutex); + online_stake_total = 0; + rai::transaction transaction (node.store.environment, nullptr, false); + for (auto it : reps) + { + online_stake_total += node.ledger.weight (transaction, it.representative); + } + auto now (std::chrono::steady_clock::now ()); + auto node_l (node.shared ()); + node.alarm.add (now + std::chrono::minutes (5), [node_l]() { node_l->online_reps.recalculate_stake (); }); +} + +rai::uint128_t rai::online_reps::online_stake () +{ + std::lock_guard lock (mutex); + return online_stake_total; +} + std::unordered_set rai::peer_container::random_set (size_t count_a) { std::unordered_set result; diff --git a/rai/node/node.hpp b/rai/node/node.hpp index 8a9db526..28805eab 100644 --- a/rai/node/node.hpp +++ b/rai/node/node.hpp @@ -297,6 +297,31 @@ public: arrival; std::mutex mutex; }; +class rep_last_heard_info +{ +public: + std::chrono::steady_clock::time_point last_heard; + rai::account representative; +}; +class online_reps +{ +public: + online_reps (rai::node &); + void vote (std::shared_ptr const &); + void recalculate_stake (); + rai::uint128_t online_stake (); + boost::multi_index_container< + rai::rep_last_heard_info, + boost::multi_index::indexed_by< + boost::multi_index::ordered_non_unique>, + boost::multi_index::hashed_unique>>> + reps; + +private: + rai::uint128_t online_stake_total; + std::mutex mutex; + rai::node & node; +}; class network { public: @@ -536,6 +561,7 @@ public: rai::block_processor block_processor; std::thread block_processor_thread; rai::block_arrival block_arrival; + rai::online_reps online_reps; static double constexpr price_max = 16.0; static double constexpr free_cutoff = 1024.0; static std::chrono::seconds constexpr period = std::chrono::seconds (60);