Add flag for vote_processor capacity, and tests (#2575)

Slightly limited in testing due to being unable to lock the vote processor loop (on the other hand, the block processor can be paused by starting a write transaction).

- Add node flag vote_processor_capacity, similar to the block processor flags
- Remove check to always process for the test network, the capacity is more than enough
- The representative levels stay the same but are now calculated on the fly from the given capacity
- ::vote() now returns a boolean representing if the vote was processed, both for testing purposes and will be useful for the upcoming network filter
This commit is contained in:
Guilherme Lawless 2020-02-26 22:29:12 +00:00 committed by GitHub
commit ad4a4294da
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 77 additions and 37 deletions

View file

@ -87,6 +87,45 @@ TEST (vote_processor, invalid_signature)
ASSERT_EQ (2, election.first->last_votes.size ());
}
TEST (vote_processor, no_capacity)
{
nano::system system;
nano::node_flags node_flags;
node_flags.vote_processor_capacity = 0;
auto & node (*system.add_node (node_flags));
nano::genesis genesis;
nano::keypair key;
auto vote (std::make_shared<nano::vote> (key.pub, key.prv, 1, std::vector<nano::block_hash>{ genesis.open->hash () }));
auto channel (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, node.network.endpoint (), node.network_params.protocol.protocol_version));
ASSERT_TRUE (node.vote_processor.vote (vote, channel));
}
TEST (vote_processor, overflow)
{
nano::system system;
nano::node_flags node_flags;
node_flags.vote_processor_capacity = 1;
auto & node (*system.add_node (node_flags));
nano::genesis genesis;
nano::keypair key;
auto vote (std::make_shared<nano::vote> (key.pub, key.prv, 1, std::vector<nano::block_hash>{ genesis.open->hash () }));
auto channel (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, node.network.endpoint (), node.network_params.protocol.protocol_version));
// No way to lock the processor, but queueing votes in quick succession must result in overflow
size_t not_processed{ 0 };
size_t const total{ 1000 };
for (unsigned i = 0; i < total; ++i)
{
if (node.vote_processor.vote (vote, channel))
{
++not_processed;
}
}
ASSERT_GT (not_processed, 0);
ASSERT_LT (not_processed, total);
ASSERT_EQ (not_processed, node.stats.count (nano::stat::type::vote, nano::stat::detail::vote_overflow));
}
namespace nano
{
TEST (vote_processor, weights)

View file

@ -101,7 +101,9 @@ void nano::add_node_flag_options (boost::program_options::options_description &
("block_processor_batch_size", boost::program_options::value<std::size_t>(), "Increase block processor transaction batch write size, default 0 (limited by config block_processor_batch_max_time), 256k for fast_bootstrap")
("block_processor_full_size", boost::program_options::value<std::size_t>(), "Increase block processor allowed blocks queue size before dropping live network packets and holding bootstrap download, default 65536, 1 million for fast_bootstrap")
("block_processor_verification_size", boost::program_options::value<std::size_t>(), "Increase batch signature verification size in block processor, default 0 (limited by config signature_checker_threads), unlimited for fast_bootstrap")
("inactive_votes_cache_size", boost::program_options::value<std::size_t>(), "Increase cached votes without active elections size, default 16384");
("inactive_votes_cache_size", boost::program_options::value<std::size_t>(), "Increase cached votes without active elections size, default 16384")
("vote_processor_capacity", boost::program_options::value<std::size_t>(), "Vote processor queue size before dropping votes, default 144k")
;
// clang-format on
}
@ -160,6 +162,11 @@ std::error_code nano::update_flags (nano::node_flags & flags_a, boost::program_o
{
flags_a.inactive_votes_cache_size = inactive_votes_cache_size_it->second.as<size_t> ();
}
auto vote_processor_capacity_it = vm.find ("vote_processor_capacity");
if (vote_processor_capacity_it != vm.end ())
{
flags_a.vote_processor_capacity = vote_processor_capacity_it->second.as<size_t> ();
}
return ec;
}

View file

@ -131,7 +131,7 @@ bootstrap_initiator (*this),
bootstrap (config.peering_port, *this),
application_path (application_path_a),
port_mapping (*this),
vote_processor (checker, active, observers, stats, config, logger, online_reps, ledger, network_params),
vote_processor (checker, active, observers, stats, config, flags, logger, online_reps, ledger, network_params),
rep_crawler (*this),
warmed_up (0),
block_processor (*this, write_database_queue),

View file

@ -137,5 +137,6 @@ public:
size_t block_processor_full_size{ 65536 };
size_t block_processor_verification_size{ 0 };
size_t inactive_votes_cache_size{ 16 * 1024 };
size_t vote_processor_capacity{ 144 * 1024 };
};
}

View file

@ -15,7 +15,7 @@
#include <boost/format.hpp>
nano::vote_processor::vote_processor (nano::signature_checker & checker_a, nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stat & stats_a, nano::node_config & config_a, nano::logger_mt & logger_a, nano::online_reps & online_reps_a, nano::ledger & ledger_a, nano::network_params & network_params_a) :
nano::vote_processor::vote_processor (nano::signature_checker & checker_a, nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stat & stats_a, nano::node_config & config_a, nano::node_flags & flags_a, nano::logger_mt & logger_a, nano::online_reps & online_reps_a, nano::ledger & ledger_a, nano::network_params & network_params_a) :
checker (checker_a),
active (active_a),
observers (observers_a),
@ -25,6 +25,7 @@ logger (logger_a),
online_reps (online_reps_a),
ledger (ledger_a),
network_params (network_params_a),
max_votes (flags_a.vote_processor_capacity),
started (false),
stopped (false),
is_active (false),
@ -88,56 +89,45 @@ void nano::vote_processor::process_loop ()
}
}
void nano::vote_processor::vote (std::shared_ptr<nano::vote> vote_a, std::shared_ptr<nano::transport::channel> channel_a)
bool nano::vote_processor::vote (std::shared_ptr<nano::vote> vote_a, std::shared_ptr<nano::transport::channel> channel_a)
{
bool process (false);
nano::unique_lock<std::mutex> lock (mutex);
if (!stopped)
{
bool process (false);
/* Random early delection levels
Always process votes for test network (process = true)
Stop processing with max 144 * 1024 votes */
if (!network_params.network.is_test_network ())
// Level 0 (< 0.1%)
if (votes.size () < 6.0 / 9.0 * max_votes)
{
// Level 0 (< 0.1%)
if (votes.size () < 96 * 1024)
{
process = true;
}
// Level 1 (0.1-1%)
else if (votes.size () < 112 * 1024)
{
process = (representatives_1.find (vote_a->account) != representatives_1.end ());
}
// Level 2 (1-5%)
else if (votes.size () < 128 * 1024)
{
process = (representatives_2.find (vote_a->account) != representatives_2.end ());
}
// Level 3 (> 5%)
else if (votes.size () < 144 * 1024)
{
process = (representatives_3.find (vote_a->account) != representatives_3.end ());
}
}
else
{
// Process for test network
process = true;
}
// Level 1 (0.1-1%)
else if (votes.size () < 7.0 / 9.0 * max_votes)
{
process = (representatives_1.find (vote_a->account) != representatives_1.end ());
}
// Level 2 (1-5%)
else if (votes.size () < 8.0 / 9.0 * max_votes)
{
process = (representatives_2.find (vote_a->account) != representatives_2.end ());
}
// Level 3 (> 5%)
else if (votes.size () < max_votes)
{
process = (representatives_3.find (vote_a->account) != representatives_3.end ());
}
if (process)
{
votes.emplace_back (vote_a, channel_a);
lock.unlock ();
condition.notify_all ();
lock.lock ();
// Lock no longer required
}
else
{
stats.inc (nano::stat::type::vote, nano::stat::detail::vote_overflow);
}
}
return !process;
}
void nano::vote_processor::verify_votes (decltype (votes) const & votes_a)

View file

@ -32,8 +32,9 @@ namespace transport
class vote_processor final
{
public:
explicit vote_processor (nano::signature_checker & checker_a, nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stat & stats_a, nano::node_config & config_a, nano::logger_mt & logger_a, nano::online_reps & online_reps_a, nano::ledger & ledger_a, nano::network_params & network_params_a);
void vote (std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>);
explicit vote_processor (nano::signature_checker & checker_a, nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stat & stats_a, nano::node_config & config_a, nano::node_flags & flags_a, nano::logger_mt & logger_a, nano::online_reps & online_reps_a, nano::ledger & ledger_a, nano::network_params & network_params_a);
/** Returns false if the vote was processed */
bool vote (std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>);
/** Note: node.active.mutex lock is required */
nano::vote_code vote_blocking (std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>, bool = false);
void verify_votes (std::deque<std::pair<std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>>> const &);
@ -56,6 +57,8 @@ private:
nano::ledger & ledger;
nano::network_params & network_params;
size_t max_votes;
std::deque<std::pair<std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>>> votes;
/** Representatives levels for random early detection */
std::unordered_set<nano::account> representatives_1;