Overhaul vote rebroadcaster

This commit is contained in:
Piotr Wójcik 2025-02-06 12:26:25 +01:00
commit 5b3bfe4ae6
13 changed files with 798 additions and 37 deletions

View file

@ -70,6 +70,7 @@ add_executable(
utility.cpp
vote_cache.cpp
vote_processor.cpp
vote_rebroadcaster.cpp
voting.cpp
wallet.cpp
wallets.cpp

View file

@ -35,8 +35,11 @@ using namespace std::chrono_literals;
*/
TEST (active_elections, confirm_election_by_request)
{
nano::test::system system{};
auto & node1 = *system.add_node ();
nano::test::system system;
nano::node_config node_config1;
// Disable vote rebroadcasting to prevent node1 from actively sending votes to node2
node_config1.vote_rebroadcaster.enable = false;
auto & node1 = *system.add_node (node_config1);
nano::state_block_builder builder{};
auto send1 = builder

View file

@ -0,0 +1,273 @@
#include <nano/lib/numbers.hpp>
#include <nano/node/rep_tiers.hpp>
#include <nano/node/vote_rebroadcaster.hpp>
#include <nano/secure/vote.hpp>
#include <nano/test_common/testutil.hpp>
#include <gtest/gtest.h>
#include <chrono>
using namespace std::chrono_literals;
namespace
{
struct test_context
{
nano::vote_rebroadcaster_config config;
nano::vote_rebroadcaster_index index;
explicit test_context (nano::vote_rebroadcaster_config config_a = {}) :
config{ config_a },
index{ config }
{
}
};
}
TEST (vote_rebroadcaster_index, construction)
{
test_context ctx{};
auto & index = ctx.index;
ASSERT_EQ (index.representatives_count (), 0);
ASSERT_EQ (index.total_history (), 0);
ASSERT_EQ (index.total_hashes (), 0);
}
TEST (vote_rebroadcaster_index, basic_vote_tracking)
{
test_context ctx{};
auto & index = ctx.index;
auto now = std::chrono::steady_clock::now ();
nano::keypair key;
std::vector<nano::block_hash> hashes = { nano::block_hash{ 1 } };
auto vote = nano::test::make_vote (key, hashes);
auto result = index.check_and_record (vote, nano::uint128_t{ 100 }, now);
ASSERT_EQ (result, nano::vote_rebroadcaster_index::result::ok);
ASSERT_EQ (index.representatives_count (), 1);
ASSERT_EQ (index.total_history (), 1);
ASSERT_EQ (index.total_hashes (), 1);
ASSERT_TRUE (index.contains_representative (key.pub));
ASSERT_TRUE (index.contains_block (key.pub, hashes[0]));
}
TEST (vote_rebroadcaster_index, duplicate_vote_rejection)
{
test_context ctx{};
auto & index = ctx.index;
auto now = std::chrono::steady_clock::now ();
nano::keypair key;
std::vector<nano::block_hash> hashes = { nano::block_hash{ 1 } };
auto vote = nano::test::make_vote (key, hashes);
// First vote should be accepted
auto result1 = index.check_and_record (vote, nano::uint128_t{ 100 }, now);
ASSERT_EQ (result1, nano::vote_rebroadcaster_index::result::ok);
// Same vote should be rejected
auto result2 = index.check_and_record (vote, nano::uint128_t{ 100 }, now);
ASSERT_EQ (result2, nano::vote_rebroadcaster_index::result::already_rebroadcasted);
// Even after time threshold
auto result3 = index.check_and_record (vote, nano::uint128_t{ 100 }, now + 60min);
ASSERT_EQ (result3, nano::vote_rebroadcaster_index::result::already_rebroadcasted);
}
TEST (vote_rebroadcaster_index, rebroadcast_timing)
{
nano::vote_rebroadcaster_config config;
config.rebroadcast_threshold = 1000ms;
test_context ctx{ config };
auto & index = ctx.index;
auto now = std::chrono::steady_clock::now ();
nano::keypair key;
std::vector<nano::block_hash> hashes = { nano::block_hash{ 1 } };
// Initial vote
auto vote1 = nano::test::make_vote (key, hashes, 1000);
auto result1 = index.check_and_record (vote1, nano::uint128_t{ 100 }, now);
ASSERT_EQ (result1, nano::vote_rebroadcaster_index::result::ok);
// Try rebroadcast immediately - should be rejected
auto vote2 = nano::test::make_vote (key, hashes, 1500);
auto result2 = index.check_and_record (vote2, nano::uint128_t{ 100 }, now);
ASSERT_EQ (result2, nano::vote_rebroadcaster_index::result::rebroadcast_unnecessary);
// Try after threshold - should be accepted
auto vote3 = nano::test::make_vote (key, hashes, 2500);
auto result3 = index.check_and_record (vote3, nano::uint128_t{ 100 }, now + 2000ms);
ASSERT_EQ (result3, nano::vote_rebroadcaster_index::result::ok);
}
TEST (vote_rebroadcaster_index, final_vote_override)
{
test_context ctx{};
auto & index = ctx.index;
auto now = std::chrono::steady_clock::now ();
nano::keypair key;
std::vector<nano::block_hash> hashes = { nano::block_hash{ 1 } };
// Regular vote
auto vote1 = nano::test::make_vote (key, hashes, 1000);
auto result1 = index.check_and_record (vote1, nano::uint128_t{ 100 }, now);
ASSERT_EQ (result1, nano::vote_rebroadcaster_index::result::ok);
// Final vote should override timing restrictions
auto final_vote = nano::test::make_final_vote (key, hashes);
auto result2 = index.check_and_record (final_vote, nano::uint128_t{ 100 }, now);
ASSERT_EQ (result2, nano::vote_rebroadcaster_index::result::ok);
// Both vote should be kept in recent hashes index
ASSERT_EQ (index.total_history (), 1);
ASSERT_EQ (index.total_hashes (), 2);
ASSERT_TRUE (index.contains_block (key.pub, hashes[0]));
ASSERT_TRUE (index.contains_vote (vote1->full_hash ()));
ASSERT_TRUE (index.contains_vote (final_vote->full_hash ()));
}
TEST (vote_rebroadcaster_index, representative_limit)
{
nano::vote_rebroadcaster_config config;
config.max_representatives = 2;
test_context ctx{ config };
auto & index = ctx.index;
auto now = std::chrono::steady_clock::now ();
std::vector<nano::keypair> keys (4);
std::vector<nano::block_hash> hashes = { nano::block_hash{ 1 } };
// Add first rep (weight 100)
auto vote1 = nano::test::make_vote (keys[0], hashes);
auto result1 = index.check_and_record (vote1, nano::uint128_t{ 100 }, now);
ASSERT_EQ (result1, nano::vote_rebroadcaster_index::result::ok);
ASSERT_EQ (index.representatives_count (), 1);
// Add second rep (weight 200)
auto vote2 = nano::test::make_vote (keys[1], hashes);
auto result2 = index.check_and_record (vote2, nano::uint128_t{ 200 }, now);
ASSERT_EQ (result2, nano::vote_rebroadcaster_index::result::ok);
ASSERT_EQ (index.representatives_count (), 2);
// Try to add third rep with lower weight - should be rejected
auto vote3 = nano::test::make_vote (keys[2], hashes);
auto result3 = index.check_and_record (vote3, nano::uint128_t{ 50 }, now);
ASSERT_EQ (result3, nano::vote_rebroadcaster_index::result::representatives_full);
ASSERT_EQ (index.representatives_count (), 2);
// Add third rep with higher weight - should replace lowest weight
auto vote4 = nano::test::make_vote (keys[3], hashes);
auto result4 = index.check_and_record (vote4, nano::uint128_t{ 300 }, now);
ASSERT_EQ (result4, nano::vote_rebroadcaster_index::result::ok);
ASSERT_FALSE (index.contains_representative (keys[0].pub)); // Lowest weight was removed
ASSERT_EQ (index.representatives_count (), 2);
}
TEST (vote_rebroadcaster_index, multi_hash_vote)
{
test_context ctx{};
auto & index = ctx.index;
auto now = std::chrono::steady_clock::now ();
nano::keypair key;
std::vector<nano::block_hash> hashes = {
nano::block_hash{ 1 },
nano::block_hash{ 2 },
nano::block_hash{ 3 }
};
auto vote = nano::test::make_vote (key, hashes);
auto result = index.check_and_record (vote, nano::uint128_t{ 100 }, now);
ASSERT_EQ (result, nano::vote_rebroadcaster_index::result::ok);
ASSERT_EQ (index.total_history (), 3); // One entry per hash
for (auto const & hash : hashes)
{
ASSERT_TRUE (index.contains_block (key.pub, hash));
}
}
TEST (vote_rebroadcaster_index, history_limit)
{
nano::vote_rebroadcaster_config config;
config.max_history = 2;
test_context ctx{ config };
auto & index = ctx.index;
auto now = std::chrono::steady_clock::now ();
nano::keypair key;
// Add votes up to limit
for (size_t i = 0; i < 3; i++)
{
std::vector<nano::block_hash> hash = { nano::block_hash{ i } };
auto vote = nano::test::make_vote (key, hash);
index.check_and_record (vote, nano::uint128_t{ 100 }, now);
}
ASSERT_EQ (index.total_history (), 2);
ASSERT_FALSE (index.contains_block (key.pub, nano::block_hash{ 0 })); // Oldest was removed
ASSERT_TRUE (index.contains_block (key.pub, nano::block_hash{ 1 }));
ASSERT_TRUE (index.contains_block (key.pub, nano::block_hash{ 2 }));
}
TEST (vote_rebroadcaster_index, cleanup)
{
test_context ctx{};
auto & index = ctx.index;
auto now = std::chrono::steady_clock::now ();
nano::keypair key1;
nano::keypair key2;
std::vector<nano::block_hash> hashes = { nano::block_hash{ 1 } };
// Add two reps
auto vote1 = nano::test::make_vote (key1, hashes);
auto vote2 = nano::test::make_vote (key2, hashes);
index.check_and_record (vote1, nano::uint128_t{ 100 }, now);
index.check_and_record (vote2, nano::uint128_t{ 200 }, now);
// Cleanup with rep1 becoming non-principal
auto cleanup_count = index.cleanup ([&key1] (nano::account const & account) {
return std::make_pair (
account == key1.pub ? false : true,
account == key1.pub ? nano::uint128_t{ 0 } : nano::uint128_t{ 200 });
});
ASSERT_EQ (cleanup_count, 1);
ASSERT_EQ (index.representatives_count (), 1);
ASSERT_FALSE (index.contains_representative (key1.pub));
ASSERT_TRUE (index.contains_representative (key2.pub));
}
TEST (vote_rebroadcaster_index, weight_updates)
{
nano::vote_rebroadcaster_config config;
config.max_representatives = 1;
test_context ctx{ config };
auto & index = ctx.index;
auto now = std::chrono::steady_clock::now ();
nano::keypair key1;
nano::keypair key2;
std::vector<nano::block_hash> hashes = { nano::block_hash{ 1 } };
// Add rep with initial weight
auto vote1 = nano::test::make_vote (key1, hashes);
index.check_and_record (vote1, nano::uint128_t{ 100 }, now);
// Update weight through cleanup
index.cleanup ([] (nano::account const &) {
return std::make_pair (true, nano::uint128_t{ 200 });
});
// Add new rep with lower weight - should be rejected due to updated weight
auto vote2 = nano::test::make_vote (key2, hashes);
auto result = index.check_and_record (vote2, nano::uint128_t{ 150 }, now);
ASSERT_EQ (result, nano::vote_rebroadcaster_index::result::representatives_full);
}

View file

@ -27,6 +27,8 @@ nano::uint128_t const raw_ratio = nano::uint128_t ("1"); // 10^0
using bucket_index = uint64_t;
using priority_timestamp = uint64_t; // Priority within the bucket
using vote_timestamp = uint64_t;
class uint128_union
{
public:
@ -590,6 +592,8 @@ template <>
struct hash<::nano::uint512_union>;
template <>
struct hash<::nano::qualified_root>;
template <>
struct hash<::nano::signature>;
}
namespace boost
@ -616,4 +620,6 @@ template <>
struct hash<::nano::uint512_union>;
template <>
struct hash<::nano::qualified_root>;
template <>
struct hash<::nano::signature>;
}

View file

@ -94,6 +94,14 @@ struct hash<::nano::qualified_root>
return hash<::nano::uint512_union>{}(value);
}
};
template <>
struct hash<::nano::signature>
{
size_t operator() (::nano::signature const & value) const noexcept
{
return hash<::nano::uint512_union>{}(value);
}
};
}
namespace boost
@ -186,4 +194,12 @@ struct hash<::nano::qualified_root>
return std::hash<::nano::qualified_root> () (value);
}
};
template <>
struct hash<::nano::signature>
{
size_t operator() (::nano::signature const & value) const noexcept
{
return std::hash<::nano::signature> () (value);
}
};
}

View file

@ -26,6 +26,7 @@ enum class type
vote_processor_tier,
vote_processor_overfill,
vote_rebroadcaster,
vote_rebroadcaster_tier,
election,
election_cleanup,
election_vote,
@ -274,6 +275,15 @@ enum class detail
vote_overflow,
vote_ignored,
// vote_rebroadcaster
cleanup_tiers,
representatives_full,
representatives_erase_lowest,
representatives_erase_stale,
already_rebroadcasted,
rebroadcast_unnecessary,
rebroadcast_hashes,
// election specific
vote_new,
vote_processed,
@ -664,9 +674,6 @@ enum class detail
pruned_count,
collect_targets,
// vote_rebroadcaster
rebroadcast_hashes,
_last // Must be the last enum
};

View file

@ -247,6 +247,15 @@ void nano::network::send_keepalive_self (std::shared_ptr<nano::transport::channe
channel->send (message, nano::transport::traffic_type::keepalive);
}
bool nano::network::check_capacity (nano::transport::traffic_type type, float scale) const
{
auto const target_count = fanout (scale);
auto channels = list (target_count, [type] (auto const & channel) {
return !channel->max (type); // Only use channels that are not full for this traffic type
});
return !channels.empty () && channels.size () >= target_count / 2; // We need to have at least half of the target capacity available
}
size_t nano::network::flood_message (nano::message const & message, nano::transport::traffic_type type, float scale) const
{
auto channels = list (fanout (scale), [type] (auto const & channel) {

View file

@ -95,6 +95,9 @@ public:
nano::endpoint endpoint () const;
// Checks if we have enough channel capacity for the given traffic type
bool check_capacity (nano::transport::traffic_type, float scale = 1.0f) const;
size_t flood_message (nano::message const &, nano::transport::traffic_type, float scale = 1.0f) const;
size_t flood_keepalive (float scale = 1.0f) const;
size_t flood_keepalive_self (float scale = 0.5f) const;

View file

@ -203,7 +203,7 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
http_callbacks{ *http_callbacks_impl },
pruning_impl{ std::make_unique<nano::pruning> (config, flags, ledger, stats, logger) },
pruning{ *pruning_impl },
vote_rebroadcaster_impl{ std::make_unique<nano::vote_rebroadcaster> (vote_router, network, wallets, stats, logger) },
vote_rebroadcaster_impl{ std::make_unique<nano::vote_rebroadcaster> (config.vote_rebroadcaster, ledger, vote_router, network, wallets, rep_tiers, stats, logger) },
vote_rebroadcaster{ *vote_rebroadcaster_impl },
startup_time{ std::chrono::steady_clock::now () },
node_seq{ seq }

View file

@ -31,6 +31,7 @@
#include <nano/node/transport/tcp_listener.hpp>
#include <nano/node/vote_cache.hpp>
#include <nano/node/vote_processor.hpp>
#include <nano/node/vote_rebroadcaster.hpp>
#include <nano/node/websocketconfig.hpp>
#include <nano/secure/common.hpp>
#include <nano/secure/generate_cache_flags.hpp>
@ -162,6 +163,7 @@ public:
nano::monitor_config monitor;
nano::backlog_scan_config backlog_scan;
nano::bounded_backlog_config bounded_backlog;
nano::vote_rebroadcaster_config vote_rebroadcaster;
public:
/** Entry is ignored if it cannot be parsed as a valid address:port */

View file

@ -1,27 +1,76 @@
#include <nano/lib/assert.hpp>
#include <nano/lib/interval.hpp>
#include <nano/lib/numbers_templ.hpp>
#include <nano/lib/thread_roles.hpp>
#include <nano/node/network.hpp>
#include <nano/node/rep_tiers.hpp>
#include <nano/node/vote_processor.hpp>
#include <nano/node/vote_rebroadcaster.hpp>
#include <nano/node/vote_router.hpp>
#include <nano/node/wallet.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/secure/vote.hpp>
nano::vote_rebroadcaster::vote_rebroadcaster (nano::vote_router & vote_router_a, nano::network & network_a, nano::wallets & wallets_a, nano::stats & stats_a, nano::logger & logger_a) :
nano::vote_rebroadcaster::vote_rebroadcaster (nano::vote_rebroadcaster_config const & config_a, nano::ledger & ledger_a, nano::vote_router & vote_router_a, nano::network & network_a, nano::wallets & wallets_a, nano::rep_tiers & rep_tiers_a, nano::stats & stats_a, nano::logger & logger_a) :
config{ config_a },
ledger{ ledger_a },
vote_router{ vote_router_a },
network{ network_a },
wallets{ wallets_a },
rep_tiers{ rep_tiers_a },
stats{ stats_a },
logger{ logger_a }
logger{ logger_a },
rebroadcasts{ config }
{
if (!config.enable)
{
return;
}
queue.max_size_query = [this] (auto const & origin) {
switch (origin.source)
{
case nano::rep_tier::tier_3:
case nano::rep_tier::tier_2:
case nano::rep_tier::tier_1:
return config.max_queue;
case nano::rep_tier::none:
return size_t{ 0 };
}
debug_assert (false);
return size_t{ 0 };
};
queue.priority_query = [this] (auto const & origin) {
switch (origin.source)
{
case nano::rep_tier::tier_3:
return config.priority_coefficient * config.priority_coefficient * config.priority_coefficient;
case nano::rep_tier::tier_2:
return config.priority_coefficient * config.priority_coefficient;
case nano::rep_tier::tier_1:
return config.priority_coefficient;
case nano::rep_tier::none:
return size_t{ 0 };
}
debug_assert (false);
return size_t{ 0 };
};
vote_router.vote_processed.add ([this] (std::shared_ptr<nano::vote> const & vote, nano::vote_source source, std::unordered_map<nano::block_hash, nano::vote_code> const & results) {
bool processed = std::any_of (results.begin (), results.end (), [] (auto const & result) {
return result.second == nano::vote_code::vote;
});
if (processed && enable)
// Enable vote rebroadcasting only if the node does not host a representative
// Do not rebroadcast votes from non-principal representatives
if (processed && non_principal)
{
put (vote);
auto tier = rep_tiers.tier (vote->account);
if (tier != nano::rep_tier::none)
{
push (vote, tier);
}
}
});
}
@ -35,6 +84,11 @@ void nano::vote_rebroadcaster::start ()
{
debug_assert (!thread.joinable ());
if (!config.enable)
{
return;
}
thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::vote_rebroadcasting);
run ();
@ -54,17 +108,19 @@ void nano::vote_rebroadcaster::stop ()
}
}
bool nano::vote_rebroadcaster::put (std::shared_ptr<nano::vote> const & vote)
bool nano::vote_rebroadcaster::push (std::shared_ptr<nano::vote> const & vote, nano::rep_tier tier)
{
bool added{ false };
bool added = false;
{
std::lock_guard guard{ mutex };
if (queue.size () < max_queue)
// Do not rebroadcast local representative votes
if (!reps.exists (vote->account) && !queue_hashes.contains (vote->signature))
{
if (!reps.exists (vote->account))
added = queue.push (vote, tier);
if (added)
{
queue.push_back (vote);
added = true;
queue_hashes.insert (vote->signature); // Keep track of vote signatures to avoid duplicates
}
}
}
@ -73,13 +129,26 @@ bool nano::vote_rebroadcaster::put (std::shared_ptr<nano::vote> const & vote)
stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::queued);
condition.notify_one ();
}
else
{
stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::overfill);
}
return added;
}
std::pair<std::shared_ptr<nano::vote>, nano::rep_tier> nano::vote_rebroadcaster::next ()
{
debug_assert (!mutex.try_lock ());
debug_assert (!queue.empty ());
queue.periodic_update ();
auto [vote, origin] = queue.next ();
release_assert (vote != nullptr);
release_assert (origin.source != nano::rep_tier::none);
auto erased = queue_hashes.erase (vote->signature);
debug_assert (erased == 1);
return { vote, origin.source };
}
void nano::vote_rebroadcaster::run ()
{
std::unique_lock lock{ mutex };
@ -96,37 +165,285 @@ void nano::vote_rebroadcaster::run ()
stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::loop);
if (refresh_interval.elapse (15s))
// Update local reps cache
if (refresh_interval.elapse (nano::is_dev_run () ? 1s : 15s))
{
stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::refresh);
reps = wallets.reps ();
enable = !reps.have_half_rep (); // Disable vote rebroadcasting if the node has a principal representative (or close to)
non_principal = !reps.have_half_rep (); // Disable vote rebroadcasting if the node has a principal representative (or close to)
}
// Cleanup expired representatives from rebroadcasts
if (cleanup_interval.elapse (nano::is_dev_run () ? 1s : 60s))
{
lock.unlock ();
cleanup ();
lock.lock ();
}
float constexpr network_fanout_scale = 1.0f;
// Wait for spare if our network traffic is too high
if (!network.check_capacity (nano::transport::traffic_type::vote_rebroadcast, network_fanout_scale))
{
stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::cooldown);
lock.unlock ();
std::this_thread::sleep_for (100ms);
lock.lock ();
continue; // Wait for more capacity
}
if (!queue.empty ())
{
auto vote = queue.front ();
queue.pop_front ();
auto [vote, tier] = next ();
lock.unlock ();
stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::rebroadcast);
stats.add (nano::stat::type::vote_rebroadcaster, nano::stat::detail::rebroadcast_hashes, vote->hashes.size ());
bool should_rebroadcast = process (vote);
if (should_rebroadcast)
{
stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::rebroadcast);
stats.add (nano::stat::type::vote_rebroadcaster, nano::stat::detail::rebroadcast_hashes, vote->hashes.size ());
stats.inc (nano::stat::type::vote_rebroadcaster_tier, to_stat_detail (tier));
auto sent = network.flood_vote_rebroadcasted (vote, 0.5f);
stats.add (nano::stat::type::vote_rebroadcaster, nano::stat::detail::sent, sent);
auto sent = network.flood_vote_rebroadcasted (vote, network_fanout_scale);
stats.add (nano::stat::type::vote_rebroadcaster, nano::stat::detail::sent, sent);
}
lock.lock ();
}
}
}
bool nano::vote_rebroadcaster::process (std::shared_ptr<nano::vote> const & vote)
{
stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::process);
auto rebroadcasts_l = rebroadcasts.lock ();
auto result = rebroadcasts_l->check_and_record (vote, ledger.weight (vote->account), std::chrono::steady_clock::now ());
if (result == nano::vote_rebroadcaster_index::result::ok)
{
return true; // Vote qualifies for rebroadcast
}
else
{
stats.inc (nano::stat::type::vote_rebroadcaster, nano::enum_util::cast<nano::stat::detail> (result));
return false; // Vote does not qualify for rebroadcast
}
}
void nano::vote_rebroadcaster::cleanup ()
{
stats.inc (nano::stat::type::vote_rebroadcaster, nano::stat::detail::cleanup);
auto rebroadcasts_l = rebroadcasts.lock ();
auto erased_reps = rebroadcasts_l->cleanup ([this] (auto const & rep) {
auto tier = rep_tiers.tier (rep);
auto weight = ledger.weight (rep);
return std::make_pair (tier != nano::rep_tier::none /* keep entry only if principal rep */, weight);
});
stats.add (nano::stat::type::vote_rebroadcaster, nano::stat::detail::representatives_erase_stale, erased_reps);
}
nano::container_info nano::vote_rebroadcaster::container_info () const
{
std::lock_guard guard{ mutex };
auto rebroadcasts_l = rebroadcasts.lock ();
nano::container_info info;
info.put ("queue", queue.size ());
info.add ("queue", queue.container_info ());
info.put ("queue_total", queue.size ());
info.put ("queue_hashes", queue_hashes.size ());
info.put ("representatives", rebroadcasts_l->representatives_count ());
info.put ("history", rebroadcasts_l->total_history ());
info.put ("hashes", rebroadcasts_l->total_hashes ());
return info;
}
/*
* vote_rebroadcaster_index
*/
nano::vote_rebroadcaster_index::vote_rebroadcaster_index (nano::vote_rebroadcaster_config const & config_a) :
config{ config_a }
{
}
nano::vote_rebroadcaster_index::result nano::vote_rebroadcaster_index::check_and_record (std::shared_ptr<nano::vote> const & vote, nano::uint128_t rep_weight, std::chrono::steady_clock::time_point now)
{
auto const vote_timestamp = vote->timestamp ();
auto const vote_hash = vote->full_hash ();
auto it = index.get<tag_account> ().find (vote->account);
// If we don't have a record for this rep, add it
if (it == index.get<tag_account> ().end ())
{
auto should_add = [&, this] () {
// Under normal conditions the number of principal representatives should be below this limit
if (index.size () < config.max_representatives)
{
return true;
}
// However, if we're at capacity, we can still add the rep if it has a higher weight than the lowest weight in the container
if (auto lowest = index.get<tag_weight> ().begin (); lowest != index.get<tag_weight> ().end ())
{
return rep_weight > lowest->weight;
}
return false;
};
if (should_add ())
{
it = index.get<tag_account> ().emplace (representative_entry{ vote->account, rep_weight }).first;
}
else
{
return result::representatives_full;
}
}
release_assert (it != index.get<tag_account> ().end ());
auto & history = it->history;
auto & hashes = it->hashes;
// Check if we already rebroadcasted this exact vote (fast lookup by hash)
if (hashes.get<tag_vote_hash> ().contains (vote_hash))
{
return result::already_rebroadcasted;
}
// Check if any of the hashes contained in the vote qualifies for rebroadcasting
auto check_hash = [&] (auto const & hash) {
if (auto existing = history.get<tag_block_hash> ().find (hash); existing != history.get<tag_block_hash> ().end ())
{
// Always rebroadcast vote if rep switched to a final vote
if (nano::vote::is_final_timestamp (vote_timestamp) && vote_timestamp > existing->vote_timestamp)
{
return true;
}
// Otherwise only rebroadcast if sufficient time has passed since the last rebroadcast
if (existing->timestamp + config.rebroadcast_threshold > now)
{
return false; // Not enough (as seen by local clock) time has passed
}
if (add_sat (existing->vote_timestamp, static_cast<nano::vote_timestamp> (config.rebroadcast_threshold.count ())) > vote_timestamp)
{
return false; // Not enough (as seen by vote timestamp) time has passed
}
return true; // Enough time has passed, block hash qualifies for rebroadcast
}
else
{
return true; // Block hash not seen before, rebroadcast
}
};
bool should_rebroadcast = std::any_of (vote->hashes.begin (), vote->hashes.end (), check_hash);
if (!should_rebroadcast)
{
return result::rebroadcast_unnecessary;
}
// Update the history with the new vote info
for (auto const & hash : vote->hashes)
{
if (auto existing = history.get<tag_block_hash> ().find (hash); existing != history.get<tag_block_hash> ().end ())
{
history.get<tag_block_hash> ().modify (existing, [&] (auto & entry) {
entry.vote_timestamp = vote_timestamp;
entry.timestamp = now;
});
}
else
{
history.get<tag_block_hash> ().emplace (rebroadcast_entry{ hash, vote_timestamp, now });
}
}
// Also keep track of the vote hash to quickly filter out duplicates
hashes.push_back (vote_hash);
// Keep history and hashes sizes within limits, erase oldest entries
while (history.size () > config.max_history)
{
history.pop_front (); // Remove the oldest entry
}
while (hashes.size () > config.max_history)
{
hashes.pop_front (); // Remove the oldest entry
}
// Keep representatives index within limits, erase lowest weight entries
while (!index.empty () && index.size () > config.max_representatives)
{
index.get<tag_weight> ().erase (index.get<tag_weight> ().begin ());
}
return result::ok; // Rebroadcast the vote
}
size_t nano::vote_rebroadcaster_index::cleanup (rep_query query)
{
// Remove entries for accounts that are no longer principal representatives
auto erased_reps = erase_if (index, [&] (auto const & entry) {
auto [should_keep, weight] = query (entry.representative);
return !should_keep;
});
// Update representative weights
for (auto it = index.begin (), end = index.end (); it != end; ++it)
{
index.modify (it, [&] (auto & entry) {
auto [tier, weight] = query (entry.representative);
entry.weight = weight;
});
}
return erased_reps;
}
bool nano::vote_rebroadcaster_index::contains_vote (nano::block_hash const & vote_hash) const
{
return std::any_of (index.begin (), index.end (), [&] (auto const & entry) {
return entry.hashes.template get<tag_vote_hash> ().contains (vote_hash);
});
}
bool nano::vote_rebroadcaster_index::contains_representative (nano::account const & representative) const
{
return index.get<tag_account> ().contains (representative);
}
bool nano::vote_rebroadcaster_index::contains_block (nano::account const & representative, nano::block_hash const & block_hash) const
{
if (auto it = index.get<tag_account> ().find (representative); it != index.get<tag_account> ().end ())
{
return it->history.get<tag_block_hash> ().find (block_hash) != it->history.get<tag_block_hash> ().end ();
}
return false;
}
size_t nano::vote_rebroadcaster_index::representatives_count () const
{
return index.size ();
}
size_t nano::vote_rebroadcaster_index::total_history () const
{
return std::accumulate (index.begin (), index.end (), size_t{ 0 }, [] (auto total, auto const & entry) {
return total + entry.history.size ();
});
}
size_t nano::vote_rebroadcaster_index::total_hashes () const
{
return std::accumulate (index.begin (), index.end (), size_t{ 0 }, [] (auto total, auto const & entry) {
return total + entry.hashes.size ();
});
}

View file

@ -1,45 +1,169 @@
#pragma once
#include <nano/lib/locks.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/node/fair_queue.hpp>
#include <nano/node/fwd.hpp>
#include <nano/node/wallet.hpp>
#include <nano/secure/vote.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/mem_fun.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/random_access_index.hpp>
#include <boost/multi_index/sequenced_index.hpp>
#include <boost/multi_index_container.hpp>
#include <atomic>
#include <condition_variable>
#include <deque>
#include <thread>
#include <unordered_map>
namespace mi = boost::multi_index;
namespace nano
{
class vote_rebroadcaster_config final
{
public:
// TODO: Serde
public:
bool enable{ true };
size_t max_queue{ 1024 * 4 }; // Maximum number of votes to keep in queue for processing
size_t max_history{ 1024 * 32 }; // Maximum number of recently broadcast hashes to keep per representative
size_t max_representatives{ 100 }; // Maximum number of representatives to track rebroadcasts for
std::chrono::milliseconds rebroadcast_threshold{ 1000 * 90 }; // Minimum amount of time between rebroadcasts for the same hash from the same representative (milliseconds)
size_t priority_coefficient{ 2 }; // Priority coefficient for prioritizing votes from representative tiers
};
class vote_rebroadcaster_index
{
public:
explicit vote_rebroadcaster_index (vote_rebroadcaster_config const &);
enum class result
{
ok,
already_rebroadcasted,
representatives_full,
rebroadcast_unnecessary,
};
result check_and_record (std::shared_ptr<nano::vote> const & vote, nano::uint128_t rep_weight, std::chrono::steady_clock::time_point now);
using rep_query = std::function<std::pair<bool, nano::uint128_t> (nano::account const &)>; // Returns <should keep, rep weight>
size_t cleanup (rep_query);
bool contains_vote (nano::block_hash const & vote_hash) const;
bool contains_representative (nano::account const & representative) const;
bool contains_block (nano::account const & representative, nano::block_hash const & block_hash) const;
size_t representatives_count () const;
size_t total_history () const;
size_t total_hashes () const;
private:
vote_rebroadcaster_config const & config;
struct rebroadcast_entry
{
nano::block_hash block_hash;
nano::vote_timestamp vote_timestamp;
std::chrono::steady_clock::time_point timestamp;
};
// clang-format off
class tag_sequenced {};
class tag_vote_hash {};
class tag_block_hash {};
// Tracks rebroadcast history for individual block hashes
using ordered_rebroadcasts = boost::multi_index_container<rebroadcast_entry,
mi::indexed_by<
mi::sequenced<mi::tag<tag_sequenced>>,
mi::hashed_unique<mi::tag<tag_block_hash>,
mi::member<rebroadcast_entry, nano::block_hash, &rebroadcast_entry::block_hash>>
>>;
// Tracks rebroadcast history for full votes
using ordered_hashes = boost::multi_index_container<nano::block_hash,
mi::indexed_by<
mi::sequenced<mi::tag<tag_sequenced>>,
mi::hashed_unique<mi::tag<tag_vote_hash>,
mi::identity<nano::block_hash>>
>>;
// clang-format on
struct representative_entry
{
nano::account representative;
nano::uint128_t weight;
mutable ordered_rebroadcasts history;
mutable ordered_hashes hashes;
};
// clang-format off
class tag_account {};
class tag_weight {};
using ordered_representatives = boost::multi_index_container<representative_entry,
mi::indexed_by<
mi::sequenced<mi::tag<tag_sequenced>>,
mi::hashed_unique<mi::tag<tag_account>,
mi::member<representative_entry, nano::account, &representative_entry::representative>>,
mi::ordered_non_unique<mi::tag<tag_weight>,
mi::member<representative_entry, nano::uint128_t, &representative_entry::weight>>
>>;
// clang-format on
ordered_representatives index;
};
class vote_rebroadcaster final
{
public:
static size_t constexpr max_queue = 1024 * 16;
public:
vote_rebroadcaster (nano::vote_router &, nano::network &, nano::wallets &, nano::stats &, nano::logger &);
vote_rebroadcaster (vote_rebroadcaster_config const &, nano::ledger &, nano::vote_router &, nano::network &, nano::wallets &, nano::rep_tiers &, nano::stats &, nano::logger &);
~vote_rebroadcaster ();
void start ();
void stop ();
bool put (std::shared_ptr<nano::vote> const &);
bool push (std::shared_ptr<nano::vote> const &, nano::rep_tier);
nano::container_info container_info () const;
public: // Dependencies
vote_rebroadcaster_config const & config;
nano::ledger & ledger;
nano::vote_router & vote_router;
nano::network & network;
nano::wallets & wallets;
nano::rep_tiers & rep_tiers;
nano::stats & stats;
nano::logger & logger;
private:
void run ();
void cleanup ();
bool process (std::shared_ptr<nano::vote> const &);
std::pair<std::shared_ptr<nano::vote>, nano::rep_tier> next ();
std::atomic<bool> enable{ true }; // Enable vote rebroadcasting only if the node does not host a representative
std::deque<std::shared_ptr<nano::vote>> queue;
private:
// Queue of recently processed votes to potentially rebroadcast
nano::fair_queue<std::shared_ptr<nano::vote>, nano::rep_tier> queue;
std::unordered_set<nano::signature> queue_hashes; // Avoids queuing the same vote multiple times
nano::locked<vote_rebroadcaster_index> rebroadcasts;
private:
std::atomic<bool> non_principal{ true };
nano::wallet_representatives reps;
nano::interval refresh_interval;
nano::interval cleanup_interval;
bool stopped{ false };
std::condition_variable condition;

View file

@ -63,7 +63,7 @@ public: // Payload
nano::signature signature{ 0 };
private: // Payload
// Vote timestamp
// Vote timestamp (milliseconds since epoch)
uint64_t timestamp_m{ 0 };
private: