From 5b3bfe4ae642b5f5bf0c468b3dee9e330b575acd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 6 Feb 2025 12:26:25 +0100 Subject: [PATCH] Overhaul vote rebroadcaster --- nano/core_test/CMakeLists.txt | 1 + nano/core_test/active_elections.cpp | 7 +- nano/core_test/vote_rebroadcaster.cpp | 273 +++++++++++++++++++ nano/lib/numbers.hpp | 6 + nano/lib/numbers_templ.hpp | 16 ++ nano/lib/stats_enums.hpp | 13 +- nano/node/network.cpp | 9 + nano/node/network.hpp | 3 + nano/node/node.cpp | 2 +- nano/node/nodeconfig.hpp | 2 + nano/node/vote_rebroadcaster.cpp | 363 ++++++++++++++++++++++++-- nano/node/vote_rebroadcaster.hpp | 138 +++++++++- nano/secure/vote.hpp | 2 +- 13 files changed, 798 insertions(+), 37 deletions(-) create mode 100644 nano/core_test/vote_rebroadcaster.cpp diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index 411479aaa..b3275cf63 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -70,6 +70,7 @@ add_executable( utility.cpp vote_cache.cpp vote_processor.cpp + vote_rebroadcaster.cpp voting.cpp wallet.cpp wallets.cpp diff --git a/nano/core_test/active_elections.cpp b/nano/core_test/active_elections.cpp index bbf80c402..87ad5c821 100644 --- a/nano/core_test/active_elections.cpp +++ b/nano/core_test/active_elections.cpp @@ -35,8 +35,11 @@ using namespace std::chrono_literals; */ TEST (active_elections, confirm_election_by_request) { - nano::test::system system{}; - auto & node1 = *system.add_node (); + nano::test::system system; + nano::node_config node_config1; + // Disable vote rebroadcasting to prevent node1 from actively sending votes to node2 + node_config1.vote_rebroadcaster.enable = false; + auto & node1 = *system.add_node (node_config1); nano::state_block_builder builder{}; auto send1 = builder diff --git a/nano/core_test/vote_rebroadcaster.cpp b/nano/core_test/vote_rebroadcaster.cpp new file mode 100644 index 000000000..9c8b0e09c --- /dev/null +++ b/nano/core_test/vote_rebroadcaster.cpp @@ -0,0 +1,273 @@ +#include +#include +#include +#include +#include + +#include + +#include + +using namespace std::chrono_literals; + +namespace +{ +struct test_context +{ + nano::vote_rebroadcaster_config config; + nano::vote_rebroadcaster_index index; + + explicit test_context (nano::vote_rebroadcaster_config config_a = {}) : + config{ config_a }, + index{ config } + { + } +}; +} + +TEST (vote_rebroadcaster_index, construction) +{ + test_context ctx{}; + auto & index = ctx.index; + ASSERT_EQ (index.representatives_count (), 0); + ASSERT_EQ (index.total_history (), 0); + ASSERT_EQ (index.total_hashes (), 0); +} + +TEST (vote_rebroadcaster_index, basic_vote_tracking) +{ + test_context ctx{}; + auto & index = ctx.index; + auto now = std::chrono::steady_clock::now (); + + nano::keypair key; + std::vector hashes = { nano::block_hash{ 1 } }; + auto vote = nano::test::make_vote (key, hashes); + + auto result = index.check_and_record (vote, nano::uint128_t{ 100 }, now); + + ASSERT_EQ (result, nano::vote_rebroadcaster_index::result::ok); + ASSERT_EQ (index.representatives_count (), 1); + ASSERT_EQ (index.total_history (), 1); + ASSERT_EQ (index.total_hashes (), 1); + ASSERT_TRUE (index.contains_representative (key.pub)); + ASSERT_TRUE (index.contains_block (key.pub, hashes[0])); +} + +TEST (vote_rebroadcaster_index, duplicate_vote_rejection) +{ + test_context ctx{}; + auto & index = ctx.index; + auto now = std::chrono::steady_clock::now (); + + nano::keypair key; + std::vector hashes = { nano::block_hash{ 1 } }; + auto vote = nano::test::make_vote (key, hashes); + + // First vote should be accepted + auto result1 = index.check_and_record (vote, nano::uint128_t{ 100 }, now); + ASSERT_EQ (result1, nano::vote_rebroadcaster_index::result::ok); + + // Same vote should be rejected + auto result2 = index.check_and_record (vote, nano::uint128_t{ 100 }, now); + ASSERT_EQ (result2, nano::vote_rebroadcaster_index::result::already_rebroadcasted); + + // Even after time threshold + auto result3 = index.check_and_record (vote, nano::uint128_t{ 100 }, now + 60min); + ASSERT_EQ (result3, nano::vote_rebroadcaster_index::result::already_rebroadcasted); +} + +TEST (vote_rebroadcaster_index, rebroadcast_timing) +{ + nano::vote_rebroadcaster_config config; + config.rebroadcast_threshold = 1000ms; + test_context ctx{ config }; + auto & index = ctx.index; + auto now = std::chrono::steady_clock::now (); + + nano::keypair key; + std::vector hashes = { nano::block_hash{ 1 } }; + + // Initial vote + auto vote1 = nano::test::make_vote (key, hashes, 1000); + auto result1 = index.check_and_record (vote1, nano::uint128_t{ 100 }, now); + ASSERT_EQ (result1, nano::vote_rebroadcaster_index::result::ok); + + // Try rebroadcast immediately - should be rejected + auto vote2 = nano::test::make_vote (key, hashes, 1500); + auto result2 = index.check_and_record (vote2, nano::uint128_t{ 100 }, now); + ASSERT_EQ (result2, nano::vote_rebroadcaster_index::result::rebroadcast_unnecessary); + + // Try after threshold - should be accepted + auto vote3 = nano::test::make_vote (key, hashes, 2500); + auto result3 = index.check_and_record (vote3, nano::uint128_t{ 100 }, now + 2000ms); + ASSERT_EQ (result3, nano::vote_rebroadcaster_index::result::ok); +} + +TEST (vote_rebroadcaster_index, final_vote_override) +{ + test_context ctx{}; + auto & index = ctx.index; + auto now = std::chrono::steady_clock::now (); + + nano::keypair key; + std::vector hashes = { nano::block_hash{ 1 } }; + + // Regular vote + auto vote1 = nano::test::make_vote (key, hashes, 1000); + auto result1 = index.check_and_record (vote1, nano::uint128_t{ 100 }, now); + ASSERT_EQ (result1, nano::vote_rebroadcaster_index::result::ok); + + // Final vote should override timing restrictions + auto final_vote = nano::test::make_final_vote (key, hashes); + auto result2 = index.check_and_record (final_vote, nano::uint128_t{ 100 }, now); + ASSERT_EQ (result2, nano::vote_rebroadcaster_index::result::ok); + + // Both vote should be kept in recent hashes index + ASSERT_EQ (index.total_history (), 1); + ASSERT_EQ (index.total_hashes (), 2); + ASSERT_TRUE (index.contains_block (key.pub, hashes[0])); + ASSERT_TRUE (index.contains_vote (vote1->full_hash ())); + ASSERT_TRUE (index.contains_vote (final_vote->full_hash ())); +} + +TEST (vote_rebroadcaster_index, representative_limit) +{ + nano::vote_rebroadcaster_config config; + config.max_representatives = 2; + test_context ctx{ config }; + auto & index = ctx.index; + auto now = std::chrono::steady_clock::now (); + + std::vector keys (4); + std::vector hashes = { nano::block_hash{ 1 } }; + + // Add first rep (weight 100) + auto vote1 = nano::test::make_vote (keys[0], hashes); + auto result1 = index.check_and_record (vote1, nano::uint128_t{ 100 }, now); + ASSERT_EQ (result1, nano::vote_rebroadcaster_index::result::ok); + ASSERT_EQ (index.representatives_count (), 1); + + // Add second rep (weight 200) + auto vote2 = nano::test::make_vote (keys[1], hashes); + auto result2 = index.check_and_record (vote2, nano::uint128_t{ 200 }, now); + ASSERT_EQ (result2, nano::vote_rebroadcaster_index::result::ok); + ASSERT_EQ (index.representatives_count (), 2); + + // Try to add third rep with lower weight - should be rejected + auto vote3 = nano::test::make_vote (keys[2], hashes); + auto result3 = index.check_and_record (vote3, nano::uint128_t{ 50 }, now); + ASSERT_EQ (result3, nano::vote_rebroadcaster_index::result::representatives_full); + ASSERT_EQ (index.representatives_count (), 2); + + // Add third rep with higher weight - should replace lowest weight + auto vote4 = nano::test::make_vote (keys[3], hashes); + auto result4 = index.check_and_record (vote4, nano::uint128_t{ 300 }, now); + ASSERT_EQ (result4, nano::vote_rebroadcaster_index::result::ok); + ASSERT_FALSE (index.contains_representative (keys[0].pub)); // Lowest weight was removed + ASSERT_EQ (index.representatives_count (), 2); +} + +TEST (vote_rebroadcaster_index, multi_hash_vote) +{ + test_context ctx{}; + auto & index = ctx.index; + auto now = std::chrono::steady_clock::now (); + + nano::keypair key; + std::vector hashes = { + nano::block_hash{ 1 }, + nano::block_hash{ 2 }, + nano::block_hash{ 3 } + }; + + auto vote = nano::test::make_vote (key, hashes); + auto result = index.check_and_record (vote, nano::uint128_t{ 100 }, now); + + ASSERT_EQ (result, nano::vote_rebroadcaster_index::result::ok); + ASSERT_EQ (index.total_history (), 3); // One entry per hash + for (auto const & hash : hashes) + { + ASSERT_TRUE (index.contains_block (key.pub, hash)); + } +} + +TEST (vote_rebroadcaster_index, history_limit) +{ + nano::vote_rebroadcaster_config config; + config.max_history = 2; + test_context ctx{ config }; + auto & index = ctx.index; + auto now = std::chrono::steady_clock::now (); + + nano::keypair key; + + // Add votes up to limit + for (size_t i = 0; i < 3; i++) + { + std::vector hash = { nano::block_hash{ i } }; + auto vote = nano::test::make_vote (key, hash); + index.check_and_record (vote, nano::uint128_t{ 100 }, now); + } + + ASSERT_EQ (index.total_history (), 2); + ASSERT_FALSE (index.contains_block (key.pub, nano::block_hash{ 0 })); // Oldest was removed + ASSERT_TRUE (index.contains_block (key.pub, nano::block_hash{ 1 })); + ASSERT_TRUE (index.contains_block (key.pub, nano::block_hash{ 2 })); +} + +TEST (vote_rebroadcaster_index, cleanup) +{ + test_context ctx{}; + auto & index = ctx.index; + auto now = std::chrono::steady_clock::now (); + + nano::keypair key1; + nano::keypair key2; + std::vector hashes = { nano::block_hash{ 1 } }; + + // Add two reps + auto vote1 = nano::test::make_vote (key1, hashes); + auto vote2 = nano::test::make_vote (key2, hashes); + index.check_and_record (vote1, nano::uint128_t{ 100 }, now); + index.check_and_record (vote2, nano::uint128_t{ 200 }, now); + + // Cleanup with rep1 becoming non-principal + auto cleanup_count = index.cleanup ([&key1] (nano::account const & account) { + return std::make_pair ( + account == key1.pub ? false : true, + account == key1.pub ? nano::uint128_t{ 0 } : nano::uint128_t{ 200 }); + }); + + ASSERT_EQ (cleanup_count, 1); + ASSERT_EQ (index.representatives_count (), 1); + ASSERT_FALSE (index.contains_representative (key1.pub)); + ASSERT_TRUE (index.contains_representative (key2.pub)); +} + +TEST (vote_rebroadcaster_index, weight_updates) +{ + nano::vote_rebroadcaster_config config; + config.max_representatives = 1; + test_context ctx{ config }; + auto & index = ctx.index; + auto now = std::chrono::steady_clock::now (); + + nano::keypair key1; + nano::keypair key2; + std::vector hashes = { nano::block_hash{ 1 } }; + + // Add rep with initial weight + auto vote1 = nano::test::make_vote (key1, hashes); + index.check_and_record (vote1, nano::uint128_t{ 100 }, now); + + // Update weight through cleanup + index.cleanup ([] (nano::account const &) { + return std::make_pair (true, nano::uint128_t{ 200 }); + }); + + // Add new rep with lower weight - should be rejected due to updated weight + auto vote2 = nano::test::make_vote (key2, hashes); + auto result = index.check_and_record (vote2, nano::uint128_t{ 150 }, now); + ASSERT_EQ (result, nano::vote_rebroadcaster_index::result::representatives_full); +} \ No newline at end of file diff --git a/nano/lib/numbers.hpp b/nano/lib/numbers.hpp index 5702413c1..94c7fba55 100644 --- a/nano/lib/numbers.hpp +++ b/nano/lib/numbers.hpp @@ -27,6 +27,8 @@ nano::uint128_t const raw_ratio = nano::uint128_t ("1"); // 10^0 using bucket_index = uint64_t; using priority_timestamp = uint64_t; // Priority within the bucket +using vote_timestamp = uint64_t; + class uint128_union { public: @@ -590,6 +592,8 @@ template <> struct hash<::nano::uint512_union>; template <> struct hash<::nano::qualified_root>; +template <> +struct hash<::nano::signature>; } namespace boost @@ -616,4 +620,6 @@ template <> struct hash<::nano::uint512_union>; template <> struct hash<::nano::qualified_root>; +template <> +struct hash<::nano::signature>; } diff --git a/nano/lib/numbers_templ.hpp b/nano/lib/numbers_templ.hpp index d40de592a..97589980c 100644 --- a/nano/lib/numbers_templ.hpp +++ b/nano/lib/numbers_templ.hpp @@ -94,6 +94,14 @@ struct hash<::nano::qualified_root> return hash<::nano::uint512_union>{}(value); } }; +template <> +struct hash<::nano::signature> +{ + size_t operator() (::nano::signature const & value) const noexcept + { + return hash<::nano::uint512_union>{}(value); + } +}; } namespace boost @@ -186,4 +194,12 @@ struct hash<::nano::qualified_root> return std::hash<::nano::qualified_root> () (value); } }; +template <> +struct hash<::nano::signature> +{ + size_t operator() (::nano::signature const & value) const noexcept + { + return std::hash<::nano::signature> () (value); + } +}; } diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 79f740951..e3d33167f 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -26,6 +26,7 @@ enum class type vote_processor_tier, vote_processor_overfill, vote_rebroadcaster, + vote_rebroadcaster_tier, election, election_cleanup, election_vote, @@ -274,6 +275,15 @@ enum class detail vote_overflow, vote_ignored, + // vote_rebroadcaster + cleanup_tiers, + representatives_full, + representatives_erase_lowest, + representatives_erase_stale, + already_rebroadcasted, + rebroadcast_unnecessary, + rebroadcast_hashes, + // election specific vote_new, vote_processed, @@ -664,9 +674,6 @@ enum class detail pruned_count, collect_targets, - // vote_rebroadcaster - rebroadcast_hashes, - _last // Must be the last enum }; diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 206b45b57..fbac8312f 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -247,6 +247,15 @@ void nano::network::send_keepalive_self (std::shared_ptrsend (message, nano::transport::traffic_type::keepalive); } +bool nano::network::check_capacity (nano::transport::traffic_type type, float scale) const +{ + auto const target_count = fanout (scale); + auto channels = list (target_count, [type] (auto const & channel) { + return !channel->max (type); // Only use channels that are not full for this traffic type + }); + return !channels.empty () && channels.size () >= target_count / 2; // We need to have at least half of the target capacity available +} + size_t nano::network::flood_message (nano::message const & message, nano::transport::traffic_type type, float scale) const { auto channels = list (fanout (scale), [type] (auto const & channel) { diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 023ec56c9..a49c25ee6 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -95,6 +95,9 @@ public: nano::endpoint endpoint () const; + // Checks if we have enough channel capacity for the given traffic type + bool check_capacity (nano::transport::traffic_type, float scale = 1.0f) const; + size_t flood_message (nano::message const &, nano::transport::traffic_type, float scale = 1.0f) const; size_t flood_keepalive (float scale = 1.0f) const; size_t flood_keepalive_self (float scale = 0.5f) const; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 25cdff33b..97d72f421 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -203,7 +203,7 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy http_callbacks{ *http_callbacks_impl }, pruning_impl{ std::make_unique (config, flags, ledger, stats, logger) }, pruning{ *pruning_impl }, - vote_rebroadcaster_impl{ std::make_unique (vote_router, network, wallets, stats, logger) }, + vote_rebroadcaster_impl{ std::make_unique (config.vote_rebroadcaster, ledger, vote_router, network, wallets, rep_tiers, stats, logger) }, vote_rebroadcaster{ *vote_rebroadcaster_impl }, startup_time{ std::chrono::steady_clock::now () }, node_seq{ seq } diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index a7d8e29a9..22dcb9b4d 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -162,6 +163,7 @@ public: nano::monitor_config monitor; nano::backlog_scan_config backlog_scan; nano::bounded_backlog_config bounded_backlog; + nano::vote_rebroadcaster_config vote_rebroadcaster; public: /** Entry is ignored if it cannot be parsed as a valid address:port */ diff --git a/nano/node/vote_rebroadcaster.cpp b/nano/node/vote_rebroadcaster.cpp index 785d0fbf4..ae041ed8d 100644 --- a/nano/node/vote_rebroadcaster.cpp +++ b/nano/node/vote_rebroadcaster.cpp @@ -1,27 +1,76 @@ #include #include +#include #include #include +#include #include #include #include #include +#include #include -nano::vote_rebroadcaster::vote_rebroadcaster (nano::vote_router & vote_router_a, nano::network & network_a, nano::wallets & wallets_a, nano::stats & stats_a, nano::logger & logger_a) : +nano::vote_rebroadcaster::vote_rebroadcaster (nano::vote_rebroadcaster_config const & config_a, nano::ledger & ledger_a, nano::vote_router & vote_router_a, nano::network & network_a, nano::wallets & wallets_a, nano::rep_tiers & rep_tiers_a, nano::stats & stats_a, nano::logger & logger_a) : + config{ config_a }, + ledger{ ledger_a }, vote_router{ vote_router_a }, network{ network_a }, wallets{ wallets_a }, + rep_tiers{ rep_tiers_a }, stats{ stats_a }, - logger{ logger_a } + logger{ logger_a }, + rebroadcasts{ config } { + if (!config.enable) + { + return; + } + + queue.max_size_query = [this] (auto const & origin) { + switch (origin.source) + { + case nano::rep_tier::tier_3: + case nano::rep_tier::tier_2: + case nano::rep_tier::tier_1: + return config.max_queue; + case nano::rep_tier::none: + return size_t{ 0 }; + } + debug_assert (false); + return size_t{ 0 }; + }; + + queue.priority_query = [this] (auto const & origin) { + switch (origin.source) + { + case nano::rep_tier::tier_3: + return config.priority_coefficient * config.priority_coefficient * config.priority_coefficient; + case nano::rep_tier::tier_2: + return config.priority_coefficient * config.priority_coefficient; + case nano::rep_tier::tier_1: + return config.priority_coefficient; + case nano::rep_tier::none: + return size_t{ 0 }; + } + debug_assert (false); + return size_t{ 0 }; + }; + vote_router.vote_processed.add ([this] (std::shared_ptr const & vote, nano::vote_source source, std::unordered_map const & results) { bool processed = std::any_of (results.begin (), results.end (), [] (auto const & result) { return result.second == nano::vote_code::vote; }); - if (processed && enable) + + // Enable vote rebroadcasting only if the node does not host a representative + // Do not rebroadcast votes from non-principal representatives + if (processed && non_principal) { - put (vote); + auto tier = rep_tiers.tier (vote->account); + if (tier != nano::rep_tier::none) + { + push (vote, tier); + } } }); } @@ -35,6 +84,11 @@ void nano::vote_rebroadcaster::start () { debug_assert (!thread.joinable ()); + if (!config.enable) + { + return; + } + thread = std::thread ([this] () { nano::thread_role::set (nano::thread_role::name::vote_rebroadcasting); run (); @@ -54,17 +108,19 @@ void nano::vote_rebroadcaster::stop () } } -bool nano::vote_rebroadcaster::put (std::shared_ptr const & vote) +bool nano::vote_rebroadcaster::push (std::shared_ptr const & vote, nano::rep_tier tier) { - bool added{ false }; + bool added = false; { std::lock_guard guard{ mutex }; - if (queue.size () < max_queue) + + // Do not rebroadcast local representative votes + if (!reps.exists (vote->account) && !queue_hashes.contains (vote->signature)) { - if (!reps.exists (vote->account)) + added = queue.push (vote, tier); + if (added) { - queue.push_back (vote); - added = true; + queue_hashes.insert (vote->signature); // Keep track of vote signatures to avoid duplicates } } } @@ -73,13 +129,26 @@ bool nano::vote_rebroadcaster::put (std::shared_ptr const & vote) stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::queued); condition.notify_one (); } - else - { - stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::overfill); - } return added; } +std::pair, nano::rep_tier> nano::vote_rebroadcaster::next () +{ + debug_assert (!mutex.try_lock ()); + debug_assert (!queue.empty ()); + + queue.periodic_update (); + + auto [vote, origin] = queue.next (); + release_assert (vote != nullptr); + release_assert (origin.source != nano::rep_tier::none); + + auto erased = queue_hashes.erase (vote->signature); + debug_assert (erased == 1); + + return { vote, origin.source }; +} + void nano::vote_rebroadcaster::run () { std::unique_lock lock{ mutex }; @@ -96,37 +165,285 @@ void nano::vote_rebroadcaster::run () stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::loop); - if (refresh_interval.elapse (15s)) + // Update local reps cache + if (refresh_interval.elapse (nano::is_dev_run () ? 1s : 15s)) { stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::refresh); reps = wallets.reps (); - enable = !reps.have_half_rep (); // Disable vote rebroadcasting if the node has a principal representative (or close to) + non_principal = !reps.have_half_rep (); // Disable vote rebroadcasting if the node has a principal representative (or close to) + } + + // Cleanup expired representatives from rebroadcasts + if (cleanup_interval.elapse (nano::is_dev_run () ? 1s : 60s)) + { + lock.unlock (); + cleanup (); + lock.lock (); + } + + float constexpr network_fanout_scale = 1.0f; + + // Wait for spare if our network traffic is too high + if (!network.check_capacity (nano::transport::traffic_type::vote_rebroadcast, network_fanout_scale)) + { + stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::cooldown); + lock.unlock (); + std::this_thread::sleep_for (100ms); + lock.lock (); + continue; // Wait for more capacity } if (!queue.empty ()) { - auto vote = queue.front (); - queue.pop_front (); + auto [vote, tier] = next (); lock.unlock (); - stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::rebroadcast); - stats.add (nano::stat::type::vote_rebroadcaster, nano::stat::detail::rebroadcast_hashes, vote->hashes.size ()); + bool should_rebroadcast = process (vote); + if (should_rebroadcast) + { + stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::rebroadcast); + stats.add (nano::stat::type::vote_rebroadcaster, nano::stat::detail::rebroadcast_hashes, vote->hashes.size ()); + stats.inc (nano::stat::type::vote_rebroadcaster_tier, to_stat_detail (tier)); - auto sent = network.flood_vote_rebroadcasted (vote, 0.5f); - stats.add (nano::stat::type::vote_rebroadcaster, nano::stat::detail::sent, sent); + auto sent = network.flood_vote_rebroadcasted (vote, network_fanout_scale); + stats.add (nano::stat::type::vote_rebroadcaster, nano::stat::detail::sent, sent); + } lock.lock (); } } } +bool nano::vote_rebroadcaster::process (std::shared_ptr const & vote) +{ + stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::process); + + auto rebroadcasts_l = rebroadcasts.lock (); + + auto result = rebroadcasts_l->check_and_record (vote, ledger.weight (vote->account), std::chrono::steady_clock::now ()); + if (result == nano::vote_rebroadcaster_index::result::ok) + { + return true; // Vote qualifies for rebroadcast + } + else + { + stats.inc (nano::stat::type::vote_rebroadcaster, nano::enum_util::cast (result)); + return false; // Vote does not qualify for rebroadcast + } +} + +void nano::vote_rebroadcaster::cleanup () +{ + stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::cleanup); + + auto rebroadcasts_l = rebroadcasts.lock (); + + auto erased_reps = rebroadcasts_l->cleanup ([this] (auto const & rep) { + auto tier = rep_tiers.tier (rep); + auto weight = ledger.weight (rep); + return std::make_pair (tier != nano::rep_tier::none /* keep entry only if principal rep */, weight); + }); + + stats.add (nano::stat::type::vote_rebroadcaster, nano::stat::detail::representatives_erase_stale, erased_reps); +} + nano::container_info nano::vote_rebroadcaster::container_info () const { std::lock_guard guard{ mutex }; + auto rebroadcasts_l = rebroadcasts.lock (); + nano::container_info info; - info.put ("queue", queue.size ()); + info.add ("queue", queue.container_info ()); + info.put ("queue_total", queue.size ()); + info.put ("queue_hashes", queue_hashes.size ()); + info.put ("representatives", rebroadcasts_l->representatives_count ()); + info.put ("history", rebroadcasts_l->total_history ()); + info.put ("hashes", rebroadcasts_l->total_hashes ()); return info; +} + +/* + * vote_rebroadcaster_index + */ + +nano::vote_rebroadcaster_index::vote_rebroadcaster_index (nano::vote_rebroadcaster_config const & config_a) : + config{ config_a } +{ +} + +nano::vote_rebroadcaster_index::result nano::vote_rebroadcaster_index::check_and_record (std::shared_ptr const & vote, nano::uint128_t rep_weight, std::chrono::steady_clock::time_point now) +{ + auto const vote_timestamp = vote->timestamp (); + auto const vote_hash = vote->full_hash (); + + auto it = index.get ().find (vote->account); + + // If we don't have a record for this rep, add it + if (it == index.get ().end ()) + { + auto should_add = [&, this] () { + // Under normal conditions the number of principal representatives should be below this limit + if (index.size () < config.max_representatives) + { + return true; + } + // However, if we're at capacity, we can still add the rep if it has a higher weight than the lowest weight in the container + if (auto lowest = index.get ().begin (); lowest != index.get ().end ()) + { + return rep_weight > lowest->weight; + } + return false; + }; + + if (should_add ()) + { + it = index.get ().emplace (representative_entry{ vote->account, rep_weight }).first; + } + else + { + return result::representatives_full; + } + } + release_assert (it != index.get ().end ()); + + auto & history = it->history; + auto & hashes = it->hashes; + + // Check if we already rebroadcasted this exact vote (fast lookup by hash) + if (hashes.get ().contains (vote_hash)) + { + return result::already_rebroadcasted; + } + + // Check if any of the hashes contained in the vote qualifies for rebroadcasting + auto check_hash = [&] (auto const & hash) { + if (auto existing = history.get ().find (hash); existing != history.get ().end ()) + { + // Always rebroadcast vote if rep switched to a final vote + if (nano::vote::is_final_timestamp (vote_timestamp) && vote_timestamp > existing->vote_timestamp) + { + return true; + } + // Otherwise only rebroadcast if sufficient time has passed since the last rebroadcast + if (existing->timestamp + config.rebroadcast_threshold > now) + { + return false; // Not enough (as seen by local clock) time has passed + } + if (add_sat (existing->vote_timestamp, static_cast (config.rebroadcast_threshold.count ())) > vote_timestamp) + { + return false; // Not enough (as seen by vote timestamp) time has passed + } + return true; // Enough time has passed, block hash qualifies for rebroadcast + } + else + { + return true; // Block hash not seen before, rebroadcast + } + }; + + bool should_rebroadcast = std::any_of (vote->hashes.begin (), vote->hashes.end (), check_hash); + if (!should_rebroadcast) + { + return result::rebroadcast_unnecessary; + } + + // Update the history with the new vote info + for (auto const & hash : vote->hashes) + { + if (auto existing = history.get ().find (hash); existing != history.get ().end ()) + { + history.get ().modify (existing, [&] (auto & entry) { + entry.vote_timestamp = vote_timestamp; + entry.timestamp = now; + }); + } + else + { + history.get ().emplace (rebroadcast_entry{ hash, vote_timestamp, now }); + } + } + + // Also keep track of the vote hash to quickly filter out duplicates + hashes.push_back (vote_hash); + + // Keep history and hashes sizes within limits, erase oldest entries + while (history.size () > config.max_history) + { + history.pop_front (); // Remove the oldest entry + } + while (hashes.size () > config.max_history) + { + hashes.pop_front (); // Remove the oldest entry + } + + // Keep representatives index within limits, erase lowest weight entries + while (!index.empty () && index.size () > config.max_representatives) + { + index.get ().erase (index.get ().begin ()); + } + + return result::ok; // Rebroadcast the vote +} + +size_t nano::vote_rebroadcaster_index::cleanup (rep_query query) +{ + // Remove entries for accounts that are no longer principal representatives + auto erased_reps = erase_if (index, [&] (auto const & entry) { + auto [should_keep, weight] = query (entry.representative); + return !should_keep; + }); + + // Update representative weights + for (auto it = index.begin (), end = index.end (); it != end; ++it) + { + index.modify (it, [&] (auto & entry) { + auto [tier, weight] = query (entry.representative); + entry.weight = weight; + }); + } + + return erased_reps; +} + +bool nano::vote_rebroadcaster_index::contains_vote (nano::block_hash const & vote_hash) const +{ + return std::any_of (index.begin (), index.end (), [&] (auto const & entry) { + return entry.hashes.template get ().contains (vote_hash); + }); +} + +bool nano::vote_rebroadcaster_index::contains_representative (nano::account const & representative) const +{ + return index.get ().contains (representative); +} + +bool nano::vote_rebroadcaster_index::contains_block (nano::account const & representative, nano::block_hash const & block_hash) const +{ + if (auto it = index.get ().find (representative); it != index.get ().end ()) + { + return it->history.get ().find (block_hash) != it->history.get ().end (); + } + return false; +} + +size_t nano::vote_rebroadcaster_index::representatives_count () const +{ + return index.size (); +} + +size_t nano::vote_rebroadcaster_index::total_history () const +{ + return std::accumulate (index.begin (), index.end (), size_t{ 0 }, [] (auto total, auto const & entry) { + return total + entry.history.size (); + }); +} + +size_t nano::vote_rebroadcaster_index::total_hashes () const +{ + return std::accumulate (index.begin (), index.end (), size_t{ 0 }, [] (auto total, auto const & entry) { + return total + entry.hashes.size (); + }); } \ No newline at end of file diff --git a/nano/node/vote_rebroadcaster.hpp b/nano/node/vote_rebroadcaster.hpp index 20513e0a1..548093af1 100644 --- a/nano/node/vote_rebroadcaster.hpp +++ b/nano/node/vote_rebroadcaster.hpp @@ -1,45 +1,169 @@ #pragma once +#include +#include +#include #include #include +#include + +#include +#include +#include +#include +#include +#include +#include #include #include #include #include +#include + +namespace mi = boost::multi_index; namespace nano { +class vote_rebroadcaster_config final +{ +public: + // TODO: Serde + +public: + bool enable{ true }; + size_t max_queue{ 1024 * 4 }; // Maximum number of votes to keep in queue for processing + size_t max_history{ 1024 * 32 }; // Maximum number of recently broadcast hashes to keep per representative + size_t max_representatives{ 100 }; // Maximum number of representatives to track rebroadcasts for + std::chrono::milliseconds rebroadcast_threshold{ 1000 * 90 }; // Minimum amount of time between rebroadcasts for the same hash from the same representative (milliseconds) + size_t priority_coefficient{ 2 }; // Priority coefficient for prioritizing votes from representative tiers +}; + +class vote_rebroadcaster_index +{ +public: + explicit vote_rebroadcaster_index (vote_rebroadcaster_config const &); + + enum class result + { + ok, + already_rebroadcasted, + representatives_full, + rebroadcast_unnecessary, + }; + + result check_and_record (std::shared_ptr const & vote, nano::uint128_t rep_weight, std::chrono::steady_clock::time_point now); + + using rep_query = std::function (nano::account const &)>; // Returns + size_t cleanup (rep_query); + + bool contains_vote (nano::block_hash const & vote_hash) const; + bool contains_representative (nano::account const & representative) const; + bool contains_block (nano::account const & representative, nano::block_hash const & block_hash) const; + + size_t representatives_count () const; + size_t total_history () const; + size_t total_hashes () const; + +private: + vote_rebroadcaster_config const & config; + + struct rebroadcast_entry + { + nano::block_hash block_hash; + nano::vote_timestamp vote_timestamp; + std::chrono::steady_clock::time_point timestamp; + }; + + // clang-format off + class tag_sequenced {}; + class tag_vote_hash {}; + class tag_block_hash {}; + + // Tracks rebroadcast history for individual block hashes + using ordered_rebroadcasts = boost::multi_index_container>, + mi::hashed_unique, + mi::member> + >>; + + // Tracks rebroadcast history for full votes + using ordered_hashes = boost::multi_index_container>, + mi::hashed_unique, + mi::identity> + >>; + // clang-format on + + struct representative_entry + { + nano::account representative; + nano::uint128_t weight; + + mutable ordered_rebroadcasts history; + mutable ordered_hashes hashes; + }; + + // clang-format off + class tag_account {}; + class tag_weight {}; + + using ordered_representatives = boost::multi_index_container>, + mi::hashed_unique, + mi::member>, + mi::ordered_non_unique, + mi::member> + >>; + // clang-format on + + ordered_representatives index; +}; + class vote_rebroadcaster final { public: - static size_t constexpr max_queue = 1024 * 16; - -public: - vote_rebroadcaster (nano::vote_router &, nano::network &, nano::wallets &, nano::stats &, nano::logger &); + vote_rebroadcaster (vote_rebroadcaster_config const &, nano::ledger &, nano::vote_router &, nano::network &, nano::wallets &, nano::rep_tiers &, nano::stats &, nano::logger &); ~vote_rebroadcaster (); void start (); void stop (); - bool put (std::shared_ptr const &); + bool push (std::shared_ptr const &, nano::rep_tier); nano::container_info container_info () const; public: // Dependencies + vote_rebroadcaster_config const & config; + nano::ledger & ledger; nano::vote_router & vote_router; nano::network & network; nano::wallets & wallets; + nano::rep_tiers & rep_tiers; nano::stats & stats; nano::logger & logger; private: void run (); + void cleanup (); + bool process (std::shared_ptr const &); + std::pair, nano::rep_tier> next (); - std::atomic enable{ true }; // Enable vote rebroadcasting only if the node does not host a representative - std::deque> queue; +private: + // Queue of recently processed votes to potentially rebroadcast + nano::fair_queue, nano::rep_tier> queue; + std::unordered_set queue_hashes; // Avoids queuing the same vote multiple times + + nano::locked rebroadcasts; + +private: + std::atomic non_principal{ true }; nano::wallet_representatives reps; nano::interval refresh_interval; + nano::interval cleanup_interval; bool stopped{ false }; std::condition_variable condition; diff --git a/nano/secure/vote.hpp b/nano/secure/vote.hpp index 7da9b3278..96969896a 100644 --- a/nano/secure/vote.hpp +++ b/nano/secure/vote.hpp @@ -63,7 +63,7 @@ public: // Payload nano::signature signature{ 0 }; private: // Payload - // Vote timestamp + // Vote timestamp (milliseconds since epoch) uint64_t timestamp_m{ 0 }; private: