Move all vote generation to the voting thread (#2865)

* Move all vote generation to the voting thread

Following https://github.com/nanocurrency/nano-node/pull/2827 , this PR makes additional enhancements and consistency checks to the request aggregator, vote generator and vote history cache.

Changes to which threads vote:
- Request aggregator hashes that have no cached votes are now queued to the vote generator instead. The channel is passed along so that reply can follow. These hashes are queued in bulk, and the number of "bulks" is limited to 2048, erasing the oldest values if going beyond that. Candidates queued for broadcasting always have priority.
- All vote generation is now done from the voting thread. This is enforced with an assert (unless in the test network) inside the vote generation method. By reducing the number of interfaces we reduce the probability of a bug related to vote generation.

Checks to grab votes from cache in more situations:
- Added an early cache check when queueing vote candidates from elections.
- Added late cache checks in the vote generation loop, to prevent re-generating votes when they were queued both for broadcasting (a new election) and/or multiple requests. Given the resource usage of vote generation, this might not be that uncommon. Added new stats to track in beta testing.

Local votes cache consistency enhancements:
- Votes cached are now enforced to be unique by account. Since we *now* have a single thread adding to the cache, along with the late cache checks, this should not happen, but the cost to enforce is negligible.
- Expanded the vote cache consistency checks performed in debug builds.

Others:
- Added an early return to `stats:add` if the value is 0, so the caller doesn't have to make that check.

Note the minimal amount of changes to existing tests, which should give some confidence to this approach.

* Slow down bootstrap confirm_frontiers in tests

This fixes intermittent failures in bootstrap_processor.frontiers_confirmed with TSAN. Threads have shifted which can cause a timing change especially with a sanitizer.

* Lower requeued_pulls_limit_dev to 1

This is unrelated but makes frontiers confirmation related tests twice as fast

* Making consistency_check const and only compile on debug

* Reduce scope of read transaction in vote_generator::generate (Wes review)

* Save a copy by moving pop_front to end of the loop (Wez review)

* Use structured bindings for readibility

* Redundant condition (Wes review)

* Compile consistency_check in NDEBUG as well to avoid preprocessor magic (Colin review)

* Spelling

* Fix stat requests_generated_votes increasing for late cached replies

This was causing an intermittent failure in
request_aggregator.two_endpoints
This commit is contained in:
Guilherme Lawless 2020-09-15 14:16:53 +01:00 committed by GitHub
commit 7f7981c8e6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 314 additions and 135 deletions

View file

@ -2483,6 +2483,8 @@ TEST (node, stat_counting)
ASSERT_EQ (10, node1.stats.count (nano::stat::type::ledger, nano::stat::dir::in));
ASSERT_EQ (2, node1.stats.count (nano::stat::type::ledger, nano::stat::detail::send, nano::stat::dir::in));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::ledger, nano::stat::detail::receive, nano::stat::dir::in));
node1.stats.add (nano::stat::type::ledger, nano::stat::dir::in, 0);
ASSERT_EQ (10, node1.stats.count (nano::stat::type::ledger, nano::stat::dir::in));
}
TEST (node, online_reps)

View file

@ -129,7 +129,7 @@ TEST (request_aggregator, two_endpoints)
auto & node2 (*system.add_node (node_config, node_flags));
nano::genesis genesis;
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
auto send1 (std::make_shared<nano::state_block> (nano::dev_genesis_key.pub, genesis.hash (), nano::dev_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::dev_genesis_key.pub, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *node1.work_generate_blocking (genesis.hash ())));
auto send1 (std::make_shared<nano::state_block> (nano::dev_genesis_key.pub, genesis.hash (), nano::dev_genesis_key.pub, nano::genesis_amount - 1, nano::dev_genesis_key.pub, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *node1.work_generate_blocking (genesis.hash ())));
std::vector<std::pair<nano::block_hash, nano::root>> request;
request.emplace_back (send1->hash (), send1->root ());
ASSERT_EQ (nano::process_result::progress, node1.ledger.process (node1.store.tx_begin_write (), *send1).code);
@ -147,8 +147,8 @@ TEST (request_aggregator, two_endpoints)
ASSERT_TIMELY (3s, 0 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes));
ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes));
ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes) + node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_late_hashes));
ASSERT_TIMELY (3s, 1 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_votes) + node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached_late_votes));
ASSERT_TIMELY (3s, 0 == node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cannot_vote));
}
@ -380,6 +380,6 @@ TEST (request_aggregator, cannot_vote)
ASSERT_TIMELY (3s, 2 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_TIMELY (3s, 1 == node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
ASSERT_TIMELY (3s, 1 <= node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
}
}

View file

@ -16,29 +16,37 @@ TEST (local_vote_history, basic)
ASSERT_FALSE (history.exists (2));
ASSERT_TRUE (history.votes (1).empty ());
ASSERT_TRUE (history.votes (2).empty ());
auto vote1 (std::make_shared<nano::vote> ());
auto vote1a (std::make_shared<nano::vote> ());
ASSERT_EQ (0, history.size ());
history.add (1, 2, vote1);
history.add (1, 2, vote1a);
ASSERT_EQ (1, history.size ());
ASSERT_TRUE (history.exists (1));
ASSERT_FALSE (history.exists (2));
auto votes1 (history.votes (1));
ASSERT_FALSE (votes1.empty ());
auto votes1a (history.votes (1));
ASSERT_FALSE (votes1a.empty ());
ASSERT_EQ (1, history.votes (1, 2).size ());
ASSERT_TRUE (history.votes (1, 1).empty ());
ASSERT_TRUE (history.votes (1, 3).empty ());
ASSERT_TRUE (history.votes (2).empty ());
ASSERT_EQ (1, votes1.size ());
ASSERT_EQ (vote1, votes1[0]);
ASSERT_EQ (1, votes1a.size ());
ASSERT_EQ (vote1a, votes1a[0]);
auto vote1b (std::make_shared<nano::vote> ());
history.add (1, 2, vote1b);
auto votes1b (history.votes (1));
ASSERT_EQ (1, votes1b.size ());
ASSERT_EQ (vote1b, votes1b[0]);
ASSERT_NE (vote1a, votes1b[0]);
auto vote2 (std::make_shared<nano::vote> ());
vote2->account.dwords[0]++;
ASSERT_EQ (1, history.size ());
history.add (1, 2, vote2);
ASSERT_EQ (2, history.size ());
auto votes2 (history.votes (1));
ASSERT_EQ (2, votes2.size ());
ASSERT_TRUE (vote1 == votes2[0] || vote1 == votes2[1]);
ASSERT_TRUE (vote1b == votes2[0] || vote1b == votes2[1]);
ASSERT_TRUE (vote2 == votes2[0] || vote2 == votes2[1]);
auto vote3 (std::make_shared<nano::vote> ());
vote3->account.dwords[1]++;
history.add (1, 3, vote3);
ASSERT_EQ (1, history.size ());
auto votes3 (history.votes (1));
@ -103,13 +111,10 @@ TEST (vote_generator, session)
nano::vote_generator_session generator_session (node->active.generator);
boost::thread thread ([node, &generator_session]() {
nano::thread_role::set (nano::thread_role::name::request_loop);
for (unsigned i = 0; i < 100; ++i)
{
generator_session.add (nano::genesis_account, nano::genesis_hash);
}
ASSERT_EQ (0, node->stats.count (nano::stat::type::vote, nano::stat::detail::vote_indeterminate));
generator_session.flush ();
});
thread.join ();
ASSERT_TIMELY (5s, node->stats.count (nano::stat::type::vote, nano::stat::detail::vote_indeterminate) == (100 / nano::network::confirm_ack_hashes_max));
ASSERT_TIMELY (2s, 1 == node->stats.count (nano::stat::type::vote, nano::stat::detail::vote_indeterminate));
}

View file

@ -450,6 +450,9 @@ std::string nano::stat::type_to_string (uint32_t key)
case nano::stat::type::telemetry:
res = "telemetry";
break;
case nano::stat::type::vote_generator:
res = "vote_generator";
break;
}
return res;
}
@ -712,6 +715,12 @@ std::string nano::stat::detail_to_string (uint32_t key)
case nano::stat::detail::requests_generated_votes:
res = "requests_generated_votes";
break;
case nano::stat::detail::requests_cached_late_hashes:
res = "requests_cached_late_hashes";
break;
case nano::stat::detail::requests_cached_late_votes:
res = "requests_cached_late_votes";
break;
case nano::stat::detail::requests_cannot_vote:
res = "requests_cannot_vote";
break;
@ -742,6 +751,15 @@ std::string nano::stat::detail_to_string (uint32_t key)
case nano::stat::detail::failed_send_telemetry_req:
res = "failed_send_telemetry_req";
break;
case nano::stat::detail::generator_broadcasts:
res = "generator_broadcasts";
break;
case nano::stat::detail::generator_replies:
res = "generator_replies";
break;
case nano::stat::detail::generator_replies_discarded:
res = "generator_replies_discarded";
break;
}
return res;
}

View file

@ -202,6 +202,7 @@ public:
requests,
filter,
telemetry,
vote_generator
};
/** Optional detail type */
@ -321,6 +322,8 @@ public:
requests_generated_hashes,
requests_cached_votes,
requests_generated_votes,
requests_cached_late_hashes,
requests_cached_late_votes,
requests_cannot_vote,
requests_unknown,
@ -334,7 +337,12 @@ public:
request_within_protection_cache_zone,
no_response_received,
unsolicited_telemetry_ack,
failed_send_telemetry_req
failed_send_telemetry_req,
// vote generator
generator_broadcasts,
generator_replies,
generator_replies_discarded
};
/** Direction of the stat. If the direction is irrelevant, use in */
@ -407,6 +415,11 @@ public:
*/
void add (stat::type type, stat::detail detail, stat::dir dir, uint64_t value, bool detail_only = false)
{
if (value == 0)
{
return;
}
constexpr uint32_t no_detail_mask = 0xffff00ff;
uint32_t key = key_of (type, detail, dir);

View file

@ -24,7 +24,7 @@ confirmation_height_processor (confirmation_height_processor_a),
node (node_a),
multipliers_cb (20, 1.),
trended_active_multiplier (1.0),
generator (node_a.config, node_a.ledger, node_a.wallets, node_a.vote_processor, node_a.history, node_a.network),
generator (node_a.config, node_a.ledger, node_a.wallets, node_a.vote_processor, node_a.history, node_a.network, node_a.stats),
check_all_elections_period (node_a.network_params.network.is_dev_network () ? 10ms : 5s),
election_time_to_live (node_a.network_params.network.is_dev_network () ? 0s : 2s),
prioritized_cutoff (std::max<size_t> (1, node_a.config.active_elections_size / 10)),

View file

@ -131,7 +131,7 @@ public:
static constexpr double required_frontier_confirmation_ratio = 0.8;
static constexpr unsigned frontier_confirmation_blocks_limit = 128 * 1024;
static constexpr unsigned requeued_pulls_limit = 256;
static constexpr unsigned requeued_pulls_limit_dev = 2;
static constexpr unsigned requeued_pulls_limit_dev = 1;
static constexpr unsigned requeued_pulls_processed_blocks_factor = 4096;
static constexpr unsigned bulk_push_cost_limit = 200;
static constexpr std::chrono::seconds lazy_flush_delay_sec = std::chrono::seconds (5);

View file

@ -485,7 +485,7 @@ bool nano::bootstrap_attempt_legacy::confirm_frontiers (nano::unique_lock<std::m
else if (i < max_requests)
{
node->network.broadcast_confirm_req_batched_many (batched_confirm_req_bundle);
std::this_thread::sleep_for (std::chrono::milliseconds (!node->network_params.network.is_dev_network () ? 500 : 5));
std::this_thread::sleep_for (std::chrono::milliseconds (!node->network_params.network.is_dev_network () ? 500 : 100));
}
}
if (!confirmed)

View file

@ -125,7 +125,7 @@ online_reps (ledger, network_params, config.online_weight_minimum.number ()),
vote_uniquer (block_uniquer),
confirmation_height_processor (ledger, write_database_queue, config.conf_height_processor_batch_min_time, logger, node_initialized_latch, flags.confirmation_height_processor_mode),
active (*this, confirmation_height_processor),
aggregator (network_params.network, config, stats, history, ledger, wallets, active),
aggregator (network_params.network, config, stats, active.generator, history, ledger, wallets, active),
payment_observer_processor (observers.blocks),
wallets (wallets_store.init_error (), *this),
startup_time (std::chrono::steady_clock::now ()),

View file

@ -126,7 +126,7 @@ public:
bool disable_bootstrap_bulk_pull_server{ false };
bool disable_bootstrap_bulk_push_client{ false };
bool disable_rep_crawler{ false };
bool disable_request_loop{ false };
bool disable_request_loop{ false }; // For testing only
bool disable_tcp_realtime{ false };
bool disable_udp{ true };
bool disable_unchecked_cleanup{ false };

View file

@ -11,7 +11,7 @@
#include <nano/secure/blockstore.hpp>
#include <nano/secure/ledger.hpp>
nano::request_aggregator::request_aggregator (nano::network_constants const & network_constants_a, nano::node_config const & config_a, nano::stat & stats_a, nano::local_vote_history & history_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::active_transactions & active_a) :
nano::request_aggregator::request_aggregator (nano::network_constants const & network_constants_a, nano::node_config const & config_a, nano::stat & stats_a, nano::vote_generator & generator_a, nano::local_vote_history & history_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::active_transactions & active_a) :
max_delay (network_constants_a.is_dev_network () ? 50 : 300),
small_delay (network_constants_a.is_dev_network () ? 10 : 50),
max_channel_requests (config_a.max_queued_requests),
@ -20,8 +20,12 @@ local_votes (history_a),
ledger (ledger_a),
wallets (wallets_a),
active (active_a),
generator (generator_a),
thread ([this]() { run (); })
{
generator.set_reply_action ([this](std::shared_ptr<nano::vote> const & vote_a, std::shared_ptr<nano::transport::channel> & channel_a) {
this->reply_action (vote_a, channel_a);
});
nano::unique_lock<std::mutex> lock (mutex);
condition.wait (lock, [& started = started] { return started; });
}
@ -88,12 +92,12 @@ void nano::request_aggregator::run ()
requests_by_deadline.erase (front);
lock.unlock ();
erase_duplicates (hashes_roots);
auto transaction (ledger.store.tx_begin_read ());
auto remaining = aggregate (transaction, hashes_roots, channel);
auto const remaining = aggregate (hashes_roots, channel);
if (!remaining.empty ())
{
// Generate votes for the remaining hashes
generate (transaction, remaining, channel);
auto const generated = generator.generate (remaining, channel);
stats.add (nano::stat::type::requests, nano::stat::detail::requests_cannot_vote, stat::dir::in, remaining.size () - generated);
}
lock.lock ();
}
@ -134,6 +138,12 @@ bool nano::request_aggregator::empty ()
return size () == 0;
}
void nano::request_aggregator::reply_action (std::shared_ptr<nano::vote> const & vote_a, std::shared_ptr<nano::transport::channel> & channel_a) const
{
nano::confirm_ack confirm (vote_a);
channel_a->send (confirm);
}
void nano::request_aggregator::erase_duplicates (std::vector<std::pair<nano::block_hash, nano::root>> & requests_a) const
{
std::sort (requests_a.begin (), requests_a.end (), [](auto const & pair1, auto const & pair2) {
@ -145,10 +155,11 @@ void nano::request_aggregator::erase_duplicates (std::vector<std::pair<nano::blo
requests_a.end ());
}
std::vector<std::pair<nano::root, nano::block_hash>> nano::request_aggregator::aggregate (nano::transaction const & transaction_a, std::vector<std::pair<nano::block_hash, nano::root>> const & requests_a, std::shared_ptr<nano::transport::channel> & channel_a) const
std::vector<std::shared_ptr<nano::block>> nano::request_aggregator::aggregate (std::vector<std::pair<nano::block_hash, nano::root>> const & requests_a, std::shared_ptr<nano::transport::channel> & channel_a) const
{
auto transaction (ledger.store.tx_begin_read ());
size_t cached_hashes = 0;
std::vector<std::pair<nano::root, nano::block_hash>> to_generate;
std::vector<std::shared_ptr<nano::block>> to_generate;
std::vector<std::shared_ptr<nano::vote>> cached_votes;
for (auto const & hash_root : requests_a)
{
@ -167,19 +178,20 @@ std::vector<std::pair<nano::root, nano::block_hash>> nano::request_aggregator::a
// 3. Ledger by hash
if (block == nullptr)
{
block = ledger.store.block_get (transaction_a, hash_root.first);
block = ledger.store.block_get (transaction, hash_root.first);
}
// 4. Ledger by root
if (block == nullptr && !hash_root.second.is_zero ())
{
// Search for block root
auto successor (ledger.store.block_successor (transaction_a, hash_root.second.as_block_hash ()));
auto successor (ledger.store.block_successor (transaction, hash_root.second.as_block_hash ()));
// Search for account root
if (successor.is_zero ())
{
nano::account_info info;
auto error (ledger.store.account_get (transaction_a, hash_root.second.as_account (), info));
auto error (ledger.store.account_get (transaction, hash_root.second.as_account (), info));
if (!error)
{
successor = info.open_block;
@ -187,7 +199,7 @@ std::vector<std::pair<nano::root, nano::block_hash>> nano::request_aggregator::a
}
if (!successor.is_zero ())
{
auto successor_block = ledger.store.block_get (transaction_a, successor);
auto successor_block = ledger.store.block_get (transaction, successor);
debug_assert (successor_block != nullptr);
// 5. Votes in cache for successor
auto find_successor_votes (local_votes.votes (hash_root.second, successor));
@ -204,15 +216,8 @@ std::vector<std::pair<nano::root, nano::block_hash>> nano::request_aggregator::a
if (block)
{
// Attempt to vote for this block
if (ledger.dependents_confirmed (transaction_a, *block))
{
to_generate.emplace_back (block->root (), block->hash ());
}
else
{
stats.inc (nano::stat::type::requests, nano::stat::detail::requests_cannot_vote, stat::dir::in);
}
to_generate.push_back (block);
// Let the node know about the alternative block
if (block->hash () != hash_root.first)
{
@ -231,45 +236,13 @@ std::vector<std::pair<nano::root, nano::block_hash>> nano::request_aggregator::a
cached_votes.erase (std::unique (cached_votes.begin (), cached_votes.end ()), cached_votes.end ());
for (auto const & vote : cached_votes)
{
nano::confirm_ack confirm (vote);
channel_a->send (confirm);
reply_action (vote, channel_a);
}
stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes, stat::dir::in, cached_hashes);
stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached_votes, stat::dir::in, cached_votes.size ());
return to_generate;
}
void nano::request_aggregator::generate (nano::transaction const & transaction_a, std::vector<std::pair<nano::root, nano::block_hash>> const & hashes_a, std::shared_ptr<nano::transport::channel> & channel_a) const
{
size_t generated_l = 0;
auto i (hashes_a.begin ());
auto n (hashes_a.end ());
while (i != n)
{
std::vector<nano::block_hash> hashes_l;
std::vector<nano::root> roots;
hashes_l.reserve (nano::network::confirm_ack_hashes_max);
roots.reserve (nano::network::confirm_ack_hashes_max);
for (; i != n && hashes_l.size () < nano::network::confirm_ack_hashes_max; ++i)
{
roots.push_back (i->first);
hashes_l.push_back (i->second);
}
wallets.foreach_representative ([this, &generated_l, &hashes_l, &roots, &channel_a, &transaction_a](nano::public_key const & pub_a, nano::raw_key const & prv_a) {
auto vote (this->ledger.store.vote_generate (transaction_a, pub_a, prv_a, hashes_l));
++generated_l;
nano::confirm_ack confirm (vote);
channel_a->send (confirm);
for (size_t i (0), n (hashes_l.size ()); i != n; ++i)
{
this->local_votes.add (roots[i], hashes_l[i], vote);
}
});
}
stats.add (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes, stat::dir::in, hashes_a.size ());
stats.add (nano::stat::type::requests, nano::stat::detail::requests_generated_votes, stat::dir::in, generated_l);
}
std::unique_ptr<nano::container_info_component> nano::collect_container_info (nano::request_aggregator & aggregator, const std::string & name)
{
auto pools_count = aggregator.size ();

View file

@ -22,6 +22,7 @@ class ledger;
class local_vote_history;
class node_config;
class stat;
class vote_generator;
class wallets;
/**
* Pools together confirmation requests, separately for each endpoint.
@ -58,8 +59,7 @@ class request_aggregator final
// clang-format on
public:
request_aggregator () = delete;
request_aggregator (nano::network_constants const &, nano::node_config const & config, nano::stat & stats_a, nano::local_vote_history &, nano::ledger &, nano::wallets &, nano::active_transactions &);
request_aggregator (nano::network_constants const &, nano::node_config const & config, nano::stat & stats_a, nano::vote_generator &, nano::local_vote_history &, nano::ledger &, nano::wallets &, nano::active_transactions &);
/** Add a new request by \p channel_a for hashes \p hashes_roots_a */
void add (std::shared_ptr<nano::transport::channel> & channel_a, std::vector<std::pair<nano::block_hash, nano::root>> const & hashes_roots_a);
@ -77,15 +77,15 @@ private:
/** Remove duplicate requests **/
void erase_duplicates (std::vector<std::pair<nano::block_hash, nano::root>> &) const;
/** Aggregate \p requests_a and send cached votes to \p channel_a . Return the remaining hashes that need vote generation **/
std::vector<std::pair<nano::root, nano::block_hash>> aggregate (nano::transaction const &, std::vector<std::pair<nano::block_hash, nano::root>> const & requests_a, std::shared_ptr<nano::transport::channel> & channel_a) const;
/** Generate votes from \p hashes_a and send to \p channel_a **/
void generate (nano::transaction const &, std::vector<std::pair<nano::root, nano::block_hash>> const & hashes_a, std::shared_ptr<nano::transport::channel> & channel_a) const;
std::vector<std::shared_ptr<nano::block>> aggregate (std::vector<std::pair<nano::block_hash, nano::root>> const & requests_a, std::shared_ptr<nano::transport::channel> & channel_a) const;
void reply_action (std::shared_ptr<nano::vote> const & vote_a, std::shared_ptr<nano::transport::channel> & channel_a) const;
nano::stat & stats;
nano::local_vote_history & local_votes;
nano::ledger & ledger;
nano::wallets & wallets;
nano::active_transactions & active;
nano::vote_generator & generator;
// clang-format off
boost::multi_index_container<channel_pool,

View file

@ -1,6 +1,6 @@
#include "transport/udp.hpp"
#include <nano/lib/stats.hpp>
#include <nano/lib/threading.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/network.hpp>
#include <nano/node/nodeconfig.hpp>
#include <nano/node/vote_processor.hpp>
@ -9,20 +9,32 @@
#include <nano/secure/blockstore.hpp>
#include <nano/secure/ledger.hpp>
#include <boost/variant/get.hpp>
#include <chrono>
bool nano::local_vote_history::consistency_check (nano::root const & root_a) const
{
auto & history_by_root (history.get<tag_root> ());
auto const range (history_by_root.equal_range (root_a));
// All cached votes for a root must be for the same hash, this is actively enforced in local_vote_history::add
auto consistent = std::all_of (range.first, range.second, [hash = range.first->hash](auto const & info_a) { return info_a.hash == hash; });
std::vector<nano::account> accounts;
std::transform (range.first, range.second, std::back_inserter (accounts), [](auto const & info_a) { return info_a.vote->account; });
std::sort (accounts.begin (), accounts.end ());
// All cached votes must be unique by account, this is actively enforced in local_vote_history::add
consistent = consistent && accounts.size () == std::unique (accounts.begin (), accounts.end ()) - accounts.begin ();
return consistent;
}
void nano::local_vote_history::add (nano::root const & root_a, nano::block_hash const & hash_a, std::shared_ptr<nano::vote> const & vote_a)
{
nano::lock_guard<std::mutex> guard (mutex);
clean ();
auto & history_by_root (history.get<tag_root> ());
// Erase any vote that is not for this hash
// Erase any vote that is not for this hash, or duplicate by account
auto range (history_by_root.equal_range (root_a));
for (auto i (range.first); i != range.second;)
{
if (i->hash != hash_a)
if (i->hash != hash_a || vote_a->account == i->vote->account)
{
i = history_by_root.erase (i);
}
@ -32,8 +44,9 @@ void nano::local_vote_history::add (nano::root const & root_a, nano::block_hash
}
}
auto result (history_by_root.emplace (root_a, hash_a, vote_a));
(void)result;
debug_assert (result.second);
debug_assert (std::all_of (history_by_root.equal_range (root_a).first, history_by_root.equal_range (root_a).second, [&hash_a](local_vote const & item_a) -> bool { return item_a.vote != nullptr && item_a.hash == hash_a; }));
debug_assert (consistency_check (root_a));
}
void nano::local_vote_history::erase (nano::root const & root_a)
@ -98,13 +111,14 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (na
return composite;
}
nano::vote_generator::vote_generator (nano::node_config const & config_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::vote_processor & vote_processor_a, nano::local_vote_history & history_a, nano::network & network_a) :
nano::vote_generator::vote_generator (nano::node_config const & config_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::vote_processor & vote_processor_a, nano::local_vote_history & history_a, nano::network & network_a, nano::stat & stats_a) :
config (config_a),
ledger (ledger_a),
wallets (wallets_a),
vote_processor (vote_processor_a),
history (history_a),
network (network_a),
stats (stats_a),
thread ([this]() { run (); })
{
nano::unique_lock<std::mutex> lock (mutex);
@ -113,18 +127,29 @@ thread ([this]() { run (); })
void nano::vote_generator::add (nano::root const & root_a, nano::block_hash const & hash_a)
{
auto votes (history.votes (root_a, hash_a));
if (!votes.empty ())
{
for (auto const & vote : votes)
{
broadcast_action (vote);
}
}
else
{
auto transaction (ledger.store.tx_begin_read ());
auto block (ledger.store.block_get (transaction, hash_a));
if (block != nullptr && ledger.dependents_confirmed (transaction, *block))
{
nano::unique_lock<std::mutex> lock (mutex);
hashes.emplace_back (root_a, hash_a);
if (hashes.size () >= nano::network::confirm_ack_hashes_max)
candidates.emplace_back (root_a, hash_a);
if (candidates.size () >= nano::network::confirm_ack_hashes_max)
{
lock.unlock ();
condition.notify_all ();
}
}
}
}
void nano::vote_generator::stop ()
@ -141,36 +166,143 @@ void nano::vote_generator::stop ()
}
}
void nano::vote_generator::send (nano::unique_lock<std::mutex> & lock_a)
size_t nano::vote_generator::generate (std::vector<std::shared_ptr<nano::block>> const & blocks_a, std::shared_ptr<nano::transport::channel> const & channel_a)
{
std::vector<nano::block_hash> hashes_l;
std::vector<nano::root> roots;
hashes_l.reserve (nano::network::confirm_ack_hashes_max);
roots.reserve (nano::network::confirm_ack_hashes_max);
while (!hashes.empty () && hashes_l.size () < nano::network::confirm_ack_hashes_max)
{
auto front (hashes.front ());
hashes.pop_front ();
roots.push_back (front.first);
hashes_l.push_back (front.second);
}
lock_a.unlock ();
request_t::first_type candidates;
{
auto transaction (ledger.store.tx_begin_read ());
wallets.foreach_representative ([this, &hashes_l, &roots, &transaction](nano::public_key const & pub_a, nano::raw_key const & prv_a) {
auto vote (this->ledger.store.vote_generate (transaction, pub_a, prv_a, hashes_l));
for (size_t i (0), n (hashes_l.size ()); i != n; ++i)
{
this->history.add (roots[i], hashes_l[i], vote);
auto dependents_confirmed = [&blocks_a, &transaction, this](auto const & block_a) {
return this->ledger.dependents_confirmed (transaction, *block_a);
};
auto as_candidate = [](auto const & block_a) {
return candidate_t{ block_a->root (), block_a->hash () };
};
nano::transform_if (blocks_a.begin (), blocks_a.end (), std::back_inserter (candidates), dependents_confirmed, as_candidate);
}
this->network.flood_vote_pr (vote);
this->network.flood_vote (vote, 2.0f);
this->vote_processor.vote (vote, std::make_shared<nano::transport::channel_udp> (this->network.udp_channels, this->network.endpoint (), this->network_params.protocol.protocol_version));
auto const result = candidates.size ();
nano::lock_guard<std::mutex> guard (mutex);
requests.emplace_back (std::move (candidates), channel_a);
while (requests.size () > max_requests)
{
// On a large queue of requests, erase the oldest one
requests.pop_front ();
stats.inc (nano::stat::type::vote_generator, nano::stat::detail::generator_replies_discarded);
}
return result;
}
void nano::vote_generator::set_reply_action (std::function<void(std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> &)> action_a)
{
release_assert (!reply_action);
reply_action = action_a;
}
void nano::vote_generator::broadcast (nano::unique_lock<std::mutex> & lock_a)
{
debug_assert (lock_a.owns_lock ());
std::unordered_set<std::shared_ptr<nano::vote>> cached_sent;
std::vector<nano::block_hash> hashes;
std::vector<nano::root> roots;
hashes.reserve (nano::network::confirm_ack_hashes_max);
roots.reserve (nano::network::confirm_ack_hashes_max);
while (!candidates.empty () && hashes.size () < nano::network::confirm_ack_hashes_max)
{
auto const & [root, hash] = candidates.front ();
auto cached_votes = history.votes (root, hash);
for (auto const & cached_vote : cached_votes)
{
if (cached_sent.insert (cached_vote).second)
{
broadcast_action (cached_vote);
}
}
if (cached_votes.empty ())
{
roots.push_back (root);
hashes.push_back (hash);
}
candidates.pop_front ();
}
if (!hashes.empty ())
{
lock_a.unlock ();
vote (hashes, roots, [this](auto const & vote_a) { this->broadcast_action (vote_a); });
lock_a.lock ();
}
stats.inc (nano::stat::type::vote_generator, nano::stat::detail::generator_broadcasts);
}
void nano::vote_generator::reply (nano::unique_lock<std::mutex> & lock_a, request_t && request_a)
{
lock_a.unlock ();
std::unordered_set<std::shared_ptr<nano::vote>> cached_sent;
auto transaction (ledger.store.tx_begin_read ());
auto i (request_a.first.cbegin ());
auto n (request_a.first.cend ());
while (i != n && !stopped)
{
std::vector<nano::block_hash> hashes;
std::vector<nano::root> roots;
hashes.reserve (nano::network::confirm_ack_hashes_max);
roots.reserve (nano::network::confirm_ack_hashes_max);
for (; i != n && hashes.size () < nano::network::confirm_ack_hashes_max; ++i)
{
auto cached_votes = history.votes (i->first, i->second);
for (auto const & cached_vote : cached_votes)
{
if (cached_sent.insert (cached_vote).second)
{
stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached_late_hashes, stat::dir::in, cached_vote->blocks.size ());
stats.inc (nano::stat::type::requests, nano::stat::detail::requests_cached_late_votes, stat::dir::in);
reply_action (cached_vote, request_a.second);
}
}
if (cached_votes.empty ())
{
roots.push_back (i->first);
hashes.push_back (i->second);
}
}
if (!hashes.empty ())
{
stats.add (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes, stat::dir::in, hashes.size ());
vote (hashes, roots, [this, &channel = request_a.second](std::shared_ptr<nano::vote> const & vote_a) {
this->reply_action (vote_a, channel);
this->stats.inc (nano::stat::type::requests, nano::stat::detail::requests_generated_votes, stat::dir::in);
});
}
}
stats.inc (nano::stat::type::vote_generator, nano::stat::detail::generator_replies);
lock_a.lock ();
}
void nano::vote_generator::vote (std::vector<nano::block_hash> const & hashes_a, std::vector<nano::root> const & roots_a, std::function<void(std::shared_ptr<nano::vote> const &)> const & action_a)
{
debug_assert (hashes_a.size () == roots_a.size ());
std::vector<std::shared_ptr<nano::vote>> votes_l;
{
auto transaction (ledger.store.tx_begin_read ());
wallets.foreach_representative ([this, &hashes_a, &roots_a, &transaction, &votes_l](nano::public_key const & pub_a, nano::raw_key const & prv_a) {
votes_l.emplace_back (this->ledger.store.vote_generate (transaction, pub_a, prv_a, hashes_a));
});
}
for (auto const & vote_l : votes_l)
{
for (size_t i (0), n (hashes_a.size ()); i != n; ++i)
{
history.add (roots_a[i], hashes_a[i], vote_l);
}
action_a (vote_l);
}
}
void nano::vote_generator::broadcast_action (std::shared_ptr<nano::vote> const & vote_a) const
{
network.flood_vote_pr (vote_a);
network.flood_vote (vote_a, 2.0f);
vote_processor.vote (vote_a, std::make_shared<nano::transport::channel_udp> (network.udp_channels, network.endpoint (), network_params.protocol.protocol_version));
}
void nano::vote_generator::run ()
{
nano::thread_role::set (nano::thread_role::name::voting);
@ -181,20 +313,26 @@ void nano::vote_generator::run ()
lock.lock ();
while (!stopped)
{
if (hashes.size () >= nano::network::confirm_ack_hashes_max)
if (candidates.size () >= nano::network::confirm_ack_hashes_max)
{
send (lock);
broadcast (lock);
}
else if (!requests.empty ())
{
auto request (requests.front ());
requests.pop_front ();
reply (lock, std::move (request));
}
else
{
condition.wait_for (lock, config.vote_generator_delay, [this]() { return this->hashes.size () >= nano::network::confirm_ack_hashes_max; });
if (hashes.size () >= config.vote_generator_threshold && hashes.size () < nano::network::confirm_ack_hashes_max)
condition.wait_for (lock, config.vote_generator_delay, [this]() { return this->candidates.size () >= nano::network::confirm_ack_hashes_max; });
if (candidates.size () >= config.vote_generator_threshold && candidates.size () < nano::network::confirm_ack_hashes_max)
{
condition.wait_for (lock, config.vote_generator_delay, [this]() { return this->hashes.size () >= nano::network::confirm_ack_hashes_max; });
condition.wait_for (lock, config.vote_generator_delay, [this]() { return this->candidates.size () >= nano::network::confirm_ack_hashes_max; });
}
if (!hashes.empty ())
if (!candidates.empty ())
{
send (lock);
broadcast (lock);
}
}
}
@ -208,27 +346,31 @@ generator (vote_generator_a)
void nano::vote_generator_session::add (nano::root const & root_a, nano::block_hash const & hash_a)
{
debug_assert (nano::thread_role::get () == nano::thread_role::name::request_loop);
hashes.emplace_back (root_a, hash_a);
items.emplace_back (root_a, hash_a);
}
void nano::vote_generator_session::flush ()
{
debug_assert (nano::thread_role::get () == nano::thread_role::name::request_loop);
for (auto const & i : hashes)
for (auto const & [root, hash] : items)
{
generator.add (i.first, i.second);
generator.add (root, hash);
}
}
std::unique_ptr<nano::container_info_component> nano::collect_container_info (nano::vote_generator & vote_generator, const std::string & name)
{
size_t hashes_count = 0;
size_t candidates_count = 0;
size_t requests_count = 0;
{
nano::lock_guard<std::mutex> guard (vote_generator.mutex);
hashes_count = vote_generator.hashes.size ();
candidates_count = vote_generator.candidates.size ();
requests_count = vote_generator.requests.size ();
}
auto sizeof_hashes_element = sizeof (decltype (vote_generator.hashes)::value_type);
auto sizeof_candidate_element = sizeof (decltype (vote_generator.candidates)::value_type);
auto sizeof_request_element = sizeof (decltype (vote_generator.requests)::value_type);
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "hashes", hashes_count, sizeof_hashes_element }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "candidates", candidates_count, sizeof_candidate_element }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "requests", requests_count, sizeof_request_element }));
return composite;
}

View file

@ -23,8 +23,13 @@ namespace nano
class ledger;
class network;
class node_config;
class stat;
class vote_processor;
class wallets;
namespace transport
{
class channel;
}
class local_vote_history final
{
@ -63,6 +68,8 @@ private:
size_t const max_size{ nano::network_params{}.voting.max_cache };
void clean ();
std::vector<std::shared_ptr<nano::vote>> votes (nano::root const & root_a) const;
// Only used in Debug
bool consistency_check (nano::root const &) const;
mutable std::mutex mutex;
friend std::unique_ptr<container_info_component> collect_container_info (local_vote_history & history, const std::string & name);
@ -74,25 +81,40 @@ std::unique_ptr<container_info_component> collect_container_info (local_vote_his
class vote_generator final
{
private:
using candidate_t = std::pair<nano::root, nano::block_hash>;
using request_t = std::pair<std::vector<candidate_t>, std::shared_ptr<nano::transport::channel>>;
public:
vote_generator (nano::node_config const & config_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::vote_processor & vote_processor_a, nano::local_vote_history & history_a, nano::network & network_a);
vote_generator (nano::node_config const & config_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::vote_processor & vote_processor_a, nano::local_vote_history & history_a, nano::network & network_a, nano::stat & stats_a);
/** Queue items for vote generation, or broadcast votes already in cache */
void add (nano::root const &, nano::block_hash const &);
/** Queue blocks for vote generation, returning the number of successful candidates.*/
size_t generate (std::vector<std::shared_ptr<nano::block>> const & blocks_a, std::shared_ptr<nano::transport::channel> const & channel_a);
void set_reply_action (std::function<void(std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> &)>);
void stop ();
private:
void run ();
void send (nano::unique_lock<std::mutex> &);
void broadcast (nano::unique_lock<std::mutex> &);
void reply (nano::unique_lock<std::mutex> &, request_t &&);
void vote (std::vector<nano::block_hash> const &, std::vector<nano::root> const &, std::function<void(std::shared_ptr<nano::vote> const &)> const &);
void broadcast_action (std::shared_ptr<nano::vote> const &) const;
std::function<void(std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> &)> reply_action; // must be set only during initialization by using set_reply_action
nano::node_config const & config;
nano::ledger & ledger;
nano::wallets & wallets;
nano::vote_processor & vote_processor;
nano::local_vote_history & history;
nano::network & network;
nano::stat & stats;
mutable std::mutex mutex;
nano::condition_variable condition;
std::deque<std::pair<nano::root, nano::block_hash>> hashes;
static size_t constexpr max_requests{ 2048 };
std::deque<request_t> requests;
std::deque<candidate_t> candidates;
nano::network_params network_params;
bool stopped{ false };
std::atomic<bool> stopped{ false };
bool started{ false };
std::thread thread;
@ -110,6 +132,6 @@ public:
private:
nano::vote_generator & generator;
std::vector<std::pair<nano::root, nano::block_hash>> hashes;
std::vector<std::pair<nano::root, nano::block_hash>> items;
};
}

View file

@ -1,6 +1,8 @@
#pragma once
#include <nano/lib/config.hpp>
#include <nano/lib/rep_weights.hpp>
#include <nano/lib/threading.hpp>
#include <nano/secure/blockstore.hpp>
#include <nano/secure/buffer.hpp>
@ -273,6 +275,7 @@ public:
std::shared_ptr<nano::vote> vote_generate (nano::transaction const & transaction_a, nano::account const & account_a, nano::raw_key const & key_a, std::shared_ptr<nano::block> block_a) override
{
debug_assert (nano::network_constants ().is_dev_network () || nano::thread_role::get () == nano::thread_role::name::voting);
nano::lock_guard<std::mutex> lock (cache_mutex);
auto result (vote_current (transaction_a, account_a));
uint64_t sequence ((result ? result->sequence : 0) + 1);
@ -283,6 +286,7 @@ public:
std::shared_ptr<nano::vote> vote_generate (nano::transaction const & transaction_a, nano::account const & account_a, nano::raw_key const & key_a, std::vector<nano::block_hash> blocks_a) override
{
debug_assert (nano::network_constants ().is_dev_network () || nano::thread_role::get () == nano::thread_role::name::voting);
nano::lock_guard<std::mutex> lock (cache_mutex);
auto result (vote_current (transaction_a, account_a));
uint64_t sequence ((result ? result->sequence : 0) + 1);