Probabilistic network packet filter (#2543)
* Extract stream.hpp from blocks.hpp * Add a probabilistic filter based on direct-mapped caches, keyed by 128-bit SipHash. Co-Authored-By: Colin LeMahieu <clemahieu@gmail.com> The filter receives an array of bytes representing a network packet and checks whether that packet is a duplicate. The probability of a false non-duplicate (a repeated packet reported as new) is marginal, but not zero, and decreases with the size of the filter. The probability of a false duplicate (a new packet reported as repeated) is the infinitesimal probability of a 128-bit SipHash collision. Items are not normally erased. Instead, if a new item is different from the one at the insertion index (digest % capacity), the old item is replaced. There is also a function to erase an element from the filter, if the digest matches it. The filter state is protected by a mutex, whereas hashing is performed while not holding it. Uses 1MB of memory for every 64k elements. * Remove explicit instantiations * Remove alias * Optionally set digest in ::apply and add ::clear method directly from a digest
This commit is contained in:
parent
f28580d482
commit
dc3ad45d75
9 changed files with 308 additions and 31 deletions
|
@ -23,6 +23,7 @@ add_executable (core_test
|
|||
message_parser.cpp
|
||||
memory_pool.cpp
|
||||
network.cpp
|
||||
network_filter.cpp
|
||||
node.cpp
|
||||
node_telemetry.cpp
|
||||
processor_service.cpp
|
||||
|
|
111
nano/core_test/network_filter.cpp
Normal file
111
nano/core_test/network_filter.cpp
Normal file
|
@ -0,0 +1,111 @@
|
|||
#include <nano/core_test/testutil.hpp>
|
||||
#include <nano/node/common.hpp>
|
||||
#include <nano/secure/buffer.hpp>
|
||||
#include <nano/secure/common.hpp>
|
||||
#include <nano/secure/network_filter.hpp>
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
// Exercises a single-slot filter: repeating one packet is reported as a duplicate,
// while alternating between two packets keeps evicting the other one from the only slot.
TEST (network_filter, unit)
{
	nano::genesis genesis;
	nano::network_filter filter (1);
	// Serializes block_a as a publish message, filters its block payload and checks the verdict
	auto one_block = [&filter](std::shared_ptr<nano::block> const & block_a, bool expect_duplicate_a) {
		nano::publish message (block_a);
		auto bytes (message.to_bytes ());
		nano::bufferstream stream (bytes->data (), bytes->size ());

		// First read the header
		bool error{ false };
		nano::message_header header (error, stream);
		ASSERT_FALSE (error);

		// This validates nano::message_header::size
		ASSERT_EQ (bytes->size (), block_a->size (block_a->type ()) + header.size);

		// Now filter the rest of the buffer: the payload starts right after the header,
		// so both the pointer and the length have to skip header.size bytes
		bool duplicate (filter.apply (bytes->data () + header.size, bytes->size () - header.size));
		ASSERT_EQ (expect_duplicate_a, duplicate);

		// Filtering works on the raw buffer, so the stream must still be positioned right after the header
		auto block (nano::deserialize_block (stream, header.block_type ()));
		ASSERT_NE (nullptr, block);
		ASSERT_EQ (*block, *block_a);
	};
	one_block (genesis.open, false);
	for (int i = 0; i < 10; ++i)
	{
		one_block (genesis.open, true);
	}
	auto new_block (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.open->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 10 * nano::xrb_ratio, nano::public_key (), nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
	one_block (new_block, false);
	for (int i = 0; i < 10; ++i)
	{
		one_block (new_block, true);
	}
	// With a single slot each packet replaces the other, so neither is ever seen as a duplicate
	for (int i = 0; i < 100; ++i)
	{
		one_block (genesis.open, false);
		one_block (new_block, false);
	}
}
|
||||
|
||||
// Feeds 100 distinct blocks through a 4-slot filter; none of them may be reported as a duplicate.
TEST (network_filter, many)
{
	nano::genesis genesis;
	nano::network_filter filter (4);
	nano::keypair key1;
	for (int i = 0; i < 100; ++i)
	{
		// Every iteration uses a different balance, so every serialized block is unique
		auto block (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.open->hash (), nano::test_genesis_key.pub, nano::genesis_amount - i * 10 * nano::xrb_ratio, key1.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));

		nano::publish message (block);
		auto bytes (message.to_bytes ());
		nano::bufferstream stream (bytes->data (), bytes->size ());

		// First read the header
		bool error{ false };
		nano::message_header header (error, stream);
		ASSERT_FALSE (error);

		// This validates nano::message_header::size
		ASSERT_EQ (bytes->size (), block->size + header.size);

		// Now filter the rest of the buffer, i.e. the block payload located after the header
		// All blocks should pass through
		ASSERT_FALSE (filter.apply (bytes->data () + header.size, block->size));

		// Filtering works on the raw buffer, so the stream must still be positioned right after the header
		auto deserialized_block (nano::deserialize_block (stream, header.block_type ()));
		ASSERT_NE (nullptr, deserialized_block);
		ASSERT_EQ (*block, *deserialized_block);
	}
}
|
||||
|
||||
// Clearing by content removes the entry only when the stored digest matches that content.
TEST (network_filter, clear)
{
	nano::network_filter filter (1);
	std::vector<uint8_t> first{ 1, 2, 3 };
	std::vector<uint8_t> second{ 1 };
	// Small helpers so that the sequence below reads as a list of filter verdicts
	auto seen = [&filter](std::vector<uint8_t> & packet_a) {
		return filter.apply (packet_a.data (), packet_a.size ());
	};
	auto forget = [&filter](std::vector<uint8_t> & packet_a) {
		filter.clear (packet_a.data (), packet_a.size ());
	};

	// Inserted on first sight, duplicate on the second
	ASSERT_FALSE (seen (first));
	ASSERT_TRUE (seen (first));

	// Clearing the matching content makes the packet new again
	forget (first);
	ASSERT_FALSE (seen (first));
	ASSERT_TRUE (seen (first));

	// Clearing different content leaves the stored entry untouched
	forget (second);
	ASSERT_TRUE (seen (first));
	ASSERT_FALSE (seen (second));
}
|
||||
|
||||
// apply can hand back the digest it computed, and that digest can be used to clear the entry.
TEST (network_filter, optional_digest)
{
	nano::network_filter filter (1);
	std::vector<uint8_t> packet{ 1, 2, 3 };
	auto const data (packet.data ());
	auto const length (packet.size ());

	// The out-parameter starts at zero and must be overwritten by apply
	nano::uint128_t reported{ 0 };
	ASSERT_FALSE (filter.apply (data, length, &reported));
	ASSERT_NE (0, reported);

	// The digest argument is optional
	ASSERT_TRUE (filter.apply (data, length));

	// Clearing by digest makes the same bytes pass as new again
	filter.clear (reported);
	ASSERT_FALSE (filter.apply (data, length));
}
|
|
@ -50,6 +50,7 @@ add_library (nano_lib
|
|||
rpcconfig.cpp
|
||||
stats.hpp
|
||||
stats.cpp
|
||||
stream.hpp
|
||||
threading.hpp
|
||||
threading.cpp
|
||||
timer.hpp
|
||||
|
|
|
@ -3,45 +3,15 @@
|
|||
#include <nano/crypto/blake2/blake2.h>
|
||||
#include <nano/lib/errors.hpp>
|
||||
#include <nano/lib/numbers.hpp>
|
||||
#include <nano/lib/stream.hpp>
|
||||
#include <nano/lib/utility.hpp>
|
||||
|
||||
#include <boost/property_tree/ptree_fwd.hpp>
|
||||
|
||||
#include <cassert>
|
||||
#include <streambuf>
|
||||
#include <unordered_map>
|
||||
|
||||
namespace nano
|
||||
{
|
||||
// We operate on streams of uint8_t by convention
|
||||
using stream = std::basic_streambuf<uint8_t>;
|
||||
// Read a raw byte stream the size of `T' and fill value. Returns true if there was an error, false otherwise
|
||||
template <typename T>
|
||||
bool try_read (nano::stream & stream_a, T & value)
|
||||
{
|
||||
static_assert (std::is_standard_layout<T>::value, "Can't stream read non-standard layout types");
|
||||
auto amount_read (stream_a.sgetn (reinterpret_cast<uint8_t *> (&value), sizeof (value)));
|
||||
return amount_read != sizeof (value);
|
||||
}
|
||||
// A wrapper of try_read which throws if there is an error
|
||||
template <typename T>
|
||||
void read (nano::stream & stream_a, T & value)
|
||||
{
|
||||
auto error = try_read (stream_a, value);
|
||||
if (error)
|
||||
{
|
||||
throw std::runtime_error ("Failed to read type");
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void write (nano::stream & stream_a, T const & value)
|
||||
{
|
||||
static_assert (std::is_standard_layout<T>::value, "Can't stream write non-standard layout types");
|
||||
auto amount_written (stream_a.sputn (reinterpret_cast<uint8_t const *> (&value), sizeof (value)));
|
||||
(void)amount_written;
|
||||
assert (amount_written == sizeof (value));
|
||||
}
|
||||
class block_visitor;
|
||||
enum class block_type : uint8_t
|
||||
{
|
||||
|
|
38
nano/lib/stream.hpp
Normal file
38
nano/lib/stream.hpp
Normal file
|
@ -0,0 +1,38 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <streambuf>
#include <type_traits>
|
||||
|
||||
namespace nano
{
// We operate on streams of uint8_t by convention
using stream = std::basic_streambuf<uint8_t>;

// Reads sizeof (T) raw bytes from stream_a into value.
// Returns true if there was an error (fewer bytes than requested were read), false otherwise
template <typename T>
bool try_read (nano::stream & stream_a, T & value)
{
	static_assert (std::is_standard_layout<T>::value, "Can't stream read non-standard layout types");
	// sgetn works with the signed std::streamsize; convert the size once so the comparison does not mix signedness
	auto const expected (static_cast<std::streamsize> (sizeof (value)));
	auto const amount_read (stream_a.sgetn (reinterpret_cast<uint8_t *> (&value), expected));
	return amount_read != expected;
}

// Same as try_read, but reports a failed read by throwing std::runtime_error instead of returning a flag
template <typename T>
void read (nano::stream & stream_a, T & value)
{
	if (try_read (stream_a, value))
	{
		throw std::runtime_error ("Failed to read type");
	}
}

// Writes the sizeof (T) raw bytes of value to stream_a.
// A short write is only detected by the assert, i.e. in builds where assertions are enabled
template <typename T>
void write (nano::stream & stream_a, T const & value)
{
	static_assert (std::is_standard_layout<T>::value, "Can't stream write non-standard layout types");
	auto const expected (static_cast<std::streamsize> (sizeof (value)));
	auto const amount_written (stream_a.sputn (reinterpret_cast<uint8_t const *> (&value), expected));
	(void)amount_written;
	assert (amount_written == expected);
}
}
|
|
@ -199,6 +199,7 @@ public:
|
|||
uint8_t version_min;
|
||||
nano::message_type type;
|
||||
std::bitset<16> extensions;
|
||||
static size_t constexpr size = sizeof (network_params::header_magic_number) + sizeof (version_max) + sizeof (version_using) + sizeof (version_min) + sizeof (type) + sizeof (/* extensions */ uint16_t);
|
||||
|
||||
void flag_set (uint8_t);
|
||||
static uint8_t constexpr bulk_pull_count_present_flag = 0;
|
||||
|
|
|
@ -45,6 +45,8 @@ add_library (secure
|
|||
epoch.cpp
|
||||
ledger.hpp
|
||||
ledger.cpp
|
||||
network_filter.hpp
|
||||
network_filter.cpp
|
||||
utility.hpp
|
||||
utility.cpp
|
||||
versioning.hpp
|
||||
|
|
79
nano/secure/network_filter.cpp
Normal file
79
nano/secure/network_filter.cpp
Normal file
|
@ -0,0 +1,79 @@
|
|||
#include <nano/crypto_lib/random_pool.hpp>
|
||||
#include <nano/lib/locks.hpp>
|
||||
#include <nano/secure/buffer.hpp>
|
||||
#include <nano/secure/common.hpp>
|
||||
#include <nano/secure/network_filter.hpp>
|
||||
|
||||
/** Creates a filter with size_a slots, all holding zero, and fills the SipHash key via nano::random_pool. */
nano::network_filter::network_filter (size_t size_a) :
items (size_a, nano::uint128_t (0))
{
	nano::random_pool::generate_block (key.data (), key.size ());
}
|
||||
|
||||
bool nano::network_filter::apply (uint8_t const * bytes_a, size_t count_a, nano::uint128_t * digest_a)
|
||||
{
|
||||
// Get hash before locking
|
||||
auto digest (hash (bytes_a, count_a));
|
||||
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
auto & element (get_element (digest));
|
||||
bool existed (element == digest);
|
||||
if (!existed)
|
||||
{
|
||||
// Replace likely old element with a new one
|
||||
element = digest;
|
||||
}
|
||||
if (digest_a)
|
||||
{
|
||||
*digest_a = digest;
|
||||
}
|
||||
return existed;
|
||||
}
|
||||
|
||||
/** Resets the slot that digest_a maps to back to zero, but only if it currently holds exactly digest_a. */
void nano::network_filter::clear (nano::uint128_t const & digest_a)
{
	nano::lock_guard<std::mutex> guard (mutex);
	auto & slot (get_element (digest_a));
	// A slot holding a different digest belongs to another packet and is left alone
	if (slot != digest_a)
	{
		return;
	}
	slot = nano::uint128_t{ 0 };
}
|
||||
|
||||
void nano::network_filter::clear (uint8_t const * bytes_a, size_t count_a)
|
||||
{
|
||||
clear (hash (bytes_a, count_a));
|
||||
}
|
||||
|
||||
template <typename OBJECT>
|
||||
void nano::network_filter::clear (OBJECT const & object_a)
|
||||
{
|
||||
std::vector<uint8_t> bytes;
|
||||
{
|
||||
nano::vectorstream stream (bytes);
|
||||
object_a->serialize (stream);
|
||||
}
|
||||
clear (bytes.data (), bytes.size ());
|
||||
}
|
||||
|
||||
void nano::network_filter::clear ()
|
||||
{
|
||||
nano::lock_guard<std::mutex> lock (mutex);
|
||||
items.assign (items.size (), nano::uint128_t{ 0 });
|
||||
}
|
||||
|
||||
/**
 * Returns a reference to the slot hash_a maps to (hash_a modulo the number of slots).
 * The caller must hold `mutex` for as long as the returned reference is used.
 */
nano::uint128_t & nano::network_filter::get_element (nano::uint128_t const & hash_a)
{
	// The locking precondition is deliberately not checked with mutex.try_lock (): calling try_lock on a
	// std::mutex that the current thread already owns is undefined behavior, and a try_lock that succeeded
	// would leave the mutex locked forever.
	assert (items.size () > 0);
	size_t index (hash_a % items.size ());
	return items[index];
}
|
||||
|
||||
/** Computes the keyed SipHash digest of count_a bytes starting at bytes_a and returns it as a 128-bit number. */
nano::uint128_t nano::network_filter::hash (uint8_t const * bytes_a, size_t count_a) const
{
	nano::uint128_union result{ 0 };
	auto const key_length (static_cast<unsigned int> (key.size ()));
	siphash_t hasher (key, key_length);
	// The digest is written straight into the byte representation of the union
	hasher.CalculateDigest (result.bytes.data (), bytes_a, count_a);
	return result.number ();
}
|
74
nano/secure/network_filter.hpp
Normal file
74
nano/secure/network_filter.hpp
Normal file
|
@ -0,0 +1,74 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <nano/lib/numbers.hpp>
|
||||
|
||||
#include <crypto/cryptopp/seckey.h>
|
||||
#include <crypto/cryptopp/siphash.h>
|
||||
|
||||
#include <mutex>
|
||||
|
||||
namespace nano
|
||||
{
|
||||
/**
|
||||
* A probabilistic duplicate filter based on directed map caches, using SipHash 2/4/128
|
||||
* The probability of false negatives (unique packet marked as duplicate) is the probability of a 128-bit SipHash collision.
|
||||
* The probability of false positives (duplicate packet marked as unique) shrinks with a larger filter.
|
||||
* @note This class is thread-safe.
|
||||
*/
|
||||
class network_filter final
|
||||
{
|
||||
public:
|
||||
network_filter () = delete;
|
||||
network_filter (size_t size_a);
|
||||
/**
|
||||
* Reads \p count_a bytes starting from \p bytes_a and inserts the siphash digest in the filter.
|
||||
* @param \p digest_a if given, will be set to the resulting siphash digest
|
||||
* @warning will read out of bounds if [ \p bytes_a, \p bytes_a + \p count_a ] is not a valid range
|
||||
* @return a boolean representing the previous existence of the hash in the filter.
|
||||
**/
|
||||
bool apply (uint8_t const * bytes_a, size_t count_a, nano::uint128_t * digest_a = nullptr);
|
||||
|
||||
/**
|
||||
* Sets the corresponding element in the filter to zero, if it matches \p digest_a exactly.
|
||||
**/
|
||||
void clear (nano::uint128_t const & digest_a);
|
||||
|
||||
/**
|
||||
* Reads \p count_a bytes starting from \p bytes_a and digests the contents.
|
||||
* Then, sets the corresponding element in the filter to zero, if it matches the digest exactly.
|
||||
* @warning will read out of bounds if [ \p bytes_a, \p bytes_a + \p count_a ] is not a valid range
|
||||
**/
|
||||
void clear (uint8_t const * bytes_a, size_t count_a);
|
||||
|
||||
/**
|
||||
* Serializes \p object_a and runs clears the resulting siphash digest.
|
||||
* @return a boolean representing the previous existence of the hash in the filter.
|
||||
**/
|
||||
template <typename OBJECT>
|
||||
void clear (OBJECT const & object_a);
|
||||
|
||||
/** Sets every element of the filter to zero, keeping its size and capacity. */
|
||||
void clear ();
|
||||
|
||||
private:
|
||||
using siphash_t = CryptoPP::SipHash<2, 4, true>;
|
||||
|
||||
/**
|
||||
* Get element from digest.
|
||||
* @note must have a lock on mutex
|
||||
* @return a reference to the element with key \p hash_a
|
||||
**/
|
||||
nano::uint128_t & get_element (nano::uint128_t const & hash_a);
|
||||
|
||||
/**
|
||||
* Hashes \p count_a bytes starting from \p bytes_a .
|
||||
* @return the siphash digest of the contents in \p bytes_a .
|
||||
**/
|
||||
nano::uint128_t hash (uint8_t const * bytes_a, size_t count_a) const;
|
||||
|
||||
std::vector<nano::uint128_t> items;
|
||||
CryptoPP::SecByteBlock key{ siphash_t::KEYLENGTH };
|
||||
std::mutex mutex;
|
||||
};
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue