From ad4a4294dace4049ee719ff9b26e6f7419714867 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Wed, 26 Feb 2020 22:29:12 +0000 Subject: [PATCH] Add flag for vote_processor capacity, and tests (#2575) Slightly limited in testing due to being unable to lock the vote processor loop (on the other hand, the block processor can be paused by starting a write transaction). - Add node flag vote_processor_capacity, similar to the block processor flags - Remove check to always process for the test network, the capacity is more than enough - The representative levels stay the same but are now calculated on the fly from the given capacity - ::vote() now returns a boolean representing if the vote was processed, both for testing purposes and will be useful for the upcoming network filter --- nano/core_test/vote_processor.cpp | 39 +++++++++++++++++++++ nano/node/cli.cpp | 9 ++++- nano/node/node.cpp | 2 +- nano/node/nodeconfig.hpp | 1 + nano/node/vote_processor.cpp | 56 +++++++++++++------------------ nano/node/vote_processor.hpp | 7 ++-- 6 files changed, 77 insertions(+), 37 deletions(-) diff --git a/nano/core_test/vote_processor.cpp b/nano/core_test/vote_processor.cpp index 0ac38b5b..2ef8d9d9 100644 --- a/nano/core_test/vote_processor.cpp +++ b/nano/core_test/vote_processor.cpp @@ -87,6 +87,45 @@ TEST (vote_processor, invalid_signature) ASSERT_EQ (2, election.first->last_votes.size ()); } +TEST (vote_processor, no_capacity) +{ + nano::system system; + nano::node_flags node_flags; + node_flags.vote_processor_capacity = 0; + auto & node (*system.add_node (node_flags)); + nano::genesis genesis; + nano::keypair key; + auto vote (std::make_shared (key.pub, key.prv, 1, std::vector{ genesis.open->hash () })); + auto channel (std::make_shared (node.network.udp_channels, node.network.endpoint (), node.network_params.protocol.protocol_version)); + ASSERT_TRUE (node.vote_processor.vote (vote, channel)); +} + +TEST (vote_processor, overflow) +{ + nano::system system; + nano::node_flags node_flags; + node_flags.vote_processor_capacity = 1; + auto & node (*system.add_node (node_flags)); + nano::genesis genesis; + nano::keypair key; + auto vote (std::make_shared (key.pub, key.prv, 1, std::vector{ genesis.open->hash () })); + auto channel (std::make_shared (node.network.udp_channels, node.network.endpoint (), node.network_params.protocol.protocol_version)); + + // No way to lock the processor, but queueing votes in quick succession must result in overflow + size_t not_processed{ 0 }; + size_t const total{ 1000 }; + for (unsigned i = 0; i < total; ++i) + { + if (node.vote_processor.vote (vote, channel)) + { + ++not_processed; + } + } + ASSERT_GT (not_processed, 0); + ASSERT_LT (not_processed, total); + ASSERT_EQ (not_processed, node.stats.count (nano::stat::type::vote, nano::stat::detail::vote_overflow)); +} + namespace nano { TEST (vote_processor, weights) diff --git a/nano/node/cli.cpp b/nano/node/cli.cpp index a04eca75..35080e6a 100644 --- a/nano/node/cli.cpp +++ b/nano/node/cli.cpp @@ -101,7 +101,9 @@ void nano::add_node_flag_options (boost::program_options::options_description & ("block_processor_batch_size", boost::program_options::value(), "Increase block processor transaction batch write size, default 0 (limited by config block_processor_batch_max_time), 256k for fast_bootstrap") ("block_processor_full_size", boost::program_options::value(), "Increase block processor allowed blocks queue size before dropping live network packets and holding bootstrap download, default 65536, 1 million for fast_bootstrap") ("block_processor_verification_size", boost::program_options::value(), "Increase batch signature verification size in block processor, default 0 (limited by config signature_checker_threads), unlimited for fast_bootstrap") - ("inactive_votes_cache_size", boost::program_options::value(), "Increase cached votes without active elections size, default 16384"); + ("inactive_votes_cache_size", boost::program_options::value(), "Increase cached votes without active elections size, default 16384") + ("vote_processor_capacity", boost::program_options::value(), "Vote processor queue size before dropping votes, default 144k") + ; // clang-format on } @@ -160,6 +162,11 @@ std::error_code nano::update_flags (nano::node_flags & flags_a, boost::program_o { flags_a.inactive_votes_cache_size = inactive_votes_cache_size_it->second.as (); } + auto vote_processor_capacity_it = vm.find ("vote_processor_capacity"); + if (vote_processor_capacity_it != vm.end ()) + { + flags_a.vote_processor_capacity = vote_processor_capacity_it->second.as (); + } return ec; } diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 675ff886..e16822d6 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -131,7 +131,7 @@ bootstrap_initiator (*this), bootstrap (config.peering_port, *this), application_path (application_path_a), port_mapping (*this), -vote_processor (checker, active, observers, stats, config, logger, online_reps, ledger, network_params), +vote_processor (checker, active, observers, stats, config, flags, logger, online_reps, ledger, network_params), rep_crawler (*this), warmed_up (0), block_processor (*this, write_database_queue), diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index b38a7129..f242cadd 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -137,5 +137,6 @@ public: size_t block_processor_full_size{ 65536 }; size_t block_processor_verification_size{ 0 }; size_t inactive_votes_cache_size{ 16 * 1024 }; + size_t vote_processor_capacity{ 144 * 1024 }; }; } diff --git a/nano/node/vote_processor.cpp b/nano/node/vote_processor.cpp index 2f2cb19d..336c5dc9 100644 --- a/nano/node/vote_processor.cpp +++ b/nano/node/vote_processor.cpp @@ -15,7 +15,7 @@ #include -nano::vote_processor::vote_processor (nano::signature_checker & checker_a, nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stat & stats_a, nano::node_config & config_a, nano::logger_mt & logger_a, nano::online_reps & online_reps_a, nano::ledger & ledger_a, nano::network_params & network_params_a) : +nano::vote_processor::vote_processor (nano::signature_checker & checker_a, nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stat & stats_a, nano::node_config & config_a, nano::node_flags & flags_a, nano::logger_mt & logger_a, nano::online_reps & online_reps_a, nano::ledger & ledger_a, nano::network_params & network_params_a) : checker (checker_a), active (active_a), observers (observers_a), @@ -25,6 +25,7 @@ logger (logger_a), online_reps (online_reps_a), ledger (ledger_a), network_params (network_params_a), +max_votes (flags_a.vote_processor_capacity), started (false), stopped (false), is_active (false), @@ -88,56 +89,45 @@ void nano::vote_processor::process_loop () } } -void nano::vote_processor::vote (std::shared_ptr vote_a, std::shared_ptr channel_a) +bool nano::vote_processor::vote (std::shared_ptr vote_a, std::shared_ptr channel_a) { + bool process (false); nano::unique_lock lock (mutex); if (!stopped) { - bool process (false); - /* Random early delection levels - Always process votes for test network (process = true) - Stop processing with max 144 * 1024 votes */ - if (!network_params.network.is_test_network ()) + // Level 0 (< 0.1%) + if (votes.size () < 6.0 / 9.0 * max_votes) { - // Level 0 (< 0.1%) - if (votes.size () < 96 * 1024) - { - process = true; - } - // Level 1 (0.1-1%) - else if (votes.size () < 112 * 1024) - { - process = (representatives_1.find (vote_a->account) != representatives_1.end ()); - } - // Level 2 (1-5%) - else if (votes.size () < 128 * 1024) - { - process = (representatives_2.find (vote_a->account) != representatives_2.end ()); - } - // Level 3 (> 5%) - else if (votes.size () < 144 * 1024) - { - process = (representatives_3.find (vote_a->account) != representatives_3.end ()); - } - } - else - { - // Process for test network process = true; } + // Level 1 (0.1-1%) + else if (votes.size () < 7.0 / 9.0 * max_votes) + { + process = (representatives_1.find (vote_a->account) != representatives_1.end ()); + } + // Level 2 (1-5%) + else if (votes.size () < 8.0 / 9.0 * max_votes) + { + process = (representatives_2.find (vote_a->account) != representatives_2.end ()); + } + // Level 3 (> 5%) + else if (votes.size () < max_votes) + { + process = (representatives_3.find (vote_a->account) != representatives_3.end ()); + } if (process) { votes.emplace_back (vote_a, channel_a); - lock.unlock (); condition.notify_all (); - lock.lock (); + // Lock no longer required } else { stats.inc (nano::stat::type::vote, nano::stat::detail::vote_overflow); } } + return !process; } void nano::vote_processor::verify_votes (decltype (votes) const & votes_a) diff --git a/nano/node/vote_processor.hpp b/nano/node/vote_processor.hpp index b750e2ec..c23eaf1f 100644 --- a/nano/node/vote_processor.hpp +++ b/nano/node/vote_processor.hpp @@ -32,8 +32,9 @@ namespace transport class vote_processor final { public: - explicit vote_processor (nano::signature_checker & checker_a, nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stat & stats_a, nano::node_config & config_a, nano::logger_mt & logger_a, nano::online_reps & online_reps_a, nano::ledger & ledger_a, nano::network_params & network_params_a); - void vote (std::shared_ptr, std::shared_ptr); + explicit vote_processor (nano::signature_checker & checker_a, nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stat & stats_a, nano::node_config & config_a, nano::node_flags & flags_a, nano::logger_mt & logger_a, nano::online_reps & online_reps_a, nano::ledger & ledger_a, nano::network_params & network_params_a); + /** Returns false if the vote was processed */ + bool vote (std::shared_ptr, std::shared_ptr); /** Note: node.active.mutex lock is required */ nano::vote_code vote_blocking (std::shared_ptr, std::shared_ptr, bool = false); void verify_votes (std::deque, std::shared_ptr>> const &); @@ -56,6 +57,8 @@ private: nano::ledger & ledger; nano::network_params & network_params; + size_t max_votes; + std::deque, std::shared_ptr>> votes; /** Representatives levels for random early detection */ std::unordered_set representatives_1;