Local votes history to replace votes cache (#2827)

* Adding local_vote_history class

* Replace votes_cache with new vote history

* Stop preemptively erasing votes from history

Instead, let the new votes replace the existing

* Add test suite for vote generation and caching

* Use correct size for votes cache and double capacity to 128k votes

* Unecessary qualification in header + small bug fix

* Revert "Stop preemptively erasing votes from history"

This reverts commit f852fd8d891536560c8a5c28dcf5e2a488c060ab.
This commit is contained in:
Guilherme Lawless 2020-07-27 17:33:17 +01:00 committed by GitHub
commit a80e7c915d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 346 additions and 249 deletions

View file

@ -36,6 +36,7 @@ add_executable (core_test
uint256_union.cpp
utility.cpp
vote_processor.cpp
voting.cpp
wallet.cpp
wallets.cpp
websocket.cpp

View file

@ -1168,28 +1168,6 @@ TEST (active_multiplier, normalization)
ASSERT_NEAR (nano::denormalized_multiplier (norm_multiplier15, node.network_params.network.publish_thresholds.epoch_2_receive), multiplier15, 1e-10);
}
namespace nano
{
TEST (active_transactions, vote_generator_session)
{
nano::system system (1);
auto node (system.nodes[0]);
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
nano::vote_generator_session generator_session (node->active.generator);
boost::thread thread ([node, &generator_session]() {
nano::thread_role::set (nano::thread_role::name::request_loop);
for (unsigned i = 0; i < 100; ++i)
{
generator_session.add (nano::genesis_hash);
}
ASSERT_EQ (0, node->stats.count (nano::stat::type::vote, nano::stat::detail::vote_indeterminate));
generator_session.flush ();
});
thread.join ();
ASSERT_TIMELY (5s, node->stats.count (nano::stat::type::vote, nano::stat::detail::vote_indeterminate) == (100 / nano::network::confirm_ack_hashes_max));
}
}
TEST (active_transactions, election_difficulty_update_old)
{
nano::system system;

View file

@ -2714,9 +2714,9 @@ TEST (node, local_votes_cache)
ASSERT_NO_ERROR (system.poll (node.aggregator.max_delay));
}
wait_vote_sequence (3);
ASSERT_TIMELY (3s, node.votes_cache.find (send1->hash ()).empty ());
ASSERT_FALSE (node.votes_cache.find (send2->hash ()).empty ());
ASSERT_FALSE (node.votes_cache.find (send3->hash ()).empty ());
ASSERT_FALSE (node.history.votes (send1->root (), send1->hash ()).empty ());
ASSERT_FALSE (node.history.votes (send2->root (), send2->hash ()).empty ());
ASSERT_FALSE (node.history.votes (send3->root (), send3->hash ()).empty ());
}
TEST (node, local_votes_cache_batch)
@ -2737,7 +2737,6 @@ TEST (node, local_votes_cache_batch)
.sign (nano::test_genesis_key.prv, nano::test_genesis_key.pub)
.work (*node.work_generate_blocking (genesis.hash ()))
.build_shared ();
std::vector<std::shared_ptr<nano::block>> blocks{ genesis.open, send1 };
std::vector<std::pair<nano::block_hash, nano::root>> batch{ { genesis.open->hash (), genesis.open->root () }, { send1->hash (), send1->root () } };
{
auto transaction (node.store.tx_begin_write ());
@ -2749,15 +2748,15 @@ TEST (node, local_votes_cache_batch)
node.network.process_message (message, channel);
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 1);
ASSERT_EQ (1, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
ASSERT_FALSE (node.votes_cache.find (genesis.open->hash ()).empty ());
ASSERT_FALSE (node.votes_cache.find (send1->hash ()).empty ());
ASSERT_FALSE (node.history.votes (genesis.open->root (), genesis.open->hash ()).empty ());
ASSERT_FALSE (node.history.votes (send1->root (), send1->hash ()).empty ());
// Only one confirm_ack should be sent if all hashes are part of the same vote
node.network.process_message (message, channel);
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 2);
ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
// Test when votes are different
node.votes_cache.remove (genesis.open->hash ());
node.votes_cache.remove (send1->hash ());
node.history.erase (genesis.open->root ());
node.history.erase (send1->root ());
node.network.process_message (nano::confirm_req (genesis.open->hash (), genesis.open->root ()), channel);
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 3);
ASSERT_EQ (3, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
@ -2782,8 +2781,8 @@ TEST (node, local_votes_cache_generate_new_vote)
nano::confirm_req message1 (genesis.open);
auto channel (node.network.udp_channels.create (node.network.endpoint ()));
node.network.process_message (message1, channel);
ASSERT_TIMELY (3s, !node.votes_cache.find (genesis.open->hash ()).empty ());
auto votes1 (node.votes_cache.find (genesis.open->hash ()));
ASSERT_TIMELY (3s, !node.history.votes (genesis.open->root (), genesis.open->hash ()).empty ());
auto votes1 (node.history.votes (genesis.open->root (), genesis.open->hash ()));
ASSERT_EQ (1, votes1.size ());
ASSERT_EQ (1, votes1[0]->blocks.size ());
ASSERT_EQ (genesis.open->hash (), boost::get<nano::block_hash> (votes1[0]->blocks[0]));
@ -2808,8 +2807,8 @@ TEST (node, local_votes_cache_generate_new_vote)
std::vector<std::pair<nano::block_hash, nano::root>> roots_hashes{ std::make_pair (genesis.open->hash (), genesis.open->root ()), std::make_pair (send1->hash (), send1->root ()) };
nano::confirm_req message2 (roots_hashes);
node.network.process_message (message2, channel);
ASSERT_TIMELY (3s, !node.votes_cache.find (send1->hash ()).empty ());
auto votes2 (node.votes_cache.find (send1->hash ()));
ASSERT_TIMELY (3s, !node.history.votes (send1->root (), send1->hash ()).empty ());
auto votes2 (node.history.votes (send1->root (), send1->hash ()));
ASSERT_EQ (1, votes2.size ());
ASSERT_EQ (1, votes2[0]->blocks.size ());
{
@ -2819,38 +2818,12 @@ TEST (node, local_votes_cache_generate_new_vote)
ASSERT_EQ (current_vote->sequence, 2);
ASSERT_EQ (current_vote, votes2[0]);
}
ASSERT_FALSE (node.votes_cache.find (genesis.open->hash ()).empty ());
ASSERT_FALSE (node.votes_cache.find (send1->hash ()).empty ());
ASSERT_FALSE (node.history.votes (genesis.open->root (), genesis.open->hash ()).empty ());
ASSERT_FALSE (node.history.votes (send1->root (), send1->hash ()).empty ());
// First generated + again cached + new generated
ASSERT_EQ (3, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
}
// Tests that the max cache size is inversely proportional to the number of voting accounts
TEST (node, local_votes_cache_size)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
node_config.vote_minimum = 0; // wallet will pick up the second account as voting even if unopened
auto & node (*system.add_node (node_config));
ASSERT_EQ (node.network_params.voting.max_cache, 2); // effective cache size is 1 with 2 voting accounts
nano::keypair key;
auto & wallet (*system.wallet (0));
wallet.insert_adhoc (nano::test_genesis_key.prv);
wallet.insert_adhoc (nano::keypair ().prv);
ASSERT_EQ (2, node.wallets.reps ().voting);
auto transaction (node.store.tx_begin_read ());
auto vote1 (node.store.vote_generate (transaction, nano::test_genesis_key.pub, nano::test_genesis_key.prv, { nano::genesis_hash }));
nano::block_hash hash (1);
auto vote2 (node.store.vote_generate (transaction, nano::test_genesis_key.pub, nano::test_genesis_key.prv, { hash }));
node.votes_cache.add (vote1);
node.votes_cache.add (vote2);
auto existing2 (node.votes_cache.find (hash));
ASSERT_EQ (1, existing2.size ());
ASSERT_EQ (vote2, existing2.front ());
ASSERT_EQ (0, node.votes_cache.find (nano::genesis_hash).size ());
}
TEST (node, vote_republish)
{
nano::system system (2);
@ -2889,8 +2862,6 @@ TEST (node, vote_republish)
ASSERT_TIMELY (10s, node1.balance (key2.pub) == node1.config.receive_minimum.number () * 2);
}
namespace nano
{
TEST (node, vote_by_hash_bundle)
{
// Keep max_hashes above system to ensure it is kept in scope as votes can be added during system destruction
@ -2909,14 +2880,13 @@ TEST (node, vote_by_hash_bundle)
for (int i = 1; i <= 200; i++)
{
system.nodes[0]->active.generator.add (nano::genesis_hash);
system.nodes[0]->active.generator.add (nano::genesis_account, nano::genesis_hash);
}
// Verify that bundling occurs. While reaching 12 should be common on most hardware in release mode,
// we set this low enough to allow the test to pass on CI/with santitizers.
ASSERT_TIMELY (20s, max_hashes.load () >= 3);
}
}
TEST (node, vote_by_hash_republish)
{
@ -4002,13 +3972,13 @@ TEST (node, rollback_vote_self)
ASSERT_EQ (election->status.winner, fork);
}
// Even without the rollback being finished, the aggregator must reply with a vote for the new winner, not the old one
ASSERT_TRUE (node.votes_cache.find (send2->hash ()).empty ());
ASSERT_TRUE (node.votes_cache.find (fork->hash ()).empty ());
ASSERT_TRUE (node.history.votes (send2->root (), send2->hash ()).empty ());
ASSERT_TRUE (node.history.votes (fork->root (), fork->hash ()).empty ());
auto & node2 = *system.add_node ();
auto channel (node.network.udp_channels.create (node2.network.endpoint ()));
node.aggregator.add (channel, { { send2->hash (), send2->root () } });
ASSERT_TIMELY (5s, !node.votes_cache.find (fork->hash ()).empty ());
ASSERT_TRUE (node.votes_cache.find (send2->hash ()).empty ());
ASSERT_TIMELY (5s, !node.history.votes (fork->root (), fork->hash ()).empty ());
ASSERT_TRUE (node.history.votes (send2->root (), send2->hash ()).empty ());
// Going out of the scope allows the rollback to complete
}

View file

@ -110,8 +110,8 @@ TEST (request_aggregator, two)
ASSERT_TIMELY (3s, 0 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cannot_vote));
ASSERT_TIMELY (3s, 2 == node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
// Make sure the cached vote is for both hashes
auto vote2 (node.votes_cache.find (genesis.hash ()));
auto vote1 (node.votes_cache.find (send1->hash ()));
auto vote2 (node.history.votes (genesis.open->root (), genesis.hash ()));
auto vote1 (node.history.votes (send1->root (), send1->hash ()));
ASSERT_EQ (1, vote1.size ());
ASSERT_EQ (1, vote2.size ());
ASSERT_EQ (vote1.front (), vote2.front ());

115
nano/core_test/voting.cpp Normal file
View file

@ -0,0 +1,115 @@
#include <nano/node/common.hpp>
#include <nano/node/testing.hpp>
#include <nano/node/voting.hpp>
#include <nano/test_common/testutil.hpp>
#include <gtest/gtest.h>
using namespace std::chrono_literals;
namespace nano
{
TEST (local_vote_history, basic)
{
nano::local_vote_history history;
ASSERT_FALSE (history.exists (1));
ASSERT_FALSE (history.exists (2));
ASSERT_TRUE (history.votes (1).empty ());
ASSERT_TRUE (history.votes (2).empty ());
auto vote1 (std::make_shared<nano::vote> ());
ASSERT_EQ (0, history.size ());
history.add (1, 2, vote1);
ASSERT_EQ (1, history.size ());
ASSERT_TRUE (history.exists (1));
ASSERT_FALSE (history.exists (2));
auto votes1 (history.votes (1));
ASSERT_FALSE (votes1.empty ());
ASSERT_EQ (1, history.votes (1, 2).size ());
ASSERT_TRUE (history.votes (1, 1).empty ());
ASSERT_TRUE (history.votes (1, 3).empty ());
ASSERT_TRUE (history.votes (2).empty ());
ASSERT_EQ (1, votes1.size ());
ASSERT_EQ (vote1, votes1[0]);
auto vote2 (std::make_shared<nano::vote> ());
ASSERT_EQ (1, history.size ());
history.add (1, 2, vote2);
ASSERT_EQ (2, history.size ());
auto votes2 (history.votes (1));
ASSERT_EQ (2, votes2.size ());
ASSERT_TRUE (vote1 == votes2[0] || vote1 == votes2[1]);
ASSERT_TRUE (vote2 == votes2[0] || vote2 == votes2[1]);
auto vote3 (std::make_shared<nano::vote> ());
history.add (1, 3, vote3);
ASSERT_EQ (1, history.size ());
auto votes3 (history.votes (1));
ASSERT_EQ (1, votes3.size ());
ASSERT_TRUE (vote3 == votes3[0]);
}
}
TEST (vote_generator, cache)
{
nano::system system (1);
auto & node (*system.nodes[0]);
auto epoch1 = system.upgrade_genesis_epoch (node, nano::epoch::epoch_1);
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
node.active.generator.add (epoch1->root (), epoch1->hash ());
ASSERT_TIMELY (1s, !node.history.votes (epoch1->root (), epoch1->hash ()).empty ());
auto votes (node.history.votes (epoch1->root (), epoch1->hash ()));
ASSERT_FALSE (votes.empty ());
ASSERT_TRUE (std::any_of (votes[0]->begin (), votes[0]->end (), [hash = epoch1->hash ()](nano::block_hash const & hash_a) { return hash_a == hash; }));
}
TEST (vote_generator, multiple_representatives)
{
nano::system system (1);
auto & node (*system.nodes[0]);
nano::keypair key1, key2, key3;
auto & wallet (*system.wallet (0));
wallet.insert_adhoc (nano::test_genesis_key.prv);
wallet.insert_adhoc (key1.prv);
wallet.insert_adhoc (key2.prv);
wallet.insert_adhoc (key3.prv);
auto const amount = 100 * nano::Gxrb_ratio;
wallet.send_sync (nano::test_genesis_key.pub, key1.pub, amount);
wallet.send_sync (nano::test_genesis_key.pub, key2.pub, amount);
wallet.send_sync (nano::test_genesis_key.pub, key3.pub, amount);
ASSERT_TIMELY (3s, node.balance (key1.pub) == amount && node.balance (key2.pub) == amount && node.balance (key3.pub) == amount);
wallet.change_sync (key1.pub, key1.pub);
wallet.change_sync (key2.pub, key2.pub);
wallet.change_sync (key3.pub, key3.pub);
ASSERT_TRUE (node.weight (key1.pub) == amount && node.weight (key2.pub) == amount && node.weight (key3.pub) == amount);
node.wallets.compute_reps ();
ASSERT_EQ (4, node.wallets.reps ().voting);
auto hash = wallet.send_sync (nano::test_genesis_key.pub, nano::test_genesis_key.pub, 1);
auto send = node.block (hash);
ASSERT_NE (nullptr, send);
ASSERT_TIMELY (5s, node.history.votes (send->root (), send->hash ()).size () == 4);
auto votes (node.history.votes (send->root (), send->hash ()));
for (auto const & account : { key1.pub, key2.pub, key3.pub, nano::test_genesis_key.pub })
{
auto existing (std::find_if (votes.begin (), votes.end (), [&account](std::shared_ptr<nano::vote> const & vote_a) -> bool {
return vote_a->account == account;
}));
ASSERT_NE (votes.end (), existing);
}
}
TEST (vote_generator, session)
{
nano::system system (1);
auto node (system.nodes[0]);
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
nano::vote_generator_session generator_session (node->active.generator);
boost::thread thread ([node, &generator_session]() {
nano::thread_role::set (nano::thread_role::name::request_loop);
for (unsigned i = 0; i < 100; ++i)
{
generator_session.add (nano::genesis_account, nano::genesis_hash);
}
ASSERT_EQ (0, node->stats.count (nano::stat::type::vote, nano::stat::detail::vote_indeterminate));
generator_session.flush ();
});
thread.join ();
ASSERT_TIMELY (5s, node->stats.count (nano::stat::type::vote, nano::stat::detail::vote_indeterminate) == (100 / nano::network::confirm_ack_hashes_max));
}

View file

@ -20,7 +20,7 @@ confirmation_height_processor (confirmation_height_processor_a),
node (node_a),
multipliers_cb (20, 1.),
trended_active_multiplier (1.0),
generator (node_a.config, node_a.ledger, node_a.wallets, node_a.vote_processor, node_a.votes_cache, node_a.network),
generator (node_a.config, node_a.ledger, node_a.wallets, node_a.vote_processor, node_a.history, node_a.network),
check_all_elections_period (node_a.network_params.network.is_test_network () ? 10ms : 5s),
election_time_to_live (node_a.network_params.network.is_test_network () ? 0s : 2s),
prioritized_cutoff (std::max<size_t> (1, node_a.config.active_elections_size / 10)),

View file

@ -193,10 +193,11 @@ public:
void add_election_winner_details (nano::block_hash const &, std::shared_ptr<nano::election> const &);
void remove_election_winner_details (nano::block_hash const &);
nano::vote_generator generator;
private:
std::mutex election_winner_details_mutex;
std::unordered_map<nano::block_hash, std::shared_ptr<nano::election>> election_winner_details;
nano::vote_generator generator;
// Call action with confirmed block, may be different than what we started with
// clang-format off
@ -276,8 +277,6 @@ private:
friend class confirmation_height_prioritize_frontiers_Test;
friend class confirmation_height_prioritize_frontiers_overwrite_Test;
friend class active_transactions_confirmation_consistency_Test;
friend class active_transactions_vote_generator_session_Test;
friend class node_vote_by_hash_bundle_Test;
friend class node_deferred_dependent_elections_Test;
friend class election_bisect_dependencies_Test;
friend class election_dependencies_open_link_Test;

View file

@ -263,7 +263,7 @@ void nano::block_processor::process_batch (nano::unique_lock<std::mutex> & lock_
// Deleting from votes cache & wallet work watcher, stop active transaction
for (auto & i : rollback_list)
{
node.votes_cache.remove (i->hash ());
node.history.erase (i->root ());
node.wallets.watcher->remove (*i);
// Stop all rolled back active transactions except initial
if (i->hash () != successor->hash ())

View file

@ -28,7 +28,8 @@ confirmation_action (confirmation_action_a),
prioritized_m (prioritized_a),
node (node_a),
status ({ block_a, 0, std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now ().time_since_epoch ()), std::chrono::duration_values<std::chrono::milliseconds>::zero (), 0, 1, 0, nano::election_status_type::ongoing }),
height (block_a->sideband ().height)
height (block_a->sideband ().height),
root (block_a->root ())
{
last_votes.emplace (node.network_params.random.not_an_account, nano::vote_info{ std::chrono::steady_clock::now (), 0, block_a->hash () });
blocks.emplace (block_a->hash (), block_a);
@ -355,7 +356,7 @@ void nano::election::log_votes (nano::tally_t const & tally_a) const
{
std::stringstream tally;
std::string line_end (node.config.logging.single_line_record () ? "\t" : "\n");
tally << boost::str (boost::format ("%1%Vote tally for root %2%") % line_end % status.winner->root ().to_string ());
tally << boost::str (boost::format ("%1%Vote tally for root %2%") % line_end % root.to_string ());
for (auto i (tally_a.begin ()), n (tally_a.end ()); i != n; ++i)
{
tally << boost::str (boost::format ("%1%Block %2% weight %3%") % line_end % i->second->hash ().to_string () % i->first.convert_to<std::string> ());
@ -575,7 +576,7 @@ void nano::election::prioritize_election (nano::vote_generator_session & generat
debug_assert (!node.active.mutex.try_lock ());
debug_assert (!prioritized_m);
prioritized_m = true;
generator_session_a.add (status.winner->hash ());
generator_session_a.add (root, status.winner->hash ());
}
void nano::election::try_generate_votes (nano::block_hash const & hash_a)
@ -592,7 +593,7 @@ void nano::election::generate_votes (nano::block_hash const & hash_a)
{
if (node.config.enable_voting && node.wallets.reps ().voting > 0)
{
node.active.generator.add (hash_a);
node.active.generator.add (root, hash_a);
}
}
@ -601,12 +602,12 @@ void nano::election::remove_votes (nano::block_hash const & hash_a)
if (node.config.enable_voting && node.wallets.reps ().voting > 0)
{
// Remove votes from election
auto list_generated_votes (node.votes_cache.find (hash_a));
auto list_generated_votes (node.history.votes (root, hash_a));
for (auto const & vote : list_generated_votes)
{
last_votes.erase (vote->account);
}
// Clear votes cache
node.votes_cache.remove (hash_a);
node.history.erase (root);
}
}

View file

@ -114,6 +114,7 @@ public:
std::unordered_set<nano::block_hash> dependent_blocks;
std::chrono::seconds late_blocks_delay{ 5 };
uint64_t const height;
nano::root const root;
friend class active_transactions;

View file

@ -121,11 +121,10 @@ block_processor_thread ([this]() {
}),
// clang-format on
online_reps (ledger, network_params, config.online_weight_minimum.number ()),
votes_cache (wallets),
vote_uniquer (block_uniquer),
confirmation_height_processor (ledger, write_database_queue, config.conf_height_processor_batch_min_time, logger, node_initialized_latch, flags.confirmation_height_processor_mode),
active (*this, confirmation_height_processor),
aggregator (network_params.network, config, stats, votes_cache, ledger, wallets, active),
aggregator (network_params.network, config, stats, history, ledger, wallets, active),
payment_observer_processor (observers.blocks),
wallets (wallets_store.init_error (), *this),
startup_time (std::chrono::steady_clock::now ()),
@ -582,7 +581,7 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
composite->add_component (collect_container_info (node.block_processor, "block_processor"));
composite->add_component (collect_container_info (node.block_arrival, "block_arrival"));
composite->add_component (collect_container_info (node.online_reps, "online_reps"));
composite->add_component (collect_container_info (node.votes_cache, "votes_cache"));
composite->add_component (collect_container_info (node.history, "history"));
composite->add_component (collect_container_info (node.block_uniquer, "block_uniquer"));
composite->add_component (collect_container_info (node.vote_uniquer, "vote_uniquer"));
composite->add_component (collect_container_info (node.confirmation_height_processor, "confirmation_height_processor"));

View file

@ -182,7 +182,7 @@ public:
std::thread block_processor_thread;
nano::block_arrival block_arrival;
nano::online_reps online_reps;
nano::votes_cache votes_cache;
nano::local_vote_history history;
nano::keypair node_id;
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer;

View file

@ -11,12 +11,12 @@
#include <nano/secure/blockstore.hpp>
#include <nano/secure/ledger.hpp>
nano::request_aggregator::request_aggregator (nano::network_constants const & network_constants_a, nano::node_config const & config_a, nano::stat & stats_a, nano::votes_cache & cache_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::active_transactions & active_a) :
nano::request_aggregator::request_aggregator (nano::network_constants const & network_constants_a, nano::node_config const & config_a, nano::stat & stats_a, nano::local_vote_history & history_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::active_transactions & active_a) :
max_delay (network_constants_a.is_test_network () ? 50 : 300),
small_delay (network_constants_a.is_test_network () ? 10 : 50),
max_channel_requests (config_a.max_queued_requests),
stats (stats_a),
votes_cache (cache_a),
local_votes (history_a),
ledger (ledger_a),
wallets (wallets_a),
active (active_a),
@ -145,15 +145,15 @@ void nano::request_aggregator::erase_duplicates (std::vector<std::pair<nano::blo
requests_a.end ());
}
std::vector<nano::block_hash> nano::request_aggregator::aggregate (nano::transaction const & transaction_a, std::vector<std::pair<nano::block_hash, nano::root>> const & requests_a, std::shared_ptr<nano::transport::channel> & channel_a) const
std::vector<std::pair<nano::root, nano::block_hash>> nano::request_aggregator::aggregate (nano::transaction const & transaction_a, std::vector<std::pair<nano::block_hash, nano::root>> const & requests_a, std::shared_ptr<nano::transport::channel> & channel_a) const
{
size_t cached_hashes = 0;
std::vector<nano::block_hash> to_generate;
std::vector<std::pair<nano::root, nano::block_hash>> to_generate;
std::vector<std::shared_ptr<nano::vote>> cached_votes;
for (auto const & hash_root : requests_a)
{
// 1. Votes in cache
auto find_votes (votes_cache.find (hash_root.first));
auto find_votes (local_votes.votes (hash_root.second, hash_root.first));
if (!find_votes.empty ())
{
++cached_hashes;
@ -190,7 +190,7 @@ std::vector<nano::block_hash> nano::request_aggregator::aggregate (nano::transac
auto successor_block = ledger.store.block_get (transaction_a, successor);
debug_assert (successor_block != nullptr);
// 5. Votes in cache for successor
auto find_successor_votes (votes_cache.find (successor));
auto find_successor_votes (local_votes.votes (hash_root.second, successor));
if (!find_successor_votes.empty ())
{
cached_votes.insert (cached_votes.end (), find_successor_votes.begin (), find_successor_votes.end ());
@ -207,7 +207,7 @@ std::vector<nano::block_hash> nano::request_aggregator::aggregate (nano::transac
// Attempt to vote for this block
if (ledger.can_vote (transaction_a, *block))
{
to_generate.push_back (block->hash ());
to_generate.emplace_back (block->root (), block->hash ());
}
else
{
@ -239,7 +239,7 @@ std::vector<nano::block_hash> nano::request_aggregator::aggregate (nano::transac
return to_generate;
}
void nano::request_aggregator::generate (nano::transaction const & transaction_a, std::vector<nano::block_hash> const & hashes_a, std::shared_ptr<nano::transport::channel> & channel_a) const
void nano::request_aggregator::generate (nano::transaction const & transaction_a, std::vector<std::pair<nano::root, nano::block_hash>> const & hashes_a, std::shared_ptr<nano::transport::channel> & channel_a) const
{
size_t generated_l = 0;
auto i (hashes_a.begin ());
@ -247,16 +247,23 @@ void nano::request_aggregator::generate (nano::transaction const & transaction_a
while (i != n)
{
std::vector<nano::block_hash> hashes_l;
std::vector<nano::root> roots;
hashes_l.reserve (nano::network::confirm_ack_hashes_max);
roots.reserve (nano::network::confirm_ack_hashes_max);
for (; i != n && hashes_l.size () < nano::network::confirm_ack_hashes_max; ++i)
{
hashes_l.push_back (*i);
roots.push_back (i->first);
hashes_l.push_back (i->second);
}
wallets.foreach_representative ([this, &generated_l, &hashes_l, &channel_a, &transaction_a](nano::public_key const & pub_a, nano::raw_key const & prv_a) {
wallets.foreach_representative ([this, &generated_l, &hashes_l, &roots, &channel_a, &transaction_a](nano::public_key const & pub_a, nano::raw_key const & prv_a) {
auto vote (this->ledger.store.vote_generate (transaction_a, pub_a, prv_a, hashes_l));
++generated_l;
nano::confirm_ack confirm (vote);
channel_a->send (confirm);
this->votes_cache.add (vote);
for (size_t i (0), n (hashes_l.size ()); i != n; ++i)
{
this->local_votes.add (roots[i], hashes_l[i], vote);
}
});
}
stats.add (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes, stat::dir::in, hashes_a.size ());

View file

@ -19,9 +19,9 @@ namespace nano
{
class active_transactions;
class ledger;
class local_vote_history;
class node_config;
class stat;
class votes_cache;
class wallets;
/**
* Pools together confirmation requests, separately for each endpoint.
@ -59,7 +59,7 @@ class request_aggregator final
public:
request_aggregator () = delete;
request_aggregator (nano::network_constants const &, nano::node_config const & config, nano::stat & stats_a, nano::votes_cache &, nano::ledger &, nano::wallets &, nano::active_transactions &);
request_aggregator (nano::network_constants const &, nano::node_config const & config, nano::stat & stats_a, nano::local_vote_history &, nano::ledger &, nano::wallets &, nano::active_transactions &);
/** Add a new request by \p channel_a for hashes \p hashes_roots_a */
void add (std::shared_ptr<nano::transport::channel> & channel_a, std::vector<std::pair<nano::block_hash, nano::root>> const & hashes_roots_a);
@ -77,12 +77,12 @@ private:
/** Remove duplicate requests **/
void erase_duplicates (std::vector<std::pair<nano::block_hash, nano::root>> &) const;
/** Aggregate \p requests_a and send cached votes to \p channel_a . Return the remaining hashes that need vote generation **/
std::vector<nano::block_hash> aggregate (nano::transaction const &, std::vector<std::pair<nano::block_hash, nano::root>> const & requests_a, std::shared_ptr<nano::transport::channel> & channel_a) const;
std::vector<std::pair<nano::root, nano::block_hash>> aggregate (nano::transaction const &, std::vector<std::pair<nano::block_hash, nano::root>> const & requests_a, std::shared_ptr<nano::transport::channel> & channel_a) const;
/** Generate votes from \p hashes_a and send to \p channel_a **/
void generate (nano::transaction const &, std::vector<nano::block_hash> const & hashes_a, std::shared_ptr<nano::transport::channel> & channel_a) const;
void generate (nano::transaction const &, std::vector<std::pair<nano::root, nano::block_hash>> const & hashes_a, std::shared_ptr<nano::transport::channel> & channel_a) const;
nano::stat & stats;
nano::votes_cache & votes_cache;
nano::local_vote_history & local_votes;
nano::ledger & ledger;
nano::wallets & wallets;
nano::active_transactions & active;

View file

@ -13,12 +13,97 @@
#include <chrono>
nano::vote_generator::vote_generator (nano::node_config const & config_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::vote_processor & vote_processor_a, nano::votes_cache & votes_cache_a, nano::network & network_a) :
void nano::local_vote_history::add (nano::root const & root_a, nano::block_hash const & hash_a, std::shared_ptr<nano::vote> const & vote_a)
{
nano::lock_guard<std::mutex> guard (mutex);
clean ();
auto & history_by_root (history.get<tag_root> ());
// Erase any vote that is not for this hash
auto range (history_by_root.equal_range (root_a));
for (auto i (range.first); i != range.second;)
{
if (i->hash != hash_a)
{
i = history_by_root.erase (i);
}
else
{
++i;
}
}
auto result (history_by_root.emplace (root_a, hash_a, vote_a));
debug_assert (result.second);
debug_assert (std::all_of (history_by_root.equal_range (root_a).first, history_by_root.equal_range (root_a).second, [&hash_a](local_vote const & item_a) -> bool { return item_a.vote != nullptr && item_a.hash == hash_a; }));
}
void nano::local_vote_history::erase (nano::root const & root_a)
{
nano::lock_guard<std::mutex> guard (mutex);
auto & history_by_root (history.get<tag_root> ());
auto range (history_by_root.equal_range (root_a));
history_by_root.erase (range.first, range.second);
}
std::vector<std::shared_ptr<nano::vote>> nano::local_vote_history::votes (nano::root const & root_a) const
{
nano::lock_guard<std::mutex> guard (mutex);
std::vector<std::shared_ptr<nano::vote>> result;
auto range (history.get<tag_root> ().equal_range (root_a));
std::transform (range.first, range.second, std::back_inserter (result), [](auto const & entry) { return entry.vote; });
return result;
}
std::vector<std::shared_ptr<nano::vote>> nano::local_vote_history::votes (nano::root const & root_a, nano::block_hash const & hash_a) const
{
nano::lock_guard<std::mutex> guard (mutex);
std::vector<std::shared_ptr<nano::vote>> result;
auto range (history.get<tag_root> ().equal_range (root_a));
// clang-format off
nano::transform_if (range.first, range.second, std::back_inserter (result),
[&hash_a](auto const & entry) { return entry.hash == hash_a; },
[](auto const & entry) { return entry.vote; });
// clang-format on
return result;
}
bool nano::local_vote_history::exists (nano::root const & root_a) const
{
nano::lock_guard<std::mutex> guard (mutex);
return history.get<tag_root> ().find (root_a) != history.get<tag_root> ().end ();
}
void nano::local_vote_history::clean ()
{
debug_assert (max_size > 0);
auto & history_by_sequence (history.get<tag_sequence> ());
while (history_by_sequence.size () > max_size)
{
history_by_sequence.erase (history_by_sequence.begin ());
}
}
size_t nano::local_vote_history::size () const
{
nano::lock_guard<std::mutex> guard (mutex);
return history.size ();
}
std::unique_ptr<nano::container_info_component> nano::collect_container_info (nano::local_vote_history & history, const std::string & name)
{
size_t history_count = history.size ();
auto sizeof_element = sizeof (decltype (history.history)::value_type);
auto composite = std::make_unique<container_info_composite> (name);
/* This does not currently loop over each element inside the cache to get the sizes of the votes inside history*/
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "history", history_count, sizeof_element }));
return composite;
}
nano::vote_generator::vote_generator (nano::node_config const & config_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::vote_processor & vote_processor_a, nano::local_vote_history & history_a, nano::network & network_a) :
config (config_a),
ledger (ledger_a),
wallets (wallets_a),
vote_processor (vote_processor_a),
votes_cache (votes_cache_a),
history (history_a),
network (network_a),
thread ([this]() { run (); })
{
@ -26,14 +111,14 @@ thread ([this]() { run (); })
condition.wait (lock, [& started = started] { return started; });
}
void nano::vote_generator::add (nano::block_hash const & hash_a)
void nano::vote_generator::add (nano::root const & root_a, nano::block_hash const & hash_a)
{
auto transaction (ledger.store.tx_begin_read ());
nano::unique_lock<std::mutex> lock (mutex);
auto block (ledger.store.block_get (transaction, hash_a));
if (block != nullptr && ledger.can_vote (transaction, *block))
{
hashes.push_back (hash_a);
nano::unique_lock<std::mutex> lock (mutex);
hashes.emplace_back (root_a, hash_a);
if (hashes.size () >= nano::network::confirm_ack_hashes_max)
{
lock.unlock ();
@ -59,18 +144,25 @@ void nano::vote_generator::stop ()
void nano::vote_generator::send (nano::unique_lock<std::mutex> & lock_a)
{
std::vector<nano::block_hash> hashes_l;
std::vector<nano::root> roots;
hashes_l.reserve (nano::network::confirm_ack_hashes_max);
roots.reserve (nano::network::confirm_ack_hashes_max);
while (!hashes.empty () && hashes_l.size () < nano::network::confirm_ack_hashes_max)
{
hashes_l.push_back (hashes.front ());
auto front (hashes.front ());
hashes.pop_front ();
roots.push_back (front.first);
hashes_l.push_back (front.second);
}
lock_a.unlock ();
{
auto transaction (ledger.store.tx_begin_read ());
wallets.foreach_representative ([this, &hashes_l, &transaction](nano::public_key const & pub_a, nano::raw_key const & prv_a) {
wallets.foreach_representative ([this, &hashes_l, &roots, &transaction](nano::public_key const & pub_a, nano::raw_key const & prv_a) {
auto vote (this->ledger.store.vote_generate (transaction, pub_a, prv_a, hashes_l));
this->votes_cache.add (vote);
for (size_t i (0), n (hashes_l.size ()); i != n; ++i)
{
this->history.add (roots[i], hashes_l[i], vote);
}
this->network.flood_vote_pr (vote);
this->network.flood_vote (vote, 2.0f);
this->vote_processor.vote (vote, std::make_shared<nano::transport::channel_udp> (this->network.udp_channels, this->network.endpoint (), this->network_params.protocol.protocol_version));
@ -113,10 +205,10 @@ generator (vote_generator_a)
{
}
void nano::vote_generator_session::add (nano::block_hash const & hash_a)
void nano::vote_generator_session::add (nano::root const & root_a, nano::block_hash const & hash_a)
{
debug_assert (nano::thread_role::get () == nano::thread_role::name::request_loop);
hashes.push_back (hash_a);
hashes.emplace_back (root_a, hash_a);
}
void nano::vote_generator_session::flush ()
@ -124,107 +216,19 @@ void nano::vote_generator_session::flush ()
debug_assert (nano::thread_role::get () == nano::thread_role::name::request_loop);
for (auto const & i : hashes)
{
generator.add (i);
generator.add (i.first, i.second);
}
}
nano::votes_cache::votes_cache (nano::wallets & wallets_a) :
wallets (wallets_a)
{
}
void nano::votes_cache::add (std::shared_ptr<nano::vote> const & vote_a)
{
auto voting (wallets.reps ().voting);
if (voting == 0)
{
return;
}
nano::lock_guard<std::mutex> lock (cache_mutex);
auto const max_cache_size (network_params.voting.max_cache / std::max (voting, static_cast<decltype (voting)> (1)));
for (auto & block : vote_a->blocks)
{
auto hash (boost::get<nano::block_hash> (block));
auto existing (cache.get<tag_hash> ().find (hash));
if (existing == cache.get<tag_hash> ().end ())
{
// Clean old votes
if (cache.size () >= max_cache_size)
{
cache.get<tag_sequence> ().pop_front ();
}
// Insert new votes (new hash)
auto inserted (cache.get<tag_sequence> ().emplace_back (nano::cached_votes{ hash, std::vector<std::shared_ptr<nano::vote>> (1, vote_a) }));
(void)inserted;
debug_assert (inserted.second);
}
else
{
// Insert new votes (old hash)
cache.get<tag_hash> ().modify (existing, [vote_a](nano::cached_votes & cache_a) {
// Replace old vote for same representative & hash
bool replaced (false);
for (auto i (cache_a.votes.begin ()), n (cache_a.votes.end ()); i != n && !replaced; ++i)
{
if ((*i)->account == vote_a->account)
{
*i = vote_a;
replaced = true;
}
}
// Insert new vote
if (!replaced)
{
cache_a.votes.push_back (vote_a);
}
});
}
}
}
std::vector<std::shared_ptr<nano::vote>> nano::votes_cache::find (nano::block_hash const & hash_a)
{
std::vector<std::shared_ptr<nano::vote>> result;
nano::lock_guard<std::mutex> lock (cache_mutex);
auto existing (cache.get<tag_hash> ().find (hash_a));
if (existing != cache.get<tag_hash> ().end ())
{
result = existing->votes;
}
return result;
}
void nano::votes_cache::remove (nano::block_hash const & hash_a)
{
nano::lock_guard<std::mutex> lock (cache_mutex);
cache.get<tag_hash> ().erase (hash_a);
}
std::unique_ptr<nano::container_info_component> nano::collect_container_info (vote_generator & vote_generator, const std::string & name)
std::unique_ptr<nano::container_info_component> nano::collect_container_info (nano::vote_generator & vote_generator, const std::string & name)
{
size_t hashes_count = 0;
{
nano::lock_guard<std::mutex> guard (vote_generator.mutex);
hashes_count = vote_generator.hashes.size ();
}
auto sizeof_element = sizeof (decltype (vote_generator.hashes)::value_type);
auto sizeof_hashes_element = sizeof (decltype (vote_generator.hashes)::value_type);
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "state_blocks", hashes_count, sizeof_element }));
return composite;
}
std::unique_ptr<nano::container_info_component> nano::collect_container_info (votes_cache & votes_cache, const std::string & name)
{
size_t cache_count;
{
nano::lock_guard<std::mutex> guard (votes_cache.cache_mutex);
cache_count = votes_cache.cache.size ();
}
auto sizeof_element = sizeof (decltype (votes_cache.cache)::value_type);
auto composite = std::make_unique<container_info_composite> (name);
/* This does not currently loop over each element inside the cache to get the sizes of the votes inside cached_votes */
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "cache", cache_count, sizeof_element }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "hashes", hashes_count, sizeof_hashes_element }));
return composite;
}

View file

@ -8,7 +8,6 @@
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <boost/multi_index_container.hpp>
@ -17,20 +16,67 @@
#include <mutex>
#include <thread>
namespace mi = boost::multi_index;
namespace nano
{
class ledger;
class network;
class node_config;
class vote_processor;
class votes_cache;
class wallets;
class local_vote_history final
{
class local_vote final
{
public:
local_vote (nano::root const & root_a, nano::block_hash const & hash_a, std::shared_ptr<nano::vote> const & vote_a) :
root (root_a),
hash (hash_a),
vote (vote_a)
{
}
nano::root root;
nano::block_hash hash;
std::shared_ptr<nano::vote> vote;
};
public:
void add (nano::root const & root_a, nano::block_hash const & hash_a, std::shared_ptr<nano::vote> const & vote_a);
void erase (nano::root const & root_a);
std::vector<std::shared_ptr<nano::vote>> votes (nano::root const & root_a, nano::block_hash const & hash_a) const;
bool exists (nano::root const &) const;
size_t size () const;
private:
// clang-format off
boost::multi_index_container<local_vote,
mi::indexed_by<
mi::hashed_non_unique<mi::tag<class tag_root>,
mi::member<local_vote, nano::root, &local_vote::root>>,
mi::sequenced<mi::tag<class tag_sequence>>>>
history;
// clang-format on
size_t const max_size{ nano::network_params{}.voting.max_cache };
void clean ();
std::vector<std::shared_ptr<nano::vote>> votes (nano::root const & root_a) const;
mutable std::mutex mutex;
friend std::unique_ptr<container_info_component> collect_container_info (local_vote_history & history, const std::string & name);
friend class local_vote_history_basic_Test;
};
std::unique_ptr<container_info_component> collect_container_info (local_vote_history & history, const std::string & name);
class vote_generator final
{
public:
vote_generator (nano::node_config const & config_a, nano::ledger &, nano::wallets & wallets_a, nano::vote_processor & vote_processor_a, nano::votes_cache & votes_cache_a, nano::network & network_a);
void add (nano::block_hash const &);
vote_generator (nano::node_config const & config_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::vote_processor & vote_processor_a, nano::local_vote_history & history_a, nano::network & network_a);
void add (nano::root const &, nano::block_hash const &);
void stop ();
private:
@ -40,11 +86,11 @@ private:
nano::ledger & ledger;
nano::wallets & wallets;
nano::vote_processor & vote_processor;
nano::votes_cache & votes_cache;
nano::local_vote_history & history;
nano::network & network;
std::mutex mutex;
mutable std::mutex mutex;
nano::condition_variable condition;
std::deque<nano::block_hash> hashes;
std::deque<std::pair<nano::root, nano::block_hash>> hashes;
nano::network_params network_params;
bool stopped{ false };
bool started{ false };
@ -53,49 +99,17 @@ private:
friend std::unique_ptr<container_info_component> collect_container_info (vote_generator & vote_generator, const std::string & name);
};
std::unique_ptr<container_info_component> collect_container_info (vote_generator & generator, const std::string & name);
class vote_generator_session final
{
public:
vote_generator_session (vote_generator & vote_generator_a);
void add (nano::block_hash const &);
void add (nano::root const &, nano::block_hash const &);
void flush ();
private:
nano::vote_generator & generator;
std::vector<nano::block_hash> hashes;
std::vector<std::pair<nano::root, nano::block_hash>> hashes;
};
std::unique_ptr<container_info_component> collect_container_info (vote_generator & vote_generator, const std::string & name);
class cached_votes final
{
public:
nano::block_hash hash;
std::vector<std::shared_ptr<nano::vote>> votes;
};
class votes_cache final
{
public:
votes_cache (nano::wallets & wallets_a);
void add (std::shared_ptr<nano::vote> const &);
std::vector<std::shared_ptr<nano::vote>> find (nano::block_hash const &);
void remove (nano::block_hash const &);
private:
std::mutex cache_mutex;
// clang-format off
class tag_sequence {};
class tag_hash {};
boost::multi_index_container<nano::cached_votes,
boost::multi_index::indexed_by<
boost::multi_index::sequenced<boost::multi_index::tag<tag_sequence>>,
boost::multi_index::hashed_unique<boost::multi_index::tag<tag_hash>,
boost::multi_index::member<nano::cached_votes, nano::block_hash, &nano::cached_votes::hash>>>>
cache;
// clang-format on
nano::network_params network_params;
nano::wallets & wallets;
friend std::unique_ptr<container_info_component> collect_container_info (votes_cache & votes_cache, const std::string & name);
};
std::unique_ptr<container_info_component> collect_container_info (votes_cache & votes_cache, const std::string & name);
}

View file

@ -144,7 +144,7 @@ nano::node_constants::node_constants (nano::network_constants & network_constant
nano::voting_constants::voting_constants (nano::network_constants & network_constants)
{
max_cache = network_constants.is_test_network () ? 2 : 64 * 1024;
max_cache = network_constants.is_test_network () ? 256 : 128 * 1024;
}
nano::portmapping_constants::portmapping_constants (nano::network_constants & network_constants)

View file

@ -60,6 +60,14 @@ struct hash<::nano::qualified_root>
return std::hash<::nano::qualified_root> () (value_a);
}
};
template <>
struct hash<::nano::root>
{
size_t operator() (::nano::root const & value_a) const
{
return std::hash<::nano::root> () (value_a);
}
};
}
namespace nano
{