Cached local representatives list (#1372)

* Representative weight drifts slowly on the active network. foreach_representative is called frequently and in the voting loop. Instead of doing a linear scan of the wallet and an IO checking for the weight of each account, periodically recompute if there are representative keys in any of the wallets.

* Simplify logic in main code paths by removing if-checks on if voting is enabled and do it inside foreach_representative.

* Add deadlines to cached_representatives tests

* Add config option for minimum local representative weight

Default 1000 Nano
Large services can set higher values to limit votes CPU load, while singe users willing to have small working representative can set it lower

* Fix weight & minimum comparision

* Test for vote_minimum

* Formatting fixes.

* Prepare amster merge

* Config vote_minimum

* Config v16

* Fix merge errors

* Update representatives at account insertion

* Fix

* Add mutexes to compute_reps () & foreach_representative ()

* Fix mutex issues

* Fix transactions

* Config version 16 remains

* Update tests

* Expand node_config.v16_values test
This commit is contained in:
Sergey Kroshnin 2019-01-30 22:30:23 +03:00 committed by Zach Hyatt
commit aa3ac699a4
7 changed files with 162 additions and 32 deletions

View file

@ -637,10 +637,12 @@ TEST (node_config, v15_v16_upgrade)
// These config options should not be present at version 15
ASSERT_FALSE (tree.get_optional_child ("allow_local_peers"));
ASSERT_FALSE (tree.get_optional_child ("signature_checker_threads"));
ASSERT_FALSE (tree.get_optional_child ("vote_minimum"));
config.deserialize_json (upgraded, tree);
// The config options should be added after the upgrade
ASSERT_TRUE (!!tree.get_optional_child ("allow_local_peers"));
ASSERT_TRUE (!!tree.get_optional_child ("signature_checker_threads"));
ASSERT_TRUE (!!tree.get_optional_child ("vote_minimum"));
ASSERT_TRUE (upgraded);
auto version (tree.get<std::string> ("version"));
@ -665,7 +667,7 @@ TEST (node_config, v15_v16_upgrade)
test_upgrade ("rai-beta.raiblocks.net", "peering-beta.nano.org");
}
TEST (node_config, allow_local_peers)
TEST (node_config, v16_values)
{
nano::jsonconfig tree;
add_required_children_node_config_tree (tree);
@ -678,19 +680,23 @@ TEST (node_config, allow_local_peers)
// Check config is correct
tree.put ("allow_local_peers", false);
tree.put ("signature_checker_threads", 1);
tree.put ("vote_minimum", nano::Gxrb_ratio.convert_to<std::string> ());
config.deserialize_json (upgraded, tree);
ASSERT_FALSE (upgraded);
ASSERT_FALSE (config.allow_local_peers);
ASSERT_EQ (config.signature_checker_threads, 1);
ASSERT_EQ (config.vote_minimum.number (), nano::Gxrb_ratio);
// Check config is correct with other values
tree.put ("allow_local_peers", true);
tree.put ("signature_checker_threads", 4);
tree.put ("vote_minimum", (std::numeric_limits<nano::uint128_t>::max () - 100).convert_to<std::string> ());
upgraded = false;
config.deserialize_json (upgraded, tree);
ASSERT_FALSE (upgraded);
ASSERT_TRUE (config.allow_local_peers);
ASSERT_EQ (config.signature_checker_threads, 4);
ASSERT_EQ (config.vote_minimum.number (), std::numeric_limits<nano::uint128_t>::max () - 100);
}
// Regression test to ensure that deserializing includes changes node via get_required_child
@ -820,15 +826,21 @@ TEST (node, fork_publish)
auto existing (node1.active.roots.find (nano::uint512_union (send1->previous (), send1->root ())));
ASSERT_NE (node1.active.roots.end (), existing);
auto election (existing->election);
auto transaction (node1.store.tx_begin ());
election->compute_rep_votes (transaction);
node1.vote_processor.flush ();
ASSERT_EQ (2, election->last_votes.size ());
system.deadline_set (1s);
// Wait until the genesis rep activated & makes vote
while (election->last_votes.size () != 2)
{
auto transaction (node1.store.tx_begin ());
election->compute_rep_votes (transaction);
node1.vote_processor.flush ();
ASSERT_NO_ERROR (system.poll ());
}
node1.process_active (send2);
node1.block_processor.flush ();
auto existing1 (election->last_votes.find (nano::test_genesis_key.pub));
ASSERT_NE (election->last_votes.end (), existing1);
ASSERT_EQ (send1->hash (), existing1->second.hash);
auto transaction (node1.store.tx_begin ());
auto winner (*election->tally (transaction).begin ());
ASSERT_EQ (*send1, *winner.second);
ASSERT_EQ (nano::genesis_amount - 100, winner.first);
@ -1376,11 +1388,16 @@ TEST (node, rep_self_vote)
active.start (block0);
auto existing (active.roots.find (nano::uint512_union (block0->previous (), block0->root ())));
ASSERT_NE (active.roots.end (), existing);
auto transaction (node0->store.tx_begin ());
existing->election->compute_rep_votes (transaction);
node0->vote_processor.flush ();
system.deadline_set (1s);
// Wait until representatives are activated & make vote
while (existing->election->last_votes.size () != 3)
{
auto transaction (node0->store.tx_begin ());
existing->election->compute_rep_votes (transaction);
node0->vote_processor.flush ();
ASSERT_NO_ERROR (system.poll ());
}
auto & rep_votes (existing->election->last_votes);
ASSERT_EQ (3, rep_votes.size ());
ASSERT_NE (rep_votes.end (), rep_votes.find (nano::test_genesis_key.pub));
ASSERT_NE (rep_votes.end (), rep_votes.find (rep_big.pub));
}

View file

@ -150,3 +150,31 @@ TEST (wallets, reload)
}
ASSERT_EQ (2, system.nodes[0]->wallets.items.size ());
}
TEST (wallets, vote_minimum)
{
nano::system system (24000, 1);
auto & node1 (*system.nodes[0]);
bool error (false);
nano::wallets wallets (error, node1);
ASSERT_FALSE (error);
nano::keypair key1;
nano::keypair key2;
nano::genesis genesis;
nano::state_block send1 (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, std::numeric_limits<nano::uint128_t>::max () - node1.config.vote_minimum.number (), key1.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (genesis.hash ()));
ASSERT_EQ (nano::process_result::progress, node1.process (send1).code);
nano::state_block open1 (key1.pub, 0, key1.pub, node1.config.vote_minimum.number (), send1.hash (), key1.prv, key1.pub, system.work.generate (key1.pub));
ASSERT_EQ (nano::process_result::progress, node1.process (open1).code);
// send2 with amount vote_minimum - 1 (not voting representative)
nano::state_block send2 (nano::test_genesis_key.pub, send1.hash (), nano::test_genesis_key.pub, std::numeric_limits<nano::uint128_t>::max () - 2 * node1.config.vote_minimum.number () + 1, key2.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (send1.hash ()));
ASSERT_EQ (nano::process_result::progress, node1.process (send2).code);
nano::state_block open2 (key2.pub, 0, key2.pub, node1.config.vote_minimum.number () - 1, send2.hash (), key2.prv, key2.pub, system.work.generate (key2.pub));
ASSERT_EQ (nano::process_result::progress, node1.process (open2).code);
auto wallet (wallets.items.begin ()->second);
ASSERT_EQ (0, wallet->representatives.size ());
wallet->insert_adhoc (nano::test_genesis_key.prv);
wallet->insert_adhoc (key1.prv);
wallet->insert_adhoc (key2.prv);
wallets.compute_reps ();
ASSERT_EQ (2, wallet->representatives.size ());
}

View file

@ -35,7 +35,8 @@ bootstrap_connections_max (64),
callback_port (0),
lmdb_max_dbs (128),
allow_local_peers (false),
block_processor_batch_max_time (std::chrono::milliseconds (5000))
block_processor_batch_max_time (std::chrono::milliseconds (5000)),
vote_minimum (nano::Gxrb_ratio)
{
const char * epoch_message ("epoch v1 block");
strncpy ((char *)epoch_block_link.bytes.data (), epoch_message, epoch_block_link.bytes.size ());
@ -118,6 +119,7 @@ nano::error nano::node_config::serialize_json (nano::jsonconfig & json) const
json.put ("lmdb_max_dbs", lmdb_max_dbs);
json.put ("block_processor_batch_max_time", block_processor_batch_max_time.count ());
json.put ("allow_local_peers", allow_local_peers);
json.put ("vote_minimum", vote_minimum.to_string_dec ());
nano::jsonconfig ipc_l;
ipc_config.serialize_json (ipc_l);
@ -230,6 +232,7 @@ bool nano::node_config::upgrade_json (unsigned version_a, nano::jsonconfig & jso
});
json.replace_child (preconfigured_peers_key, peers);
json.put ("vote_minimum", vote_minimum.to_string_dec ());
nano::jsonconfig ipc_l;
ipc_config.serialize_json (ipc_l);
@ -327,6 +330,12 @@ nano::error nano::node_config::deserialize_json (bool & upgraded_a, nano::jsonco
json.get_error ().set ("online_weight_minimum contains an invalid decimal amount");
}
auto vote_minimum_l (json.get<std::string> ("vote_minimum"));
if (vote_minimum.decode_dec (vote_minimum_l))
{
json.get_error ().set ("vote_minimum contains an invalid decimal amount");
}
auto block_processor_batch_max_time_l (json.get<unsigned long> ("block_processor_batch_max_time"));
block_processor_batch_max_time = std::chrono::milliseconds (block_processor_batch_max_time_l);

View file

@ -30,6 +30,7 @@ public:
std::vector<nano::account> preconfigured_representatives;
unsigned bootstrap_fraction_numerator;
nano::amount receive_minimum;
nano::amount vote_minimum;
nano::amount online_weight_minimum;
unsigned online_weight_quorum;
unsigned password_fanout;

View file

@ -800,6 +800,12 @@ nano::public_key nano::wallet::deterministic_insert (nano::transaction const & t
{
work_ensure (key, key);
}
auto block_transaction (wallets.node.store.tx_begin_read ());
if (wallets.node.ledger.weight (block_transaction, key) >= wallets.node.config.vote_minimum.number ())
{
std::lock_guard<std::mutex> lock (representatives_mutex);
representatives.insert (key);
}
}
return key;
}
@ -832,11 +838,16 @@ nano::public_key nano::wallet::insert_adhoc (nano::transaction const & transacti
if (store.valid_password (transaction_a))
{
key = store.insert_adhoc (transaction_a, key_a);
auto block_transaction (wallets.node.store.tx_begin_read ());
if (generate_work_a)
{
auto block_transaction (wallets.node.store.tx_begin_read ());
work_ensure (key, wallets.node.ledger.latest_root (block_transaction, key));
}
if (wallets.node.ledger.weight (block_transaction, key) >= wallets.node.config.vote_minimum.number ())
{
std::lock_guard<std::mutex> lock (representatives_mutex);
representatives.insert (key);
}
}
return key;
}
@ -1315,7 +1326,7 @@ thread ([this]() {
do_wallet_actions ();
})
{
std::lock_guard<std::mutex> lock (mutex);
std::unique_lock<std::mutex> lock (mutex);
if (!error_a)
{
auto transaction (tx_begin_write ());
@ -1349,6 +1360,11 @@ thread ([this]() {
{
item.second->enter_initial_password ();
}
if (node_a.config.enable_voting)
{
lock.unlock ();
ongoing_compute_reps ();
}
}
nano::wallets::~wallets ()
@ -1501,31 +1517,38 @@ void nano::wallets::queue_wallet_action (nano::uint128_t const & amount_a, std::
void nano::wallets::foreach_representative (nano::transaction const & transaction_a, std::function<void(nano::public_key const & pub_a, nano::raw_key const & prv_a)> const & action_a)
{
std::lock_guard<std::mutex> lock (mutex);
auto transaction_l (node.wallets.tx_begin_read ());
for (auto i (items.begin ()), n (items.end ()); i != n; ++i)
if (node.config.enable_voting)
{
auto & wallet (*i->second);
std::lock_guard<std::recursive_mutex> store_lock (wallet.store.mutex);
for (auto j (wallet.store.begin (transaction_l)), m (wallet.store.end ()); j != m; ++j)
std::lock_guard<std::mutex> lock (mutex);
auto transaction_l (tx_begin_read ());
for (auto i (items.begin ()), n (items.end ()); i != n; ++i)
{
nano::account account (j->first);
if (!node.ledger.weight (transaction_a, account).is_zero ())
auto & wallet (*i->second);
std::lock_guard<std::recursive_mutex> store_lock (wallet.store.mutex);
std::lock_guard<std::mutex> representatives_lock (wallet.representatives_mutex);
for (auto ii (wallet.representatives.begin ()), nn (wallet.representatives.end ()); ii != nn; ++ii)
{
if (wallet.store.valid_password (transaction_l))
nano::account account (*ii);
if (wallet.store.exists (transaction_l, account))
{
nano::raw_key prv;
auto error (wallet.store.fetch (transaction_l, nano::uint256_union (j->first), prv));
assert (!error);
action_a (nano::uint256_union (j->first), prv);
}
else
{
static auto last_log = std::chrono::steady_clock::time_point ();
if (last_log < std::chrono::steady_clock::now () - std::chrono::seconds (60))
if (!node.ledger.weight (transaction_a, account).is_zero ())
{
last_log = std::chrono::steady_clock::now ();
BOOST_LOG (node.log) << boost::str (boost::format ("Representative locked inside wallet %1%") % i->first.to_string ());
if (wallet.store.valid_password (transaction_l))
{
nano::raw_key prv;
auto error (wallet.store.fetch (transaction_l, account, prv));
assert (!error);
action_a (account, prv);
}
else
{
static auto last_log = std::chrono::steady_clock::time_point ();
if (last_log < std::chrono::steady_clock::now () - std::chrono::seconds (60))
{
last_log = std::chrono::steady_clock::now ();
BOOST_LOG (node.log) << boost::str (boost::format ("Representative locked inside wallet %1%") % i->first.to_string ());
}
}
}
}
}
@ -1579,6 +1602,38 @@ void nano::wallets::clear_send_ids (nano::transaction const & transaction_a)
assert (status == 0);
}
void nano::wallets::compute_reps ()
{
std::lock_guard<std::mutex> lock (mutex);
auto ledger_transaction (node.store.tx_begin_read ());
auto transaction (tx_begin_read ());
for (auto i (items.begin ()), n (items.end ()); i != n; ++i)
{
auto & wallet (*i->second);
decltype (wallet.representatives) representatives_l;
for (auto ii (wallet.store.begin (transaction)), nn (wallet.store.end ()); ii != nn; ++ii)
{
auto account (ii->first);
if (node.ledger.weight (ledger_transaction, account) >= node.config.vote_minimum.number ())
{
representatives_l.insert (account);
}
}
std::lock_guard<std::mutex> representatives_lock (wallet.representatives_mutex);
wallet.representatives.swap (representatives_l);
}
}
void nano::wallets::ongoing_compute_reps ()
{
compute_reps ();
auto & node_l (node);
auto compute_delay (nano::nano_network == nano::nano_networks::nano_test_network ? std::chrono::milliseconds (10) : std::chrono::milliseconds (15 * 60 * 1000)); // Representation drifts quickly on the test network but very slowly on the live network
node.alarm.add (std::chrono::steady_clock::now () + compute_delay, [&node_l]() {
node_l.wallets.ongoing_compute_reps ();
});
}
void nano::wallets::split_if_needed (nano::transaction & transaction_destination, nano::block_store & store_a)
{
auto store_l (dynamic_cast<nano::mdb_store *> (&store_a));

View file

@ -156,6 +156,8 @@ public:
std::function<void(bool, bool)> lock_observer;
nano::wallet_store store;
nano::wallets & wallets;
std::mutex representatives_mutex;
std::unordered_set<nano::account> representatives;
};
class node;
@ -180,6 +182,8 @@ public:
bool exists (nano::transaction const &, nano::public_key const &);
void stop ();
void clear_send_ids (nano::transaction const &);
void compute_reps ();
void ongoing_compute_reps ();
void split_if_needed (nano::transaction &, nano::block_store &);
void move_table (std::string const &, MDB_txn *, MDB_txn *);
std::function<void(bool)> observer;

View file

@ -406,6 +406,22 @@ TEST (store, vote_load)
}
}
TEST (wallets, rep_scan)
{
nano::system system (24000, 1);
auto & node (*system.nodes[0]);
auto wallet (system.wallet (0));
auto transaction (node.wallets.tx_begin_write ());
for (auto i (0); i < 10000; ++i)
{
wallet->deterministic_insert (transaction);
}
auto begin (std::chrono::steady_clock::now ());
node.wallets.foreach_representative (transaction, [](nano::public_key const & pub_a, nano::raw_key const & prv_a) {
});
ASSERT_LT (std::chrono::steady_clock::now () - begin, std::chrono::milliseconds (5));
}
TEST (node, mass_vote_by_hash)
{
nano::system system (24000, 1);