From f6e47aa4e2da811787dcfbdc150358547853b1bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 23 Sep 2024 16:49:23 +0200 Subject: [PATCH 01/15] Move `network_filter` to lib folder --- nano/core_test/network_filter.cpp | 2 +- nano/lib/CMakeLists.txt | 2 ++ nano/{secure => lib}/network_filter.cpp | 2 +- nano/{secure => lib}/network_filter.hpp | 0 nano/node/common.hpp | 2 +- nano/node/network.hpp | 2 +- nano/node/transport/message_deserializer.hpp | 4 +++- nano/secure/CMakeLists.txt | 2 -- 8 files changed, 9 insertions(+), 7 deletions(-) rename nano/{secure => lib}/network_filter.cpp (98%) rename nano/{secure => lib}/network_filter.hpp (100%) diff --git a/nano/core_test/network_filter.cpp b/nano/core_test/network_filter.cpp index a886ae956..b23345374 100644 --- a/nano/core_test/network_filter.cpp +++ b/nano/core_test/network_filter.cpp @@ -1,8 +1,8 @@ #include +#include #include #include #include -#include #include #include diff --git a/nano/lib/CMakeLists.txt b/nano/lib/CMakeLists.txt index 144e1adfd..25f6d0add 100644 --- a/nano/lib/CMakeLists.txt +++ b/nano/lib/CMakeLists.txt @@ -63,6 +63,8 @@ add_library( logging_enums.cpp memory.hpp memory.cpp + network_filter.hpp + network_filter.cpp numbers.hpp numbers.cpp object_stream.hpp diff --git a/nano/secure/network_filter.cpp b/nano/lib/network_filter.cpp similarity index 98% rename from nano/secure/network_filter.cpp rename to nano/lib/network_filter.cpp index c60a11174..f89bff5d3 100644 --- a/nano/secure/network_filter.cpp +++ b/nano/lib/network_filter.cpp @@ -1,9 +1,9 @@ #include #include #include +#include #include #include -#include nano::network_filter::network_filter (size_t size_a) : items (size_a, nano::uint128_t{ 0 }) diff --git a/nano/secure/network_filter.hpp b/nano/lib/network_filter.hpp similarity index 100% rename from nano/secure/network_filter.hpp rename to nano/lib/network_filter.hpp diff --git a/nano/node/common.hpp b/nano/node/common.hpp index 54c337ba1..5dd98516d 100644 --- a/nano/node/common.hpp +++ b/nano/node/common.hpp @@ -3,8 +3,8 @@ #include #include #include +#include #include -#include #include #include diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 31ebeee2b..54f24e7c7 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -1,13 +1,13 @@ #pragma once #include +#include #include #include #include #include #include #include -#include #include #include diff --git a/nano/node/transport/message_deserializer.hpp b/nano/node/transport/message_deserializer.hpp index c5ac4346b..c6eabf705 100644 --- a/nano/node/transport/message_deserializer.hpp +++ b/nano/node/transport/message_deserializer.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -43,7 +44,8 @@ namespace transport parse_status status; using read_query = std::function> const &, size_t, std::function)>; - message_deserializer (network_constants const &, network_filter &, block_uniquer &, vote_uniquer &, read_query read_op); + + message_deserializer (nano::network_constants const &, nano::network_filter &, nano::block_uniquer &, nano::vote_uniquer &, read_query read_op); /* * Asynchronously read next message from the channel_read_fn. diff --git a/nano/secure/CMakeLists.txt b/nano/secure/CMakeLists.txt index 7b97291e9..f11d6e85d 100644 --- a/nano/secure/CMakeLists.txt +++ b/nano/secure/CMakeLists.txt @@ -56,8 +56,6 @@ add_library( ledger_set_any.cpp ledger_set_confirmed.hpp ledger_set_confirmed.cpp - network_filter.hpp - network_filter.cpp pending_info.hpp pending_info.cpp receivable_iterator.cpp From 16f60f86afa84116bae775ec2ca6cf633f055134 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 23 Sep 2024 16:37:40 +0200 Subject: [PATCH 02/15] Cleanup --- nano/node/messages.cpp | 4 ++-- nano/node/messages.hpp | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/nano/node/messages.cpp b/nano/node/messages.cpp index 0143a62b2..b0ea66657 100644 --- a/nano/node/messages.cpp +++ b/nano/node/messages.cpp @@ -425,7 +425,7 @@ void nano::keepalive::operator() (nano::object_stream & obs) const nano::publish::publish (bool & error_a, nano::stream & stream_a, nano::message_header const & header_a, nano::uint128_t const & digest_a, nano::block_uniquer * uniquer_a) : message (header_a), - digest (digest_a) + digest{ digest_a } { if (!error_a) { @@ -435,7 +435,7 @@ nano::publish::publish (bool & error_a, nano::stream & stream_a, nano::message_h nano::publish::publish (nano::network_constants const & constants, std::shared_ptr const & block_a, bool is_originator_a) : message (constants, nano::message_type::publish), - block (block_a) + block{ block_a } { header.block_type_set (block->type ()); header.flag_set (originator_flag, is_originator_a); diff --git a/nano/node/messages.hpp b/nano/node/messages.hpp index 33431821f..91d597aea 100644 --- a/nano/node/messages.hpp +++ b/nano/node/messages.hpp @@ -188,7 +188,7 @@ public: // Logging class publish final : public message { public: - publish (bool &, nano::stream &, nano::message_header const &, nano::uint128_t const & = 0, nano::block_uniquer * = nullptr); + publish (bool &, nano::stream &, nano::message_header const &, nano::uint128_t const & digest = 0, nano::block_uniquer * = nullptr); publish (nano::network_constants const & constants, std::shared_ptr const &, bool is_originator = false); void serialize (nano::stream &) const override; @@ -201,6 +201,8 @@ public: public: // Payload std::shared_ptr block; + + // Messages deserialized from network should have their digest set nano::uint128_t digest{ 0 }; public: // Logging From 4f99a456a7c03f9cd18f1977dd22e86a8b583bb9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 23 Sep 2024 16:53:34 +0200 Subject: [PATCH 03/15] Cleanup #2 --- nano/lib/network_filter.hpp | 3 +++ nano/node/messages.cpp | 2 +- nano/node/messages.hpp | 5 +++-- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/nano/lib/network_filter.hpp b/nano/lib/network_filter.hpp index 7604acfc4..c9cb20268 100644 --- a/nano/lib/network_filter.hpp +++ b/nano/lib/network_filter.hpp @@ -16,6 +16,9 @@ namespace nano */ class network_filter final { +public: + using digest_t = nano::uint128_t; + public: network_filter () = delete; network_filter (size_t size_a); diff --git a/nano/node/messages.cpp b/nano/node/messages.cpp index b0ea66657..3a3b4f326 100644 --- a/nano/node/messages.cpp +++ b/nano/node/messages.cpp @@ -423,7 +423,7 @@ void nano::keepalive::operator() (nano::object_stream & obs) const * publish */ -nano::publish::publish (bool & error_a, nano::stream & stream_a, nano::message_header const & header_a, nano::uint128_t const & digest_a, nano::block_uniquer * uniquer_a) : +nano::publish::publish (bool & error_a, nano::stream & stream_a, nano::message_header const & header_a, nano::network_filter::digest_t const & digest_a, nano::block_uniquer * uniquer_a) : message (header_a), digest{ digest_a } { diff --git a/nano/node/messages.hpp b/nano/node/messages.hpp index 91d597aea..1c6833804 100644 --- a/nano/node/messages.hpp +++ b/nano/node/messages.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -188,7 +189,7 @@ public: // Logging class publish final : public message { public: - publish (bool &, nano::stream &, nano::message_header const &, nano::uint128_t const & digest = 0, nano::block_uniquer * = nullptr); + publish (bool &, nano::stream &, nano::message_header const &, nano::network_filter::digest_t const & digest = 0, nano::block_uniquer * = nullptr); publish (nano::network_constants const & constants, std::shared_ptr const &, bool is_originator = false); void serialize (nano::stream &) const override; @@ -203,7 +204,7 @@ public: // Payload std::shared_ptr block; // Messages deserialized from network should have their digest set - nano::uint128_t digest{ 0 }; + nano::network_filter::digest_t digest; public: // Logging void operator() (nano::object_stream &) const override; From 10c06f8875c283d8d408d92ea7cbe264a5f1da96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 23 Sep 2024 17:08:25 +0200 Subject: [PATCH 04/15] Filter duplicate votes --- nano/lib/stats_enums.hpp | 4 +++ nano/node/message_processor.cpp | 13 ++++++++-- nano/node/messages.cpp | 7 +++--- nano/node/messages.hpp | 7 ++++-- nano/node/transport/message_deserializer.cpp | 26 ++++++++++++++------ nano/node/transport/message_deserializer.hpp | 10 +++++--- 6 files changed, 48 insertions(+), 19 deletions(-) diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 64e7f9716..c8f3e5f81 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -201,6 +201,9 @@ enum class detail asc_pull_req, asc_pull_ack, + // dropped messages + confirm_ack_zero_account, + // bootstrap, callback initiate, initiate_legacy_age, @@ -354,6 +357,7 @@ enum class detail // duplicate duplicate_publish_message, + duplicate_confirm_ack_message, // telemetry invalid_signature, diff --git a/nano/node/message_processor.cpp b/nano/node/message_processor.cpp index 6ed8cd118..cd400daa1 100644 --- a/nano/node/message_processor.cpp +++ b/nano/node/message_processor.cpp @@ -210,9 +210,18 @@ public: void confirm_ack (nano::confirm_ack const & message) override { - if (!message.vote->account.is_zero ()) + // Ignore zero account votes + if (message.vote->account.is_zero ()) { - node.vote_processor.vote (message.vote, channel, message.is_rebroadcasted () ? nano::vote_source::rebroadcast : nano::vote_source::live); + node.stats.inc (nano::stat::type::drop, nano::stat::detail::confirm_ack_zero_account, nano::stat::dir::in); + return; + } + + bool added = node.vote_processor.vote (message.vote, channel, message.is_rebroadcasted () ? nano::vote_source::rebroadcast : nano::vote_source::live); + if (!added) + { + node.network.publish_filter.clear (message.digest); + node.stats.inc (nano::stat::type::drop, nano::stat::detail::confirm_ack, nano::stat::dir::in); } } diff --git a/nano/node/messages.cpp b/nano/node/messages.cpp index 3a3b4f326..a2e819cc3 100644 --- a/nano/node/messages.cpp +++ b/nano/node/messages.cpp @@ -613,9 +613,10 @@ void nano::confirm_req::operator() (nano::object_stream & obs) const * confirm_ack */ -nano::confirm_ack::confirm_ack (bool & error_a, nano::stream & stream_a, nano::message_header const & header_a, nano::vote_uniquer * uniquer_a) : +nano::confirm_ack::confirm_ack (bool & error_a, nano::stream & stream_a, nano::message_header const & header_a, nano::network_filter::digest_t const & digest_a, nano::vote_uniquer * uniquer_a) : message (header_a), - vote (nano::make_shared (error_a, stream_a)) + vote{ nano::make_shared (error_a, stream_a) }, + digest{ digest_a } { if (!error_a && uniquer_a) { @@ -625,7 +626,7 @@ nano::confirm_ack::confirm_ack (bool & error_a, nano::stream & stream_a, nano::m nano::confirm_ack::confirm_ack (nano::network_constants const & constants, std::shared_ptr const & vote_a, bool rebroadcasted_a) : message (constants, nano::message_type::confirm_ack), - vote (vote_a) + vote{ vote_a } { debug_assert (vote->hashes.size () < 256); diff --git a/nano/node/messages.hpp b/nano/node/messages.hpp index 1c6833804..00f1070b0 100644 --- a/nano/node/messages.hpp +++ b/nano/node/messages.hpp @@ -204,7 +204,7 @@ public: // Payload std::shared_ptr block; // Messages deserialized from network should have their digest set - nano::network_filter::digest_t digest; + nano::network_filter::digest_t digest{ 0 }; public: // Logging void operator() (nano::object_stream &) const override; @@ -267,7 +267,7 @@ public: // Logging class confirm_ack final : public message { public: - confirm_ack (bool & error, nano::stream &, nano::message_header const &, nano::vote_uniquer * = nullptr); + confirm_ack (bool & error, nano::stream &, nano::message_header const &, nano::network_filter::digest_t const & digest = 0, nano::vote_uniquer * = nullptr); confirm_ack (nano::network_constants const & constants, std::shared_ptr const &, bool rebroadcasted = false); void serialize (nano::stream &) const override; @@ -285,6 +285,9 @@ private: public: // Payload std::shared_ptr vote; + // Messages deserialized from network should have their digest set + nano::network_filter::digest_t digest{ 0 }; + public: // Logging void operator() (nano::object_stream &) const override; }; diff --git a/nano/node/transport/message_deserializer.cpp b/nano/node/transport/message_deserializer.cpp index 52d725b54..a7976279d 100644 --- a/nano/node/transport/message_deserializer.cpp +++ b/nano/node/transport/message_deserializer.cpp @@ -2,11 +2,11 @@ #include #include -nano::transport::message_deserializer::message_deserializer (nano::network_constants const & network_constants_a, nano::network_filter & publish_filter_a, nano::block_uniquer & block_uniquer_a, nano::vote_uniquer & vote_uniquer_a, +nano::transport::message_deserializer::message_deserializer (nano::network_constants const & network_constants_a, nano::network_filter & network_filter_a, nano::block_uniquer & block_uniquer_a, nano::vote_uniquer & vote_uniquer_a, read_query read_op) : read_buffer{ std::make_shared> () }, network_constants_m{ network_constants_a }, - publish_filter_m{ publish_filter_a }, + network_filter_m{ network_filter_a }, block_uniquer_m{ block_uniquer_a }, vote_uniquer_m{ vote_uniquer_a }, read_op{ std::move (read_op) } @@ -128,9 +128,9 @@ std::unique_ptr nano::transport::message_deserializer::deserializ } case nano::message_type::publish: { - // Early filtering to not waste time deserializing duplicate blocks + // Early filtering to not waste time deserializing duplicates nano::uint128_t digest; - if (!publish_filter_m.apply (read_buffer->data (), payload_size, &digest)) + if (!network_filter_m.apply (read_buffer->data (), payload_size, &digest)) { return deserialize_publish (stream, header, digest); } @@ -146,7 +146,17 @@ std::unique_ptr nano::transport::message_deserializer::deserializ } case nano::message_type::confirm_ack: { - return deserialize_confirm_ack (stream, header); + // Early filtering to not waste time deserializing duplicates + nano::uint128_t digest; + if (!network_filter_m.apply (read_buffer->data (), payload_size, &digest)) + { + return deserialize_confirm_ack (stream, header, digest); + } + else + { + status = parse_status::duplicate_confirm_ack_message; + } + break; } case nano::message_type::node_id_handshake: { @@ -208,7 +218,7 @@ std::unique_ptr nano::transport::message_deserializer::deserial return {}; } -std::unique_ptr nano::transport::message_deserializer::deserialize_publish (nano::stream & stream, nano::message_header const & header, nano::uint128_t const & digest_a) +std::unique_ptr nano::transport::message_deserializer::deserialize_publish (nano::stream & stream, nano::message_header const & header, nano::network_filter::digest_t const & digest_a) { auto error = false; auto incoming = std::make_unique (error, stream, header, digest_a, &block_uniquer_m); @@ -246,10 +256,10 @@ std::unique_ptr nano::transport::message_deserializer::deseri return {}; } -std::unique_ptr nano::transport::message_deserializer::deserialize_confirm_ack (nano::stream & stream, nano::message_header const & header) +std::unique_ptr nano::transport::message_deserializer::deserialize_confirm_ack (nano::stream & stream, nano::message_header const & header, nano::network_filter::digest_t const & digest_a) { auto error = false; - auto incoming = std::make_unique (error, stream, header, &vote_uniquer_m); + auto incoming = std::make_unique (error, stream, header, digest_a, &vote_uniquer_m); if (!error && nano::at_end (stream)) { return incoming; diff --git a/nano/node/transport/message_deserializer.hpp b/nano/node/transport/message_deserializer.hpp index c6eabf705..4c4d62c22 100644 --- a/nano/node/transport/message_deserializer.hpp +++ b/nano/node/transport/message_deserializer.hpp @@ -33,6 +33,7 @@ namespace transport invalid_network, outdated_version, duplicate_publish_message, + duplicate_confirm_ack_message, message_size_too_big, }; @@ -41,7 +42,7 @@ namespace transport public: using callback_type = std::function)>; - parse_status status; + parse_status status{ parse_status::none }; using read_query = std::function> const &, size_t, std::function)>; @@ -66,9 +67,9 @@ namespace transport */ std::unique_ptr deserialize (nano::message_header header, std::size_t payload_size); std::unique_ptr deserialize_keepalive (nano::stream &, nano::message_header const &); - std::unique_ptr deserialize_publish (nano::stream &, nano::message_header const &, nano::uint128_t const & = 0); + std::unique_ptr deserialize_publish (nano::stream &, nano::message_header const &, nano::network_filter::digest_t const & digest); std::unique_ptr deserialize_confirm_req (nano::stream &, nano::message_header const &); - std::unique_ptr deserialize_confirm_ack (nano::stream &, nano::message_header const &); + std::unique_ptr deserialize_confirm_ack (nano::stream &, nano::message_header const &, nano::network_filter::digest_t const & digest); std::unique_ptr deserialize_node_id_handshake (nano::stream &, nano::message_header const &); std::unique_ptr deserialize_telemetry_req (nano::stream &, nano::message_header const &); std::unique_ptr deserialize_telemetry_ack (nano::stream &, nano::message_header const &); @@ -79,6 +80,7 @@ namespace transport std::unique_ptr deserialize_asc_pull_req (nano::stream &, nano::message_header const &); std::unique_ptr deserialize_asc_pull_ack (nano::stream &, nano::message_header const &); + private: std::shared_ptr> read_buffer; private: // Constants @@ -87,7 +89,7 @@ namespace transport private: // Dependencies nano::network_constants const & network_constants_m; - nano::network_filter & publish_filter_m; + nano::network_filter & network_filter_m; nano::block_uniquer & block_uniquer_m; nano::vote_uniquer & vote_uniquer_m; read_query read_op; From 02fa9bda463dd28b73b55d70a4d2bf6b37de951f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 23 Sep 2024 18:52:07 +0200 Subject: [PATCH 05/15] Configurable size --- nano/lib/network_filter.hpp | 4 ++-- nano/node/network.cpp | 2 +- nano/node/network.hpp | 2 ++ 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/nano/lib/network_filter.hpp b/nano/lib/network_filter.hpp index c9cb20268..6634421c4 100644 --- a/nano/lib/network_filter.hpp +++ b/nano/lib/network_filter.hpp @@ -20,8 +20,8 @@ public: using digest_t = nano::uint128_t; public: - network_filter () = delete; - network_filter (size_t size_a); + explicit network_filter (size_t size); + /** * Reads \p count_a bytes starting from \p bytes_a and inserts the siphash digest in the filter. * @param \p digest_a if given, will be set to the resulting siphash digest diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 3d15149c3..0bb6d8218 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -25,7 +25,7 @@ nano::network::network (nano::node & node, uint16_t port) : id{ nano::network_constants::active_network }, syn_cookies{ node.config.network.max_peers_per_ip, node.logger }, resolver{ node.io_ctx }, - publish_filter{ 256 * 1024 }, + publish_filter{ node.config.network.duplicate_filter_size }, tcp_channels{ node }, port{ port } { diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 54f24e7c7..a2b059eae 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -77,6 +77,8 @@ public: size_t max_peers_per_ip{ 4 }; /** Maximum number of peers per subnetwork */ size_t max_peers_per_subnetwork{ 16 }; + + size_t duplicate_filter_size{ 256 * 1024 }; }; class network final From 63db84a75ef5bbb4fc86b9318eac31078118a5cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 23 Sep 2024 19:28:59 +0200 Subject: [PATCH 06/15] Add `network_filter::check (...)` --- nano/core_test/network_filter.cpp | 13 ++++++++++ nano/lib/network_filter.cpp | 41 +++++++++++++++++++++++++------ nano/lib/network_filter.hpp | 32 +++++++++++++++--------- 3 files changed, 67 insertions(+), 19 deletions(-) diff --git a/nano/core_test/network_filter.cpp b/nano/core_test/network_filter.cpp index b23345374..d55b0dca2 100644 --- a/nano/core_test/network_filter.cpp +++ b/nano/core_test/network_filter.cpp @@ -7,6 +7,18 @@ #include +TEST (network_filter, apply) +{ + nano::network_filter filter (4); + ASSERT_FALSE (filter.check (34)); + ASSERT_FALSE (filter.apply (34)); + ASSERT_TRUE (filter.check (34)); + ASSERT_TRUE (filter.apply (34)); + filter.clear (nano::network_filter::digest_t{ 34 }); + ASSERT_FALSE (filter.check (34)); + ASSERT_FALSE (filter.apply (34)); +} + TEST (network_filter, unit) { nano::network_filter filter (1); @@ -92,6 +104,7 @@ TEST (network_filter, many) // Now filter the rest of the stream // All blocks should pass through ASSERT_FALSE (filter.apply (bytes->data (), block->size)); + ASSERT_TRUE (filter.check (bytes->data (), block->size)); ASSERT_FALSE (error); // Make sure the stream was rewinded correctly diff --git a/nano/lib/network_filter.cpp b/nano/lib/network_filter.cpp index f89bff5d3..dc9931890 100644 --- a/nano/lib/network_filter.cpp +++ b/nano/lib/network_filter.cpp @@ -11,26 +11,43 @@ nano::network_filter::network_filter (size_t size_a) : nano::random_pool::generate_block (key, key.size ()); } -bool nano::network_filter::apply (uint8_t const * bytes_a, size_t count_a, nano::uint128_t * digest_a) +bool nano::network_filter::apply (uint8_t const * bytes_a, size_t count_a, nano::uint128_t * digest_out) { // Get hash before locking - auto digest (hash (bytes_a, count_a)); + auto digest = hash (bytes_a, count_a); + if (digest_out) + { + *digest_out = digest; + } + return apply (digest); +} +bool nano::network_filter::apply (digest_t const & digest) +{ nano::lock_guard lock{ mutex }; - auto & element (get_element (digest)); - bool existed (element == digest); + + auto & element = get_element (digest); + bool existed = (element == digest); if (!existed) { // Replace likely old element with a new one element = digest; } - if (digest_a) - { - *digest_a = digest; - } return existed; } +bool nano::network_filter::check (uint8_t const * bytes, size_t count) const +{ + return check (hash (bytes, count)); +} + +bool nano::network_filter::check (digest_t const & digest) const +{ + nano::lock_guard lock{ mutex }; + auto & element = get_element (digest); + return element == digest; +} + void nano::network_filter::clear (nano::uint128_t const & digest_a) { nano::lock_guard lock{ mutex }; @@ -90,6 +107,14 @@ nano::uint128_t & nano::network_filter::get_element (nano::uint128_t const & has return items[index]; } +nano::uint128_t const & nano::network_filter::get_element (nano::uint128_t const & hash_a) const +{ + debug_assert (!mutex.try_lock ()); + debug_assert (items.size () > 0); + size_t index (hash_a % items.size ()); + return items[index]; +} + nano::uint128_t nano::network_filter::hash (uint8_t const * bytes_a, size_t count_a) const { nano::uint128_union digest{ 0 }; diff --git a/nano/lib/network_filter.hpp b/nano/lib/network_filter.hpp index 6634421c4..77a060f27 100644 --- a/nano/lib/network_filter.hpp +++ b/nano/lib/network_filter.hpp @@ -28,31 +28,38 @@ public: * @warning will read out of bounds if [ \p bytes_a, \p bytes_a + \p count_a ] is not a valid range * @return a boolean representing the previous existence of the hash in the filter. **/ - bool apply (uint8_t const * bytes_a, size_t count_a, nano::uint128_t * digest_a = nullptr); + bool apply (uint8_t const * bytes, size_t count, digest_t * digest_out = nullptr); + bool apply (digest_t const & digest); + + /** + * Checks if the digest is in the filter. + * @return a boolean representing the existence of the hash in the filter. + */ + bool check (uint8_t const * bytes, size_t count) const; + bool check (digest_t const & digest) const; /** * Sets the corresponding element in the filter to zero, if it matches \p digest_a exactly. **/ - void clear (nano::uint128_t const & digest_a); + void clear (digest_t const & digest); /** * Clear many digests from the filter **/ - void clear (std::vector const &); + void clear (std::vector const &); /** * Reads \p count_a bytes starting from \p bytes_a and digests the contents. * Then, sets the corresponding element in the filter to zero, if it matches the digest exactly. * @warning will read out of bounds if [ \p bytes_a, \p bytes_a + \p count_a ] is not a valid range **/ - void clear (uint8_t const * bytes_a, size_t count_a); + void clear (uint8_t const * bytes, size_t count); /** * Serializes \p object_a and clears the resulting siphash digest from the filter. - * @return a boolean representing the previous existence of the hash in the filter. **/ template - void clear (OBJECT const & object_a); + void clear (OBJECT const & object); /** Sets every element of the filter to zero, keeping its size and capacity. */ void clear (); @@ -61,7 +68,7 @@ public: * Serializes \p object_a and returns the resulting siphash digest */ template - nano::uint128_t hash (OBJECT const & object_a) const; + nano::uint128_t hash (OBJECT const & object) const; private: using siphash_t = CryptoPP::SipHash<2, 4, true>; @@ -71,16 +78,19 @@ private: * @note must have a lock on mutex * @return a reference to the element with key \p hash_a **/ - nano::uint128_t & get_element (nano::uint128_t const & hash_a); + nano::uint128_t & get_element (digest_t const & hash); + nano::uint128_t const & get_element (digest_t const & hash) const; /** * Hashes \p count_a bytes starting from \p bytes_a . * @return the siphash digest of the contents in \p bytes_a . **/ - nano::uint128_t hash (uint8_t const * bytes_a, size_t count_a) const; + nano::uint128_t hash (uint8_t const * bytes, size_t count) const; - std::vector items; +private: + std::vector items; CryptoPP::SecByteBlock key{ siphash_t::KEYLENGTH }; - nano::mutex mutex{ mutex_identifier (mutexes::network_filter) }; + + mutable nano::mutex mutex{ mutex_identifier (mutexes::network_filter) }; }; } From 1ec359a98dc8f2292db35776c53285ae84ef3706 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 23 Sep 2024 19:07:46 +0200 Subject: [PATCH 07/15] Tests --- nano/core_test/network.cpp | 41 +++++++++++++++++++++++++++++- nano/node/transport/tcp_server.cpp | 28 +++++++++++++------- 2 files changed, 59 insertions(+), 10 deletions(-) diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 60efc32be..d1467472b 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -700,9 +700,10 @@ TEST (network, duplicate_detection) // Publish duplicate detection through TCP auto tcp_channel = node0.network.tcp_channels.find_node_id (node1.get_node_id ()); ASSERT_NE (nullptr, tcp_channel); + ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message)); tcp_channel->send (publish); - ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message), 0); + ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message), 0); tcp_channel->send (publish); ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message), 1); } @@ -738,6 +739,44 @@ TEST (network, duplicate_revert_publish) ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ())); } +TEST (network, duplicate_vote_detection) +{ + nano::test::system system; + nano::node_flags node_flags; + auto & node0 = *system.add_node (node_flags); + auto & node1 = *system.add_node (node_flags); + + auto vote = nano::test::make_vote (nano::dev::genesis_key, { nano::dev::genesis->hash () }); + nano::confirm_ack message{ nano::dev::network_params.network, vote }; + + ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message)); + + // Publish duplicate detection through TCP + auto tcp_channel = node0.network.tcp_channels.find_node_id (node1.get_node_id ()); + ASSERT_NE (nullptr, tcp_channel); + + ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message)); + tcp_channel->send (message); + ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0); + tcp_channel->send (message); + ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 1); +} + +// Ensures that the filter doesn't filter out votes that could not be queued for processing +TEST (network, duplicate_revert_vote) +{ + nano::test::system system; + nano::node_config node_config = system.default_config (); + node_config.vote_processor.max_non_pr_queue = 0; + auto & node0 = *system.add_node (node_config); + auto & node1 = *system.add_node (node_config); + + auto vote = nano::test::make_vote (nano::dev::genesis_key, { nano::dev::genesis->hash () }); + nano::confirm_ack message{ nano::dev::network_params.network, vote }; + + +} + // The test must be completed in less than 1 second TEST (network, bandwidth_limiter_4_messages) { diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index 9c03c7997..f7075faa3 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -119,16 +119,26 @@ void nano::transport::tcp_server::received_message (std::unique_ptrstats.inc (nano::stat::type::error, to_stat_detail (message_deserializer->status)); - // Avoid too much noise about `duplicate_publish_message` errors - if (message_deserializer->status == transport::parse_status::duplicate_publish_message) + switch (message_deserializer->status) { - node->stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message); - } - else - { - node->logger.debug (nano::log::type::tcp_server, "Error deserializing message: {} ({})", - to_string (message_deserializer->status), - fmt::streamed (remote_endpoint)); + // Avoid too much noise about `duplicate_publish_message` errors + case nano::transport::parse_status::duplicate_publish_message: + { + node->stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message); + } + break; + case nano::transport::parse_status::duplicate_confirm_ack_message: + { + node->stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message); + } + break; + default: + { + node->logger.debug (nano::log::type::tcp_server, "Error deserializing message: {} ({})", + to_string (message_deserializer->status), + fmt::streamed (remote_endpoint)); + } + break; } } From 6352f1aca7095fb3de9bcd82cab8d67acb926c41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Tue, 24 Sep 2024 14:32:27 +0200 Subject: [PATCH 08/15] Increase filter size --- nano/node/network.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nano/node/network.hpp b/nano/node/network.hpp index a2b059eae..ca9512e61 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -78,7 +78,7 @@ public: /** Maximum number of peers per subnetwork */ size_t max_peers_per_subnetwork{ 16 }; - size_t duplicate_filter_size{ 256 * 1024 }; + size_t duplicate_filter_size{ 1024 * 1024 }; }; class network final From 6ca1f1de3d5a1701308efe0f72bdb8a052ecabb5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Tue, 24 Sep 2024 17:02:14 +0200 Subject: [PATCH 09/15] Explicitly cache votes --- nano/node/node.cpp | 7 ------- nano/node/vote_router.cpp | 16 +++++++++++----- nano/node/vote_router.hpp | 8 ++++++-- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 182d1cd5e..46519b080 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -170,13 +170,6 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy return ledger.weight (rep); }; - vote_router.vote_processed.add ([this] (std::shared_ptr const & vote, nano::vote_source source, std::unordered_map const & results) { - if (source != nano::vote_source::cache) - { - vote_cache.insert (vote, results); - } - }); - // Republish vote if it is new and the node does not host a principal representative (or close to) vote_router.vote_processed.add ([this] (std::shared_ptr const & vote, nano::vote_source source, std::unordered_map const & results) { bool processed = std::any_of (results.begin (), results.end (), [] (auto const & result) { diff --git a/nano/node/vote_router.cpp b/nano/node/vote_router.cpp index 426dc69a4..f7b191f1c 100644 --- a/nano/node/vote_router.cpp +++ b/nano/node/vote_router.cpp @@ -20,9 +20,9 @@ nano::stat::detail nano::to_stat_detail (nano::vote_source source) return nano::enum_util::cast (source); } -nano::vote_router::vote_router (nano::vote_cache & cache, nano::recently_confirmed_cache & recently_confirmed) : - cache{ cache }, - recently_confirmed{ recently_confirmed } +nano::vote_router::vote_router (nano::vote_cache & vote_cache_a, nano::recently_confirmed_cache & recently_confirmed_a) : + vote_cache{ vote_cache_a }, + recently_confirmed{ recently_confirmed_a } { } @@ -81,12 +81,12 @@ std::unordered_map nano::vote_router::vote (s continue; } - auto find_election = [this] (auto const & hash) { + auto find_election = [this] (auto const & hash) -> std::shared_ptr { if (auto existing = elections.find (hash); existing != elections.end ()) { return existing->second.lock (); } - return std::shared_ptr{}; + return {}; }; if (auto election = find_election (hash)) @@ -118,6 +118,12 @@ std::unordered_map nano::vote_router::vote (s return results.find (hash) != results.end (); })); + // Cache the votes that didn't match any election + if (source != nano::vote_source::cache) + { + vote_cache.insert (vote, results); + } + vote_processed.notify (vote, source, results); return results; diff --git a/nano/node/vote_router.hpp b/nano/node/vote_router.hpp index 0468bf3d3..f2afb2bf5 100644 --- a/nano/node/vote_router.hpp +++ b/nano/node/vote_router.hpp @@ -70,14 +70,18 @@ public: std::unique_ptr collect_container_info (std::string const & name) const; +private: // Dependencies + nano::vote_cache & vote_cache; + nano::recently_confirmed_cache & recently_confirmed; + private: void run (); - nano::vote_cache & cache; - nano::recently_confirmed_cache & recently_confirmed; +private: // Mapping of block hashes to elections. // Election already contains the associated block std::unordered_map> elections; + bool stopped{ false }; std::condition_variable_any condition; mutable std::shared_mutex mutex; From 221428ba1b9b6916ed170cbdb6e07206044118ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Tue, 24 Sep 2024 19:19:38 +0200 Subject: [PATCH 10/15] Add epochs to `network_filter` --- nano/core_test/network_filter.cpp | 24 +++++++++++++++ nano/lib/network_filter.cpp | 49 ++++++++++++++++++++----------- nano/lib/network_filter.hpp | 48 ++++++++++++++++++++---------- 3 files changed, 89 insertions(+), 32 deletions(-) diff --git a/nano/core_test/network_filter.cpp b/nano/core_test/network_filter.cpp index d55b0dca2..0a713a187 100644 --- a/nano/core_test/network_filter.cpp +++ b/nano/core_test/network_filter.cpp @@ -140,3 +140,27 @@ TEST (network_filter, optional_digest) filter.clear (digest); ASSERT_FALSE (filter.apply (bytes1.data (), bytes1.size ())); } + +TEST (network_filter, expire) +{ + // Expire entries older than 2 epochs + nano::network_filter filter{ 4, 2 }; + + ASSERT_FALSE (filter.apply (1)); // Entry with epoch 0 + filter.update (); // Bump epoch to 1 + ASSERT_FALSE (filter.apply (2)); // Entry with epoch 1 + + // Both values should be detected as present + ASSERT_TRUE (filter.check (1)); + ASSERT_TRUE (filter.check (2)); + + filter.update (2); // Bump epoch to 3 + + ASSERT_FALSE (filter.check (1)); // Entry with epoch 0 should be expired + ASSERT_TRUE (filter.check (2)); // Entry with epoch 1 should still be present + + filter.update (); // Bump epoch to 4 + + ASSERT_FALSE (filter.check (2)); // Entry with epoch 1 should be expired + ASSERT_FALSE (filter.apply (2)); // Entry with epoch 1 should be replaced +} \ No newline at end of file diff --git a/nano/lib/network_filter.cpp b/nano/lib/network_filter.cpp index dc9931890..237746971 100644 --- a/nano/lib/network_filter.cpp +++ b/nano/lib/network_filter.cpp @@ -5,12 +5,27 @@ #include #include -nano::network_filter::network_filter (size_t size_a) : - items (size_a, nano::uint128_t{ 0 }) +nano::network_filter::network_filter (size_t size_a, epoch_t age_cutoff_a) : + items (size_a, { 0 }), + age_cutoff{ age_cutoff_a } { nano::random_pool::generate_block (key, key.size ()); } +void nano::network_filter::update (epoch_t epoch_inc) +{ + debug_assert (epoch_inc > 0); + nano::lock_guard lock{ mutex }; + current_epoch += epoch_inc; +} + +bool nano::network_filter::compare (entry const & existing, digest_t const & digest) const +{ + debug_assert (!mutex.try_lock ()); + // Only consider digests to be the same if the epoch is within the age cutoff + return existing.digest == digest && existing.epoch + age_cutoff >= current_epoch; +} + bool nano::network_filter::apply (uint8_t const * bytes_a, size_t count_a, nano::uint128_t * digest_out) { // Get hash before locking @@ -27,11 +42,11 @@ bool nano::network_filter::apply (digest_t const & digest) nano::lock_guard lock{ mutex }; auto & element = get_element (digest); - bool existed = (element == digest); + bool existed = compare (element, digest); if (!existed) { // Replace likely old element with a new one - element = digest; + element = { digest, current_epoch }; } return existed; } @@ -45,28 +60,28 @@ bool nano::network_filter::check (digest_t const & digest) const { nano::lock_guard lock{ mutex }; auto & element = get_element (digest); - return element == digest; + return compare (element, digest); } -void nano::network_filter::clear (nano::uint128_t const & digest_a) +void nano::network_filter::clear (digest_t const & digest) { nano::lock_guard lock{ mutex }; - auto & element (get_element (digest_a)); - if (element == digest_a) + auto & element = get_element (digest); + if (compare (element, digest)) { - element = nano::uint128_t{ 0 }; + element = { 0 }; } } -void nano::network_filter::clear (std::vector const & digests_a) +void nano::network_filter::clear (std::vector const & digests) { nano::lock_guard lock{ mutex }; - for (auto const & digest : digests_a) + for (auto const & digest : digests) { - auto & element (get_element (digest)); - if (element == digest) + auto & element = get_element (digest); + if (compare (element, digest)) { - element = nano::uint128_t{ 0 }; + element = { 0 }; } } } @@ -85,7 +100,7 @@ void nano::network_filter::clear (OBJECT const & object_a) void nano::network_filter::clear () { nano::lock_guard lock{ mutex }; - items.assign (items.size (), nano::uint128_t{ 0 }); + items.assign (items.size (), { 0 }); } template @@ -99,7 +114,7 @@ nano::uint128_t nano::network_filter::hash (OBJECT const & object_a) const return hash (bytes.data (), bytes.size ()); } -nano::uint128_t & nano::network_filter::get_element (nano::uint128_t const & hash_a) +auto nano::network_filter::get_element (nano::uint128_t const & hash_a) -> entry & { debug_assert (!mutex.try_lock ()); debug_assert (items.size () > 0); @@ -107,7 +122,7 @@ nano::uint128_t & nano::network_filter::get_element (nano::uint128_t const & has return items[index]; } -nano::uint128_t const & nano::network_filter::get_element (nano::uint128_t const & hash_a) const +auto nano::network_filter::get_element (nano::uint128_t const & hash_a) const -> entry const & { debug_assert (!mutex.try_lock ()); debug_assert (items.size () > 0); diff --git a/nano/lib/network_filter.hpp b/nano/lib/network_filter.hpp index 77a060f27..c9cca1030 100644 --- a/nano/lib/network_filter.hpp +++ b/nano/lib/network_filter.hpp @@ -18,9 +18,16 @@ class network_filter final { public: using digest_t = nano::uint128_t; + using epoch_t = uint64_t; public: - explicit network_filter (size_t size); + explicit network_filter (size_t size, epoch_t age_cutoff = 0); + + /** + * Updates the filter to the next epoch. + * Should be called periodically to time out old entries. + */ + void update (epoch_t epoch_inc = 1); /** * Reads \p count_a bytes starting from \p bytes_a and inserts the siphash digest in the filter. @@ -68,29 +75,40 @@ public: * Serializes \p object_a and returns the resulting siphash digest */ template - nano::uint128_t hash (OBJECT const & object) const; + digest_t hash (OBJECT const & object) const; + + /** + * Hashes \p count_a bytes starting from \p bytes_a . + * @return the siphash digest of the contents in \p bytes_a . + **/ + digest_t hash (uint8_t const * bytes, size_t count) const; private: + epoch_t const age_cutoff; + epoch_t current_epoch{ 0 }; + using siphash_t = CryptoPP::SipHash<2, 4, true>; + CryptoPP::SecByteBlock key{ siphash_t::KEYLENGTH }; + + mutable nano::mutex mutex{ mutex_identifier (mutexes::network_filter) }; + +private: + struct entry + { + digest_t digest; + epoch_t epoch; + }; + + std::vector items; /** * Get element from digest. * @note must have a lock on mutex * @return a reference to the element with key \p hash_a **/ - nano::uint128_t & get_element (digest_t const & hash); - nano::uint128_t const & get_element (digest_t const & hash) const; + entry & get_element (digest_t const & hash); + entry const & get_element (digest_t const & hash) const; - /** - * Hashes \p count_a bytes starting from \p bytes_a . - * @return the siphash digest of the contents in \p bytes_a . - **/ - nano::uint128_t hash (uint8_t const * bytes, size_t count) const; - -private: - std::vector items; - CryptoPP::SecByteBlock key{ siphash_t::KEYLENGTH }; - - mutable nano::mutex mutex{ mutex_identifier (mutexes::network_filter) }; + bool compare (entry const & existing, digest_t const & digest) const; }; } From eef0fca53ed1c2733fc7fcd4c3b5c3b5cf94ece1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Tue, 24 Sep 2024 19:20:40 +0200 Subject: [PATCH 11/15] Periodically update network publish filter --- nano/node/network.cpp | 8 ++++++-- nano/node/network.hpp | 1 + 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 0bb6d8218..df4932aa3 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -25,7 +25,7 @@ nano::network::network (nano::node & node, uint16_t port) : id{ nano::network_constants::active_network }, syn_cookies{ node.config.network.max_peers_per_ip, node.logger }, resolver{ node.io_ctx }, - publish_filter{ node.config.network.duplicate_filter_size }, + publish_filter{ node.config.network.duplicate_filter_size, node.config.network.duplicate_filter_cutoff }, tcp_channels{ node }, port{ port } { @@ -97,7 +97,9 @@ void nano::network::run_cleanup () nano::unique_lock lock{ mutex }; while (!stopped) { - condition.wait_for (lock, node.network_params.network.is_dev_network () ? 1s : 5s); + std::chrono::seconds const interval = node.network_params.network.is_dev_network () ? 1s : 5s; + + condition.wait_for (lock, interval); if (stopped) { return; @@ -115,6 +117,8 @@ void nano::network::run_cleanup () auto const syn_cookie_cutoff = std::chrono::steady_clock::now () - node.network_params.network.syn_cookie_cutoff; syn_cookies.purge (syn_cookie_cutoff); + publish_filter.update (interval.count ()); + lock.lock (); } } diff --git a/nano/node/network.hpp b/nano/node/network.hpp index ca9512e61..bac8f5c16 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -79,6 +79,7 @@ public: size_t max_peers_per_subnetwork{ 16 }; size_t duplicate_filter_size{ 1024 * 1024 }; + uint64_t duplicate_filter_cutoff{ 60 }; }; class network final From 98aaa57d9a47035fff52539970279358d4ecd0da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Tue, 24 Sep 2024 19:31:13 +0200 Subject: [PATCH 12/15] Allow disabling vote processor --- nano/node/vote_processor.cpp | 5 +++++ nano/node/vote_processor.hpp | 2 ++ 2 files changed, 7 insertions(+) diff --git a/nano/node/vote_processor.cpp b/nano/node/vote_processor.cpp index 7b480e07b..4d619db96 100644 --- a/nano/node/vote_processor.cpp +++ b/nano/node/vote_processor.cpp @@ -71,6 +71,11 @@ void nano::vote_processor::start () { debug_assert (threads.empty ()); + if (!config.enable) + { + return; + } + for (int n = 0; n < config.threads; ++n) { threads.emplace_back ([this] () { diff --git a/nano/node/vote_processor.hpp b/nano/node/vote_processor.hpp index 2a22f65ee..785762213 100644 --- a/nano/node/vote_processor.hpp +++ b/nano/node/vote_processor.hpp @@ -23,6 +23,8 @@ public: nano::error deserialize (nano::tomlconfig & toml); public: + bool enable{ true }; + size_t max_pr_queue{ 256 }; size_t max_non_pr_queue{ 32 }; size_t pr_priority{ 3 }; From 49cf520bfdd82486ad05e57586c920bf1133aba3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Tue, 24 Sep 2024 19:48:55 +0200 Subject: [PATCH 13/15] Tests #2 --- nano/core_test/network.cpp | 48 +++++++++++++++++++++++++++++++------- 1 file changed, 40 insertions(+), 8 deletions(-) diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index d1467472b..94d228327 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -686,6 +686,21 @@ TEST (network, peer_max_tcp_attempts_subnetwork) ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::max_per_subnetwork, nano::stat::dir::out)); } +namespace +{ +// Skip the first 8 bytes of the message header, which is the common header for all messages +std::vector message_payload_to_bytes (nano::message const & message) +{ + std::vector bytes; + { + nano::vectorstream stream (bytes); + message.serialize (stream); + } + debug_assert (bytes.size () > nano::message_header::size); + return std::vector (bytes.begin () + nano::message_header::size, bytes.end ()); +} +} + // Send two publish messages and asserts that the duplication is detected. TEST (network, duplicate_detection) { @@ -715,11 +730,7 @@ TEST (network, duplicate_revert_publish) node_config.block_processor.max_peer_queue = 0; auto & node (*system.add_node (node_config)); nano::publish publish{ nano::dev::network_params.network, nano::dev::genesis }; - std::vector bytes; - { - nano::vectorstream stream (bytes); - publish.block->serialize (stream); - } + std::vector bytes = message_payload_to_bytes (publish); // Add to the blocks filter // Should be cleared when dropping due to a full block processor, as long as the message has the optional digest attached // Test network.duplicate_detection ensures that the digest is attached when deserializing messages @@ -767,14 +778,35 @@ TEST (network, duplicate_revert_vote) { nano::test::system system; nano::node_config node_config = system.default_config (); - node_config.vote_processor.max_non_pr_queue = 0; + node_config.vote_processor.enable = false; // Do not drain queued votes + node_config.vote_processor.max_non_pr_queue = 1; + node_config.vote_processor.max_pr_queue = 1; auto & node0 = *system.add_node (node_config); auto & node1 = *system.add_node (node_config); - auto vote = nano::test::make_vote (nano::dev::genesis_key, { nano::dev::genesis->hash () }); - nano::confirm_ack message{ nano::dev::network_params.network, vote }; + auto vote1 = nano::test::make_vote (nano::dev::genesis_key, { nano::dev::genesis->hash () }, 1); + nano::confirm_ack message1{ nano::dev::network_params.network, vote1 }; + auto bytes1 = message_payload_to_bytes (message1); + auto vote2 = nano::test::make_vote (nano::dev::genesis_key, { nano::dev::genesis->hash () }, 2); + nano::confirm_ack message2{ nano::dev::network_params.network, vote2 }; + auto bytes2 = message_payload_to_bytes (message2); + // Publish duplicate detection through TCP + auto tcp_channel = node0.network.tcp_channels.find_node_id (node1.get_node_id ()); + ASSERT_NE (nullptr, tcp_channel); + + // First vote should be processed + tcp_channel->send (message1); + ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0); + ASSERT_TIMELY (5s, node1.network.publish_filter.check (bytes1.data (), bytes1.size ())); + + // Second vote should get dropped from processor queue + tcp_channel->send (message2); + ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0); + // And the filter should not have it + WAIT (500ms); // Give the node time to process the vote + ASSERT_TIMELY (5s, !node1.network.publish_filter.check (bytes2.data (), bytes2.size ())); } // The test must be completed in less than 1 second From db90344f36cc50cc18c4d21525b3d6f83b81ea21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Tue, 24 Sep 2024 19:55:42 +0200 Subject: [PATCH 14/15] Tests #3 --- nano/core_test/network.cpp | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 94d228327..3ac09219d 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -753,9 +753,8 @@ TEST (network, duplicate_revert_publish) TEST (network, duplicate_vote_detection) { nano::test::system system; - nano::node_flags node_flags; - auto & node0 = *system.add_node (node_flags); - auto & node1 = *system.add_node (node_flags); + auto & node0 = *system.add_node (); + auto & node1 = *system.add_node (); auto vote = nano::test::make_vote (nano::dev::genesis_key, { nano::dev::genesis->hash () }); nano::confirm_ack message{ nano::dev::network_params.network, vote }; @@ -809,6 +808,34 @@ TEST (network, duplicate_revert_vote) ASSERT_TIMELY (5s, !node1.network.publish_filter.check (bytes2.data (), bytes2.size ())); } +TEST (network, expire_duplicate_filter) +{ + nano::test::system system; + nano::node_config node_config = system.default_config (); + node_config.network.duplicate_filter_cutoff = 3; // Expire after 3 seconds + auto & node0 = *system.add_node (node_config); + auto & node1 = *system.add_node (node_config); + + auto vote = nano::test::make_vote (nano::dev::genesis_key, { nano::dev::genesis->hash () }); + nano::confirm_ack message{ nano::dev::network_params.network, vote }; + auto bytes = message_payload_to_bytes (message); + + // Publish duplicate detection through TCP + auto tcp_channel = node0.network.tcp_channels.find_node_id (node1.get_node_id ()); + ASSERT_NE (nullptr, tcp_channel); + + // Send a vote + ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message)); + tcp_channel->send (message); + ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0); + tcp_channel->send (message); + ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 1); + + // The filter should expire the vote after some time + ASSERT_TRUE (node1.network.publish_filter.check (bytes.data (), bytes.size ())); + ASSERT_TIMELY (10s, !node1.network.publish_filter.check (bytes.data (), bytes.size ())); +} + // The test must be completed in less than 1 second TEST (network, bandwidth_limiter_4_messages) { From c9ee5365e3935db30c99b58da86c9a2568fc2301 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Tue, 24 Sep 2024 20:06:36 +0200 Subject: [PATCH 15/15] Rename `network.publish_filter` to `network.filter` --- nano/core_test/active_elections.cpp | 18 +++++++++--------- nano/core_test/network.cpp | 16 ++++++++-------- nano/core_test/node.cpp | 2 +- nano/fuzzer_test/fuzz_buffer.cpp | 2 +- nano/node/active_elections.cpp | 2 +- nano/node/election.cpp | 4 ++-- nano/node/message_processor.cpp | 4 ++-- nano/node/network.cpp | 4 ++-- nano/node/network.hpp | 2 +- nano/node/transport/inproc.cpp | 2 +- nano/node/transport/tcp_server.cpp | 2 +- 11 files changed, 29 insertions(+), 29 deletions(-) diff --git a/nano/core_test/active_elections.cpp b/nano/core_test/active_elections.cpp index aabdce1c4..38c599df1 100644 --- a/nano/core_test/active_elections.cpp +++ b/nano/core_test/active_elections.cpp @@ -649,14 +649,14 @@ TEST (active_elections, dropped_cleanup) nano::vectorstream stream (block_bytes); chain[0]->serialize (stream); } - ASSERT_FALSE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ())); - ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ())); + ASSERT_FALSE (node.network.filter.apply (block_bytes.data (), block_bytes.size ())); + ASSERT_TRUE (node.network.filter.apply (block_bytes.data (), block_bytes.size ())); auto election = nano::test::start_election (system, node, hash); ASSERT_NE (nullptr, election); // Not yet removed - ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ())); + ASSERT_TRUE (node.network.filter.apply (block_bytes.data (), block_bytes.size ())); ASSERT_TRUE (node.vote_router.active (hash)); // Now simulate dropping the election @@ -664,7 +664,7 @@ TEST (active_elections, dropped_cleanup) node.active.erase (*chain[0]); // The filter must have been cleared - ASSERT_FALSE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ())); + ASSERT_FALSE (node.network.filter.apply (block_bytes.data (), block_bytes.size ())); // An election was recently dropped ASSERT_EQ (1, node.stats.count (nano::stat::type::active_elections_dropped, nano::stat::detail::manual)); @@ -673,7 +673,7 @@ TEST (active_elections, dropped_cleanup) ASSERT_FALSE (node.vote_router.active (hash)); // Repeat test for a confirmed election - ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ())); + ASSERT_TRUE (node.network.filter.apply (block_bytes.data (), block_bytes.size ())); election = nano::test::start_election (system, node, hash); ASSERT_NE (nullptr, election); @@ -682,7 +682,7 @@ TEST (active_elections, dropped_cleanup) node.active.erase (*chain[0]); // The filter should not have been cleared - ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ())); + ASSERT_TRUE (node.network.filter.apply (block_bytes.data (), block_bytes.size ())); // Not dropped ASSERT_EQ (1, node.stats.count (nano::stat::type::active_elections_dropped, nano::stat::detail::manual)); @@ -825,7 +825,7 @@ TEST (active_elections, fork_filter_cleanup) ASSERT_TIMELY_EQ (5s, node1.ledger.block_count (), 2); // Block is erased from the duplicate filter - ASSERT_TIMELY (5s, node1.network.publish_filter.apply (send_block_bytes.data (), send_block_bytes.size ())); + ASSERT_TIMELY (5s, node1.network.filter.apply (send_block_bytes.data (), send_block_bytes.size ())); } /* @@ -960,7 +960,7 @@ TEST (active_elections, fork_replacement_tally) // Process correct block node_config.peering_port = system.get_available_port (); auto & node2 (*system.add_node (node_config)); - node1.network.publish_filter.clear (); + node1.network.filter.clear (); node2.network.flood_block (send_last); ASSERT_TIMELY (3s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 0); @@ -974,7 +974,7 @@ TEST (active_elections, fork_replacement_tally) node1.vote_processor.vote (vote, std::make_shared (node1, node1)); // ensure vote arrives before the block ASSERT_TIMELY_EQ (5s, 1, node1.vote_cache.find (send_last->hash ()).size ()); - node1.network.publish_filter.clear (); + node1.network.filter.clear (); node2.network.flood_block (send_last); ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 1); diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 3ac09219d..eb4713f94 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -735,8 +735,8 @@ TEST (network, duplicate_revert_publish) // Should be cleared when dropping due to a full block processor, as long as the message has the optional digest attached // Test network.duplicate_detection ensures that the digest is attached when deserializing messages nano::uint128_t digest; - ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size (), &digest)); - ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ())); + ASSERT_FALSE (node.network.filter.apply (bytes.data (), bytes.size (), &digest)); + ASSERT_TRUE (node.network.filter.apply (bytes.data (), bytes.size ())); auto other_node (std::make_shared (system.io_ctx, system.get_available_port (), nano::unique_path (), system.work)); other_node->start (); system.nodes.push_back (other_node); @@ -744,10 +744,10 @@ TEST (network, duplicate_revert_publish) ASSERT_NE (nullptr, channel); ASSERT_EQ (0, publish.digest); node.network.inbound (publish, channel); - ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ())); + ASSERT_TRUE (node.network.filter.apply (bytes.data (), bytes.size ())); publish.digest = digest; node.network.inbound (publish, channel); - ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ())); + ASSERT_FALSE (node.network.filter.apply (bytes.data (), bytes.size ())); } TEST (network, duplicate_vote_detection) @@ -798,14 +798,14 @@ TEST (network, duplicate_revert_vote) // First vote should be processed tcp_channel->send (message1); ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0); - ASSERT_TIMELY (5s, node1.network.publish_filter.check (bytes1.data (), bytes1.size ())); + ASSERT_TIMELY (5s, node1.network.filter.check (bytes1.data (), bytes1.size ())); // Second vote should get dropped from processor queue tcp_channel->send (message2); ASSERT_ALWAYS_EQ (100ms, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 0); // And the filter should not have it WAIT (500ms); // Give the node time to process the vote - ASSERT_TIMELY (5s, !node1.network.publish_filter.check (bytes2.data (), bytes2.size ())); + ASSERT_TIMELY (5s, !node1.network.filter.check (bytes2.data (), bytes2.size ())); } TEST (network, expire_duplicate_filter) @@ -832,8 +832,8 @@ TEST (network, expire_duplicate_filter) ASSERT_TIMELY_EQ (2s, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message), 1); // The filter should expire the vote after some time - ASSERT_TRUE (node1.network.publish_filter.check (bytes.data (), bytes.size ())); - ASSERT_TIMELY (10s, !node1.network.publish_filter.check (bytes.data (), bytes.size ())); + ASSERT_TRUE (node1.network.filter.check (bytes.data (), bytes.size ())); + ASSERT_TIMELY (10s, !node1.network.filter.check (bytes.data (), bytes.size ())); } // The test must be completed in less than 1 second diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 7a3ea364b..7a8f1df34 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -2220,7 +2220,7 @@ TEST (node, vote_by_hash_republish) ASSERT_TIMELY (5s, node2.active.active (*send1)); // give block send2 to node1 and wait until the block is received and processed by node1 - node1.network.publish_filter.clear (); + node1.network.filter.clear (); node1.process_active (send2); ASSERT_TIMELY (5s, node1.active.active (*send2)); diff --git a/nano/fuzzer_test/fuzz_buffer.cpp b/nano/fuzzer_test/fuzz_buffer.cpp index 4be224028..9f584395a 100644 --- a/nano/fuzzer_test/fuzz_buffer.cpp +++ b/nano/fuzzer_test/fuzz_buffer.cpp @@ -68,7 +68,7 @@ void fuzz_message_parser (uint8_t const * Data, size_t Size) } fuzz_visitor visitor; - nano::message_parser parser (node0->network.publish_filter, node0->block_uniquer, node0->vote_uniquer, visitor, node0->work); + nano::message_parser parser (node0->network.filter, node0->block_uniquer, node0->vote_uniquer, visitor, node0->work); parser.deserialize_buffer (Data, Size); } diff --git a/nano/node/active_elections.cpp b/nano/node/active_elections.cpp index 7c4aadc63..814cf088a 100644 --- a/nano/node/active_elections.cpp +++ b/nano/node/active_elections.cpp @@ -363,7 +363,7 @@ void nano::active_elections::cleanup_election (nano::unique_lock & if (!election->confirmed ()) { // Clear from publish filter - node.network.publish_filter.clear (block); + node.network.filter.clear (block); } } } diff --git a/nano/node/election.cpp b/nano/node/election.cpp index c3845fc23..98fa9ba49 100644 --- a/nano/node/election.cpp +++ b/nano/node/election.cpp @@ -518,7 +518,7 @@ bool nano::election::publish (std::shared_ptr const & block_a) if (!replace_by_weight (lock, block_a->hash ())) { result = true; - node.network.publish_filter.clear (block_a); + node.network.filter.clear (block_a); } debug_assert (lock.owns_lock ()); } @@ -643,7 +643,7 @@ void nano::election::remove_block (nano::block_hash const & hash_a) return entry.second.hash == hash_a; }); - node.network.publish_filter.clear (existing->second); + node.network.filter.clear (existing->second); last_blocks.erase (hash_a); } } diff --git a/nano/node/message_processor.cpp b/nano/node/message_processor.cpp index cd400daa1..f43126b4d 100644 --- a/nano/node/message_processor.cpp +++ b/nano/node/message_processor.cpp @@ -190,7 +190,7 @@ public: bool added = node.block_processor.add (message.block, message.is_originator () ? nano::block_source::live_originator : nano::block_source::live, channel); if (!added) { - node.network.publish_filter.clear (message.digest); + node.network.filter.clear (message.digest); node.stats.inc (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::in); } } @@ -220,7 +220,7 @@ public: bool added = node.vote_processor.vote (message.vote, channel, message.is_rebroadcasted () ? nano::vote_source::rebroadcast : nano::vote_source::live); if (!added) { - node.network.publish_filter.clear (message.digest); + node.network.filter.clear (message.digest); node.stats.inc (nano::stat::type::drop, nano::stat::detail::confirm_ack, nano::stat::dir::in); } } diff --git a/nano/node/network.cpp b/nano/node/network.cpp index df4932aa3..5073cf35e 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -25,7 +25,7 @@ nano::network::network (nano::node & node, uint16_t port) : id{ nano::network_constants::active_network }, syn_cookies{ node.config.network.max_peers_per_ip, node.logger }, resolver{ node.io_ctx }, - publish_filter{ node.config.network.duplicate_filter_size, node.config.network.duplicate_filter_cutoff }, + filter{ node.config.network.duplicate_filter_size, node.config.network.duplicate_filter_cutoff }, tcp_channels{ node }, port{ port } { @@ -117,7 +117,7 @@ void nano::network::run_cleanup () auto const syn_cookie_cutoff = std::chrono::steady_clock::now () - node.network_params.network.syn_cookie_cutoff; syn_cookies.purge (syn_cookie_cutoff); - publish_filter.update (interval.count ()); + filter.update (interval.count ()); lock.lock (); } diff --git a/nano/node/network.hpp b/nano/node/network.hpp index bac8f5c16..e33cbd5f0 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -151,7 +151,7 @@ public: nano::syn_cookies syn_cookies; boost::asio::ip::tcp::resolver resolver; nano::peer_exclusion excluded_peers; - nano::network_filter publish_filter; + nano::network_filter filter; nano::transport::tcp_channels tcp_channels; std::atomic port{ 0 }; diff --git a/nano/node/transport/inproc.cpp b/nano/node/transport/inproc.cpp index e1c8383f0..c1cf7a5b9 100644 --- a/nano/node/transport/inproc.cpp +++ b/nano/node/transport/inproc.cpp @@ -30,7 +30,7 @@ void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer co callback_a (boost::system::errc::make_error_code (boost::system::errc::success), size_a); }; - auto const message_deserializer = std::make_shared (node.network_params.network, node.network.publish_filter, node.block_uniquer, node.vote_uniquer, buffer_read_fn); + auto const message_deserializer = std::make_shared (node.network_params.network, node.network.filter, node.block_uniquer, node.vote_uniquer, buffer_read_fn); message_deserializer->read ( [this] (boost::system::error_code ec_a, std::unique_ptr message_a) { if (ec_a || !message_a) diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index f7075faa3..a4e59e674 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -17,7 +17,7 @@ nano::transport::tcp_server::tcp_server (std::shared_ptr (node_a->network_params.network, node_a->network.publish_filter, node_a->block_uniquer, node_a->vote_uniquer, + std::make_shared (node_a->network_params.network, node_a->network.filter, node_a->block_uniquer, node_a->vote_uniquer, [socket_l = socket] (std::shared_ptr> const & data_a, size_t size_a, std::function callback_a) { debug_assert (socket_l != nullptr); socket_l->read_impl (data_a, size_a, callback_a);