Filter duplicate votes

This commit is contained in:
Piotr Wójcik 2024-09-23 17:08:25 +02:00
commit 10c06f8875
6 changed files with 48 additions and 19 deletions

View file

@ -201,6 +201,9 @@ enum class detail
asc_pull_req, asc_pull_req,
asc_pull_ack, asc_pull_ack,
// dropped messages
confirm_ack_zero_account,
// bootstrap, callback // bootstrap, callback
initiate, initiate,
initiate_legacy_age, initiate_legacy_age,
@ -354,6 +357,7 @@ enum class detail
// duplicate // duplicate
duplicate_publish_message, duplicate_publish_message,
duplicate_confirm_ack_message,
// telemetry // telemetry
invalid_signature, invalid_signature,

View file

@ -210,9 +210,18 @@ public:
void confirm_ack (nano::confirm_ack const & message) override void confirm_ack (nano::confirm_ack const & message) override
{ {
if (!message.vote->account.is_zero ()) // Ignore zero account votes
if (message.vote->account.is_zero ())
{ {
node.vote_processor.vote (message.vote, channel, message.is_rebroadcasted () ? nano::vote_source::rebroadcast : nano::vote_source::live); node.stats.inc (nano::stat::type::drop, nano::stat::detail::confirm_ack_zero_account, nano::stat::dir::in);
return;
}
bool added = node.vote_processor.vote (message.vote, channel, message.is_rebroadcasted () ? nano::vote_source::rebroadcast : nano::vote_source::live);
if (!added)
{
node.network.publish_filter.clear (message.digest);
node.stats.inc (nano::stat::type::drop, nano::stat::detail::confirm_ack, nano::stat::dir::in);
} }
} }

View file

@ -613,9 +613,10 @@ void nano::confirm_req::operator() (nano::object_stream & obs) const
* confirm_ack * confirm_ack
*/ */
nano::confirm_ack::confirm_ack (bool & error_a, nano::stream & stream_a, nano::message_header const & header_a, nano::vote_uniquer * uniquer_a) : nano::confirm_ack::confirm_ack (bool & error_a, nano::stream & stream_a, nano::message_header const & header_a, nano::network_filter::digest_t const & digest_a, nano::vote_uniquer * uniquer_a) :
message (header_a), message (header_a),
vote (nano::make_shared<nano::vote> (error_a, stream_a)) vote{ nano::make_shared<nano::vote> (error_a, stream_a) },
digest{ digest_a }
{ {
if (!error_a && uniquer_a) if (!error_a && uniquer_a)
{ {
@ -625,7 +626,7 @@ nano::confirm_ack::confirm_ack (bool & error_a, nano::stream & stream_a, nano::m
nano::confirm_ack::confirm_ack (nano::network_constants const & constants, std::shared_ptr<nano::vote> const & vote_a, bool rebroadcasted_a) : nano::confirm_ack::confirm_ack (nano::network_constants const & constants, std::shared_ptr<nano::vote> const & vote_a, bool rebroadcasted_a) :
message (constants, nano::message_type::confirm_ack), message (constants, nano::message_type::confirm_ack),
vote (vote_a) vote{ vote_a }
{ {
debug_assert (vote->hashes.size () < 256); debug_assert (vote->hashes.size () < 256);

View file

@ -204,7 +204,7 @@ public: // Payload
std::shared_ptr<nano::block> block; std::shared_ptr<nano::block> block;
// Messages deserialized from network should have their digest set // Messages deserialized from network should have their digest set
nano::network_filter::digest_t digest; nano::network_filter::digest_t digest{ 0 };
public: // Logging public: // Logging
void operator() (nano::object_stream &) const override; void operator() (nano::object_stream &) const override;
@ -267,7 +267,7 @@ public: // Logging
class confirm_ack final : public message class confirm_ack final : public message
{ {
public: public:
confirm_ack (bool & error, nano::stream &, nano::message_header const &, nano::vote_uniquer * = nullptr); confirm_ack (bool & error, nano::stream &, nano::message_header const &, nano::network_filter::digest_t const & digest = 0, nano::vote_uniquer * = nullptr);
confirm_ack (nano::network_constants const & constants, std::shared_ptr<nano::vote> const &, bool rebroadcasted = false); confirm_ack (nano::network_constants const & constants, std::shared_ptr<nano::vote> const &, bool rebroadcasted = false);
void serialize (nano::stream &) const override; void serialize (nano::stream &) const override;
@ -285,6 +285,9 @@ private:
public: // Payload public: // Payload
std::shared_ptr<nano::vote> vote; std::shared_ptr<nano::vote> vote;
// Messages deserialized from network should have their digest set
nano::network_filter::digest_t digest{ 0 };
public: // Logging public: // Logging
void operator() (nano::object_stream &) const override; void operator() (nano::object_stream &) const override;
}; };

View file

@ -2,11 +2,11 @@
#include <nano/node/node.hpp> #include <nano/node/node.hpp>
#include <nano/node/transport/message_deserializer.hpp> #include <nano/node/transport/message_deserializer.hpp>
nano::transport::message_deserializer::message_deserializer (nano::network_constants const & network_constants_a, nano::network_filter & publish_filter_a, nano::block_uniquer & block_uniquer_a, nano::vote_uniquer & vote_uniquer_a, nano::transport::message_deserializer::message_deserializer (nano::network_constants const & network_constants_a, nano::network_filter & network_filter_a, nano::block_uniquer & block_uniquer_a, nano::vote_uniquer & vote_uniquer_a,
read_query read_op) : read_query read_op) :
read_buffer{ std::make_shared<std::vector<uint8_t>> () }, read_buffer{ std::make_shared<std::vector<uint8_t>> () },
network_constants_m{ network_constants_a }, network_constants_m{ network_constants_a },
publish_filter_m{ publish_filter_a }, network_filter_m{ network_filter_a },
block_uniquer_m{ block_uniquer_a }, block_uniquer_m{ block_uniquer_a },
vote_uniquer_m{ vote_uniquer_a }, vote_uniquer_m{ vote_uniquer_a },
read_op{ std::move (read_op) } read_op{ std::move (read_op) }
@ -128,9 +128,9 @@ std::unique_ptr<nano::message> nano::transport::message_deserializer::deserializ
} }
case nano::message_type::publish: case nano::message_type::publish:
{ {
// Early filtering to not waste time deserializing duplicate blocks // Early filtering to not waste time deserializing duplicates
nano::uint128_t digest; nano::uint128_t digest;
if (!publish_filter_m.apply (read_buffer->data (), payload_size, &digest)) if (!network_filter_m.apply (read_buffer->data (), payload_size, &digest))
{ {
return deserialize_publish (stream, header, digest); return deserialize_publish (stream, header, digest);
} }
@ -146,7 +146,17 @@ std::unique_ptr<nano::message> nano::transport::message_deserializer::deserializ
} }
case nano::message_type::confirm_ack: case nano::message_type::confirm_ack:
{ {
return deserialize_confirm_ack (stream, header); // Early filtering to not waste time deserializing duplicates
nano::uint128_t digest;
if (!network_filter_m.apply (read_buffer->data (), payload_size, &digest))
{
return deserialize_confirm_ack (stream, header, digest);
}
else
{
status = parse_status::duplicate_confirm_ack_message;
}
break;
} }
case nano::message_type::node_id_handshake: case nano::message_type::node_id_handshake:
{ {
@ -208,7 +218,7 @@ std::unique_ptr<nano::keepalive> nano::transport::message_deserializer::deserial
return {}; return {};
} }
std::unique_ptr<nano::publish> nano::transport::message_deserializer::deserialize_publish (nano::stream & stream, nano::message_header const & header, nano::uint128_t const & digest_a) std::unique_ptr<nano::publish> nano::transport::message_deserializer::deserialize_publish (nano::stream & stream, nano::message_header const & header, nano::network_filter::digest_t const & digest_a)
{ {
auto error = false; auto error = false;
auto incoming = std::make_unique<nano::publish> (error, stream, header, digest_a, &block_uniquer_m); auto incoming = std::make_unique<nano::publish> (error, stream, header, digest_a, &block_uniquer_m);
@ -246,10 +256,10 @@ std::unique_ptr<nano::confirm_req> nano::transport::message_deserializer::deseri
return {}; return {};
} }
std::unique_ptr<nano::confirm_ack> nano::transport::message_deserializer::deserialize_confirm_ack (nano::stream & stream, nano::message_header const & header) std::unique_ptr<nano::confirm_ack> nano::transport::message_deserializer::deserialize_confirm_ack (nano::stream & stream, nano::message_header const & header, nano::network_filter::digest_t const & digest_a)
{ {
auto error = false; auto error = false;
auto incoming = std::make_unique<nano::confirm_ack> (error, stream, header, &vote_uniquer_m); auto incoming = std::make_unique<nano::confirm_ack> (error, stream, header, digest_a, &vote_uniquer_m);
if (!error && nano::at_end (stream)) if (!error && nano::at_end (stream))
{ {
return incoming; return incoming;

View file

@ -33,6 +33,7 @@ namespace transport
invalid_network, invalid_network,
outdated_version, outdated_version,
duplicate_publish_message, duplicate_publish_message,
duplicate_confirm_ack_message,
message_size_too_big, message_size_too_big,
}; };
@ -41,7 +42,7 @@ namespace transport
public: public:
using callback_type = std::function<void (boost::system::error_code, std::unique_ptr<nano::message>)>; using callback_type = std::function<void (boost::system::error_code, std::unique_ptr<nano::message>)>;
parse_status status; parse_status status{ parse_status::none };
using read_query = std::function<void (std::shared_ptr<std::vector<uint8_t>> const &, size_t, std::function<void (boost::system::error_code const &, std::size_t)>)>; using read_query = std::function<void (std::shared_ptr<std::vector<uint8_t>> const &, size_t, std::function<void (boost::system::error_code const &, std::size_t)>)>;
@ -66,9 +67,9 @@ namespace transport
*/ */
std::unique_ptr<nano::message> deserialize (nano::message_header header, std::size_t payload_size); std::unique_ptr<nano::message> deserialize (nano::message_header header, std::size_t payload_size);
std::unique_ptr<nano::keepalive> deserialize_keepalive (nano::stream &, nano::message_header const &); std::unique_ptr<nano::keepalive> deserialize_keepalive (nano::stream &, nano::message_header const &);
std::unique_ptr<nano::publish> deserialize_publish (nano::stream &, nano::message_header const &, nano::uint128_t const & = 0); std::unique_ptr<nano::publish> deserialize_publish (nano::stream &, nano::message_header const &, nano::network_filter::digest_t const & digest);
std::unique_ptr<nano::confirm_req> deserialize_confirm_req (nano::stream &, nano::message_header const &); std::unique_ptr<nano::confirm_req> deserialize_confirm_req (nano::stream &, nano::message_header const &);
std::unique_ptr<nano::confirm_ack> deserialize_confirm_ack (nano::stream &, nano::message_header const &); std::unique_ptr<nano::confirm_ack> deserialize_confirm_ack (nano::stream &, nano::message_header const &, nano::network_filter::digest_t const & digest);
std::unique_ptr<nano::node_id_handshake> deserialize_node_id_handshake (nano::stream &, nano::message_header const &); std::unique_ptr<nano::node_id_handshake> deserialize_node_id_handshake (nano::stream &, nano::message_header const &);
std::unique_ptr<nano::telemetry_req> deserialize_telemetry_req (nano::stream &, nano::message_header const &); std::unique_ptr<nano::telemetry_req> deserialize_telemetry_req (nano::stream &, nano::message_header const &);
std::unique_ptr<nano::telemetry_ack> deserialize_telemetry_ack (nano::stream &, nano::message_header const &); std::unique_ptr<nano::telemetry_ack> deserialize_telemetry_ack (nano::stream &, nano::message_header const &);
@ -79,6 +80,7 @@ namespace transport
std::unique_ptr<nano::asc_pull_req> deserialize_asc_pull_req (nano::stream &, nano::message_header const &); std::unique_ptr<nano::asc_pull_req> deserialize_asc_pull_req (nano::stream &, nano::message_header const &);
std::unique_ptr<nano::asc_pull_ack> deserialize_asc_pull_ack (nano::stream &, nano::message_header const &); std::unique_ptr<nano::asc_pull_ack> deserialize_asc_pull_ack (nano::stream &, nano::message_header const &);
private:
std::shared_ptr<std::vector<uint8_t>> read_buffer; std::shared_ptr<std::vector<uint8_t>> read_buffer;
private: // Constants private: // Constants
@ -87,7 +89,7 @@ namespace transport
private: // Dependencies private: // Dependencies
nano::network_constants const & network_constants_m; nano::network_constants const & network_constants_m;
nano::network_filter & publish_filter_m; nano::network_filter & network_filter_m;
nano::block_uniquer & block_uniquer_m; nano::block_uniquer & block_uniquer_m;
nano::vote_uniquer & vote_uniquer_m; nano::vote_uniquer & vote_uniquer_m;
read_query read_op; read_query read_op;