Network duplicate filter for publish messages (#2643)

* Filter duplicate publish messages before deserializing

When a message is unique, the digest is saved and passed around to network processing, which may drop it if the block processor is full.

Cleaning up a long unchecked block erases its digest from the publish filter.

The blocks_filter has been removed due to redundancy. The size of this filter is 256k, which uses about 4MB.

* Batch erase in unchecked_cleanup due to a potentially large list
This commit is contained in:
Guilherme Lawless 2020-03-11 10:40:32 +00:00 committed by GitHub
commit 7f72882422
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 296 additions and 98 deletions

View file

@ -772,3 +772,54 @@ TEST (active_transactions, activate_dependencies)
ASSERT_TRUE (node1->ledger.block_confirmed (node1->store.tx_begin_read (), block2->hash ()));
ASSERT_TRUE (node2->ledger.block_confirmed (node2->store.tx_begin_read (), block2->hash ()));
}
namespace nano
{
// Tests that blocks are correctly cleared from the duplicate filter for unconfirmed elections
TEST (active_transactions, dropped_cleanup)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto & node (*system.add_node (node_config));
nano::genesis genesis;
auto block = genesis.open;
// Add to network filter to ensure proper cleanup after the election is dropped
std::vector<uint8_t> block_bytes;
{
nano::vectorstream stream (block_bytes);
block->serialize (stream);
}
ASSERT_FALSE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
auto election (node.active.insert (block).first);
ASSERT_NE (nullptr, election);
// Not yet removed
ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
// Now simulate dropping the election, which performs a cleanup in the background using the node worker
ASSERT_FALSE (election->confirmed ());
{
nano::lock_guard<std::mutex> guard (node.active.mutex);
election->cleanup ();
}
// Push a worker task to ensure the cleanup is already performed
std::atomic<bool> flag{ false };
node.worker.push_task ([&flag]() {
flag = true;
});
system.deadline_set (5s);
while (!flag)
{
ASSERT_NO_ERROR (system.poll ());
}
// The filter must have been cleared
ASSERT_FALSE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
}
}

View file

@ -63,9 +63,10 @@ TEST (message_parser, exact_confirm_ack_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, block_uniquer, vote_uniquer, visitor, system.work);
auto block (std::make_shared<nano::send_block> (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1))));
auto vote (std::make_shared<nano::vote> (0, nano::keypair ().prv, 0, std::move (block)));
nano::confirm_ack message (vote);
@ -96,9 +97,10 @@ TEST (message_parser, exact_confirm_req_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, block_uniquer, vote_uniquer, visitor, system.work);
auto block (std::make_shared<nano::send_block> (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1))));
nano::confirm_req message (std::move (block));
std::vector<uint8_t> bytes;
@ -128,9 +130,10 @@ TEST (message_parser, exact_confirm_req_hash_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, block_uniquer, vote_uniquer, visitor, system.work);
nano::send_block block (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1)));
nano::confirm_req message (block.hash (), block.root ());
std::vector<uint8_t> bytes;
@ -160,9 +163,10 @@ TEST (message_parser, exact_publish_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, block_uniquer, vote_uniquer, visitor, system.work);
auto block (std::make_shared<nano::send_block> (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1))));
nano::publish message (std::move (block));
std::vector<uint8_t> bytes;
@ -192,9 +196,10 @@ TEST (message_parser, exact_keepalive_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, block_uniquer, vote_uniquer, visitor, system.work);
nano::keepalive message;
std::vector<uint8_t> bytes;
{

View file

@ -946,6 +946,67 @@ TEST (network, peer_max_tcp_attempts)
ASSERT_TRUE (node->network.tcp_channels.reachout (nano::endpoint (node->network.endpoint ().address (), nano::get_available_port ())));
}
TEST (network, duplicate_detection)
{
nano::system system;
nano::node_flags node_flags;
node_flags.disable_udp = false;
auto & node0 (*system.add_node (node_flags));
auto & node1 (*system.add_node (node_flags));
auto udp_channel (std::make_shared<nano::transport::channel_udp> (node0.network.udp_channels, node1.network.endpoint (), node1.network_params.protocol.protocol_version));
nano::genesis genesis;
nano::publish publish (genesis.open);
// Publish duplicate detection through UDP
ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish));
udp_channel->send (publish);
udp_channel->send (publish);
system.deadline_set (2s);
while (node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish) < 1)
{
ASSERT_NO_ERROR (system.poll ());
}
// Publish duplicate detection through TCP
auto tcp_channel (node0.network.tcp_channels.find_channel (nano::transport::map_endpoint_to_tcp (node1.network.endpoint ())));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish));
tcp_channel->send (publish);
system.deadline_set (2s);
while (node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish) < 2)
{
ASSERT_NO_ERROR (system.poll ());
}
}
TEST (network, duplicate_revert_publish)
{
nano::system system;
nano::node_flags node_flags;
node_flags.block_processor_full_size = 0;
auto & node (*system.add_node (node_flags));
ASSERT_TRUE (node.block_processor.full ());
nano::genesis genesis;
nano::publish publish (genesis.open);
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
publish.block->serialize (stream);
}
// Add to the blocks filter
// Should be cleared when dropping due to a full block processor, as long as the message has the optional digest attached
// Test network.duplicate_detection ensures that the digest is attached when deserializing messages
nano::uint128_t digest;
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size (), &digest));
ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
auto channel (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, node.network.endpoint (), node.network_params.protocol.protocol_version));
ASSERT_EQ (0, publish.digest);
node.network.process_message (publish, channel);
ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
publish.digest = digest;
node.network.process_message (publish, channel);
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
}
// The test must be completed in less than 1 second
TEST (bandwidth_limiter, validate)
{

View file

@ -3217,7 +3217,7 @@ TEST (node, block_processor_full)
{
nano::system system;
nano::node_flags node_flags;
node_flags.block_processor_full_size = 2;
node_flags.block_processor_full_size = 3;
auto & node = *system.add_node (nano::node_config (nano::get_available_port (), system.logging), node_flags);
nano::genesis genesis;
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
@ -3245,7 +3245,7 @@ TEST (node, block_processor_half_full)
{
nano::system system;
nano::node_flags node_flags;
node_flags.block_processor_full_size = 4;
node_flags.block_processor_full_size = 6;
auto & node = *system.add_node (nano::node_config (nano::get_available_port (), system.logging), node_flags);
nano::genesis genesis;
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
@ -3421,6 +3421,14 @@ TEST (node, unchecked_cleanup)
nano::keypair key;
auto & node (*system.nodes[0]);
auto open (std::make_shared<nano::state_block> (key.pub, 0, key.pub, 1, key.pub, key.prv, key.pub, *system.work.generate (key.pub)));
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
open->serialize (stream);
}
// Add to the blocks filter
// Should be cleared after unchecked cleanup
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
node.process_active (open);
node.block_processor.flush ();
node.config.unchecked_cutoff_time = std::chrono::seconds (2);
@ -3432,6 +3440,7 @@ TEST (node, unchecked_cleanup)
}
std::this_thread::sleep_for (std::chrono::seconds (1));
node.unchecked_cleanup ();
ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
{
auto transaction (node.store.tx_begin_read ());
auto unchecked_count (node.store.unchecked_count (transaction));
@ -3440,6 +3449,7 @@ TEST (node, unchecked_cleanup)
}
std::this_thread::sleep_for (std::chrono::seconds (2));
node.unchecked_cleanup ();
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
{
auto transaction (node.store.tx_begin_read ());
auto unchecked_count (node.store.unchecked_count (transaction));

View file

@ -430,8 +430,11 @@ bool nano::validate_message (nano::public_key const & public_key, nano::uint256_
bool nano::validate_message_batch (const unsigned char ** m, size_t * mlen, const unsigned char ** pk, const unsigned char ** RS, size_t num, int * valid)
{
bool result (0 == ed25519_sign_open_batch (m, mlen, pk, RS, num, valid));
return result;
for (size_t i{ 0 }; i < num; ++i)
{
valid[i] = (0 == ed25519_sign_open (m[i], mlen[i], pk[i], RS[i]));
}
return true;
}
nano::uint128_union::uint128_union (std::string const & string_a)

View file

@ -444,6 +444,9 @@ std::string nano::stat::type_to_string (uint32_t key)
case nano::stat::type::requests:
res = "requests";
break;
case nano::stat::type::filter:
res = "filter";
break;
}
return res;
}
@ -694,6 +697,9 @@ std::string nano::stat::detail_to_string (uint32_t key)
case nano::stat::detail::requests_unknown:
res = "requests_unknown";
break;
case nano::stat::detail::duplicate_publish:
res = "duplicate_publish";
break;
}
return res;
}

View file

@ -199,7 +199,8 @@ public:
confirmation_height,
drop,
aggregator,
requests
requests,
filter,
};
/** Optional detail type */
@ -314,7 +315,10 @@ public:
requests_generated_hashes,
requests_cached_votes,
requests_generated_votes,
requests_unknown
requests_unknown,
// duplicate
duplicate_publish
};
/** Direction of the stat. If the direction is irrelevant, use in */

View file

@ -251,7 +251,7 @@ void nano::active_transactions::request_confirm (nano::unique_lock<std::mutex> &
bool const overflow_l (unconfirmed_count_l > node.config.active_elections_size && election_l->election_start < election_ttl_cutoff_l && !node.wallets.watcher->is_watched (i->root));
if (overflow_l || election_l->transition_time (solicitor))
{
election_l->clear_blocks ();
election_l->cleanup ();
i = sorted_roots_l.erase (i);
}
else
@ -803,7 +803,7 @@ void nano::active_transactions::erase (nano::block const & block_a)
auto root_it (roots.get<tag_root> ().find (block_a.qualified_root ()));
if (root_it != roots.get<tag_root> ().end ())
{
root_it->election->clear_blocks ();
root_it->election->cleanup ();
root_it->election->adjust_dependent_difficulty ();
roots.get<tag_root> ().erase (root_it);
node.logger.try_log (boost::str (boost::format ("Election erased for block block %1% root %2%") % block_a.hash ().to_string () % block_a.root ().to_string ()));
@ -949,18 +949,13 @@ nano::inactive_cache_information nano::active_transactions::find_inactive_votes_
}
else
{
return nano::inactive_cache_information{ std::chrono::steady_clock::time_point{}, 0, std::vector<nano::account>{} };
return nano::inactive_cache_information{};
}
}
void nano::active_transactions::erase_inactive_votes_cache (nano::block_hash const & hash_a)
{
auto & inactive_by_hash (inactive_votes_cache.get<tag_hash> ());
auto existing (inactive_by_hash.find (hash_a));
if (existing != inactive_by_hash.end ())
{
inactive_by_hash.erase (existing);
}
inactive_votes_cache.get<tag_hash> ().erase (hash_a);
}
bool nano::active_transactions::inactive_votes_bootstrap_check (std::vector<nano::account> const & voters_a, nano::block_hash const & hash_a, bool & confirmed_a)

View file

@ -199,6 +199,7 @@ private:
bool inactive_votes_bootstrap_check (std::vector<nano::account> const &, nano::block_hash const &, bool &);
boost::thread thread;
friend class active_transactions_dropped_cleanup_Test;
friend class confirmation_height_prioritize_frontiers_Test;
friend class confirmation_height_prioritize_frontiers_overwrite_Test;
friend std::unique_ptr<container_info_component> collect_container_info (active_transactions &, const std::string &);

View file

@ -42,7 +42,6 @@ void nano::block_processor::flush ()
{
condition.wait (lock);
}
blocks_filter.clear ();
flushing = false;
}
@ -54,12 +53,12 @@ size_t nano::block_processor::size ()
bool nano::block_processor::full ()
{
return size () > node.flags.block_processor_full_size;
return size () >= node.flags.block_processor_full_size;
}
bool nano::block_processor::half_full ()
{
return size () > node.flags.block_processor_full_size / 2;
return size () >= node.flags.block_processor_full_size / 2;
}
void nano::block_processor::add (std::shared_ptr<nano::block> block_a, uint64_t origination)
@ -72,20 +71,14 @@ void nano::block_processor::add (nano::unchecked_info const & info_a)
{
debug_assert (!nano::work_validate (*info_a.block));
{
auto hash (info_a.block->hash ());
auto filter_hash (filter_item (hash, info_a.block->block_signature ()));
nano::lock_guard<std::mutex> lock (mutex);
if (blocks_filter.find (filter_hash) == blocks_filter.end ())
if (info_a.verified == nano::signature_verification::unknown && (info_a.block->type () == nano::block_type::state || info_a.block->type () == nano::block_type::open || !info_a.account.is_zero ()))
{
if (info_a.verified == nano::signature_verification::unknown && (info_a.block->type () == nano::block_type::state || info_a.block->type () == nano::block_type::open || !info_a.account.is_zero ()))
{
state_blocks.push_back (info_a);
}
else
{
blocks.push_back (info_a);
}
blocks_filter.insert (filter_hash);
state_blocks.push_back (info_a);
}
else
{
blocks.push_back (info_a);
}
}
condition.notify_all ();
@ -233,7 +226,6 @@ void nano::block_processor::verify_state_blocks (nano::unique_lock<std::mutex> &
}
else
{
blocks_filter.erase (filter_item (hashes[i], blocks_signatures[i]));
requeue_invalid (hashes[i], item);
}
items.pop_front ();
@ -288,7 +280,6 @@ void nano::block_processor::process_batch (nano::unique_lock<std::mutex> & lock_
info = blocks.front ();
blocks.pop_front ();
hash = info.block->hash ();
blocks_filter.erase (filter_item (hash, info.block->block_signature ()));
}
else
{
@ -551,19 +542,6 @@ void nano::block_processor::queue_unchecked (nano::write_transaction const & tra
node.gap_cache.erase (hash_a);
}
nano::block_hash nano::block_processor::filter_item (nano::block_hash const & hash_a, nano::signature const & signature_a)
{
static nano::random_constants constants;
nano::block_hash result;
blake2b_state state;
blake2b_init (&state, sizeof (result.bytes));
blake2b_update (&state, constants.not_an_account.bytes.data (), constants.not_an_account.bytes.size ());
blake2b_update (&state, signature_a.bytes.data (), signature_a.bytes.size ());
blake2b_update (&state, hash_a.bytes.data (), hash_a.bytes.size ());
blake2b_final (&state, result.bytes.data (), sizeof (result.bytes));
return result;
}
void nano::block_processor::requeue_invalid (nano::block_hash const & hash_a, nano::unchecked_info const & info_a)
{
debug_assert (hash_a == info_a.block->hash ());

View file

@ -62,8 +62,6 @@ private:
std::deque<nano::unchecked_info> state_blocks;
std::deque<nano::unchecked_info> blocks;
std::deque<std::shared_ptr<nano::block>> forced;
nano::block_hash filter_item (nano::block_hash const &, nano::signature const &);
std::unordered_set<nano::block_hash> blocks_filter;
nano::condition_variable condition;
nano::node & node;
nano::write_database_queue & write_database_queue;

View file

@ -408,15 +408,24 @@ void nano::bootstrap_server::receive_publish_action (boost::system::error_code c
{
if (!ec)
{
auto error (false);
nano::bufferstream stream (receive_buffer->data (), size_a);
auto request (std::make_unique<nano::publish> (error, stream, header_a));
if (!error)
nano::uint128_t digest;
if (!node->network.publish_filter.apply (receive_buffer->data (), size_a, &digest))
{
if (is_realtime_connection ())
auto error (false);
nano::bufferstream stream (receive_buffer->data (), size_a);
auto request (std::make_unique<nano::publish> (error, stream, header_a, digest));
if (!error)
{
add_request (std::unique_ptr<nano::message> (request.release ()));
if (is_realtime_connection ())
{
add_request (std::unique_ptr<nano::message> (request.release ()));
}
receive ();
}
}
else
{
node->stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_publish);
receive ();
}
}

View file

@ -312,6 +312,10 @@ std::string nano::message_parser::status_string ()
{
return "invalid_network";
}
case nano::message_parser::parse_status::duplicate_publish_message:
{
return "duplicate_publish_message";
}
}
debug_assert (false);
@ -319,7 +323,8 @@ std::string nano::message_parser::status_string ()
return "[unknown parse_status]";
}
nano::message_parser::message_parser (nano::block_uniquer & block_uniquer_a, nano::vote_uniquer & vote_uniquer_a, nano::message_visitor & visitor_a, nano::work_pool & pool_a) :
nano::message_parser::message_parser (nano::network_filter & publish_filter_a, nano::block_uniquer & block_uniquer_a, nano::vote_uniquer & vote_uniquer_a, nano::message_visitor & visitor_a, nano::work_pool & pool_a) :
publish_filter (publish_filter_a),
block_uniquer (block_uniquer_a),
vote_uniquer (vote_uniquer_a),
visitor (visitor_a),
@ -355,7 +360,15 @@ void nano::message_parser::deserialize_buffer (uint8_t const * buffer_a, size_t
}
case nano::message_type::publish:
{
deserialize_publish (stream, header);
nano::uint128_t digest;
if (!publish_filter.apply (buffer_a + header.size, size_a - header.size, &digest))
{
deserialize_publish (stream, header, digest);
}
else
{
status = parse_status::duplicate_publish_message;
}
break;
}
case nano::message_type::confirm_req:
@ -412,10 +425,10 @@ void nano::message_parser::deserialize_keepalive (nano::stream & stream_a, nano:
}
}
void nano::message_parser::deserialize_publish (nano::stream & stream_a, nano::message_header const & header_a)
void nano::message_parser::deserialize_publish (nano::stream & stream_a, nano::message_header const & header_a, nano::uint128_t const & digest_a)
{
auto error (false);
nano::publish incoming (error, stream_a, header_a, &block_uniquer);
nano::publish incoming (error, stream_a, header_a, digest_a, &block_uniquer);
if (!error && at_end (stream_a))
{
if (!nano::work_validate (*incoming.block))
@ -593,8 +606,9 @@ bool nano::keepalive::operator== (nano::keepalive const & other_a) const
return peers == other_a.peers;
}
nano::publish::publish (bool & error_a, nano::stream & stream_a, nano::message_header const & header_a, nano::block_uniquer * uniquer_a) :
message (header_a)
nano::publish::publish (bool & error_a, nano::stream & stream_a, nano::message_header const & header_a, nano::uint128_t const & digest_a, nano::block_uniquer * uniquer_a) :
message (header_a),
digest (digest_a)
{
if (!error_a)
{

View file

@ -7,6 +7,7 @@
#include <nano/lib/jsonconfig.hpp>
#include <nano/lib/memory.hpp>
#include <nano/secure/common.hpp>
#include <nano/secure/network_filter.hpp>
#include <bitset>
@ -247,18 +248,20 @@ public:
invalid_telemetry_ack_message,
outdated_version,
invalid_magic,
invalid_network
invalid_network,
duplicate_publish_message
};
message_parser (nano::block_uniquer &, nano::vote_uniquer &, nano::message_visitor &, nano::work_pool &);
message_parser (nano::network_filter &, nano::block_uniquer &, nano::vote_uniquer &, nano::message_visitor &, nano::work_pool &);
void deserialize_buffer (uint8_t const *, size_t);
void deserialize_keepalive (nano::stream &, nano::message_header const &);
void deserialize_publish (nano::stream &, nano::message_header const &);
void deserialize_publish (nano::stream &, nano::message_header const &, nano::uint128_t const & = 0);
void deserialize_confirm_req (nano::stream &, nano::message_header const &);
void deserialize_confirm_ack (nano::stream &, nano::message_header const &);
void deserialize_node_id_handshake (nano::stream &, nano::message_header const &);
void deserialize_telemetry_req (nano::stream &, nano::message_header const &);
void deserialize_telemetry_ack (nano::stream &, nano::message_header const &);
bool at_end (nano::stream &);
nano::network_filter & publish_filter;
nano::block_uniquer & block_uniquer;
nano::vote_uniquer & vote_uniquer;
nano::message_visitor & visitor;
@ -282,13 +285,14 @@ public:
class publish final : public message
{
public:
publish (bool &, nano::stream &, nano::message_header const &, nano::block_uniquer * = nullptr);
publish (bool &, nano::stream &, nano::message_header const &, nano::uint128_t const & = 0, nano::block_uniquer * = nullptr);
explicit publish (std::shared_ptr<nano::block>);
void visit (nano::message_visitor &) const override;
void serialize (nano::stream &) const override;
bool deserialize (nano::stream &, nano::block_uniquer * = nullptr);
bool operator== (nano::publish const &) const;
std::shared_ptr<nano::block> block;
nano::uint128_t digest{ 0 };
};
class confirm_req final : public message
{

View file

@ -1,5 +1,6 @@
#include <nano/node/confirmation_solicitor.hpp>
#include <nano/node/election.hpp>
#include <nano/node/network.hpp>
#include <nano/node/node.hpp>
#include <boost/format.hpp>
@ -523,8 +524,9 @@ void nano::election::adjust_dependent_difficulty ()
}
}
void nano::election::clear_blocks ()
void nano::election::cleanup ()
{
bool unconfirmed (!confirmed ());
auto winner_hash (status.winner->hash ());
for (auto const & block : blocks)
{
@ -534,11 +536,21 @@ void nano::election::clear_blocks ()
debug_assert (erased == 1);
node.active.erase_inactive_votes_cache (hash);
// Notify observers about dropped elections & blocks lost confirmed elections
if (!confirmed () || hash != winner_hash)
if (unconfirmed || hash != winner_hash)
{
node.observers.active_stopped.notify (hash);
}
}
if (unconfirmed)
{
// Clear network filter in another thread
node.worker.push_task ([node_l = node.shared (), blocks_l = std::move (blocks)]() {
for (auto const & block : blocks_l)
{
node_l->network.publish_filter.clear (block.second);
}
});
}
}
void nano::election::insert_inactive_votes_cache (nano::block_hash const & hash_a)

View file

@ -77,8 +77,9 @@ public:
size_t last_votes_size ();
void update_dependent ();
void adjust_dependent_difficulty ();
void clear_blocks ();
void insert_inactive_votes_cache (nano::block_hash const &);
// Erase all blocks from active and, if not confirmed, clear digests from network filters
void cleanup ();
public: // State transitions
bool transition_time (nano::confirmation_solicitor &);

View file

@ -16,6 +16,7 @@ buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receiv
resolver (node_a.io_ctx),
limiter (node_a.config.bandwidth_limit),
node (node_a),
publish_filter (256 * 1024),
udp_channels (node_a, port_a),
tcp_channels (node_a),
port (port_a),
@ -391,6 +392,7 @@ public:
}
else
{
node.network.publish_filter.clear (message_a.digest);
node.stats.inc (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::in);
}
}
@ -428,22 +430,25 @@ public:
node.logger.try_log (boost::str (boost::format ("Received confirm_ack message from %1% for %2%sequence %3%") % channel->to_string () % message_a.vote->hashes_string () % std::to_string (message_a.vote->sequence)));
}
node.stats.inc (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::in);
for (auto & vote_block : message_a.vote->blocks)
if (!message_a.vote->account.is_zero ())
{
if (!vote_block.which ())
for (auto & vote_block : message_a.vote->blocks)
{
auto block (boost::get<std::shared_ptr<nano::block>> (vote_block));
if (!node.block_processor.full ())
if (!vote_block.which ())
{
node.process_active (block);
}
else
{
node.stats.inc (nano::stat::type::drop, nano::stat::detail::confirm_ack, nano::stat::dir::in);
auto block (boost::get<std::shared_ptr<nano::block>> (vote_block));
if (!node.block_processor.full ())
{
node.process_active (block);
}
else
{
node.stats.inc (nano::stat::type::drop, nano::stat::detail::confirm_ack, nano::stat::dir::in);
}
}
}
node.vote_processor.vote (message_a.vote, channel);
}
node.vote_processor.vote (message_a.vote, channel);
}
void bulk_pull (nano::bulk_pull const &) override
{

View file

@ -3,13 +3,13 @@
#include <nano/node/common.hpp>
#include <nano/node/transport/tcp.hpp>
#include <nano/node/transport/udp.hpp>
#include <nano/secure/network_filter.hpp>
#include <boost/thread/thread.hpp>
#include <memory>
#include <queue>
#include <unordered_set>
namespace nano
{
class channel;
@ -154,6 +154,7 @@ public:
std::vector<boost::thread> packet_processing_threads;
nano::bandwidth_limiter limiter;
nano::node & node;
nano::network_filter publish_filter;
nano::transport::udp_channels udp_channels;
nano::transport::tcp_channels tcp_channels;
std::atomic<uint16_t> port{ 0 };

View file

@ -86,21 +86,18 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (bl
{
size_t state_blocks_count;
size_t blocks_count;
size_t blocks_filter_count;
size_t forced_count;
{
nano::lock_guard<std::mutex> guard (block_processor.mutex);
state_blocks_count = block_processor.state_blocks.size ();
blocks_count = block_processor.blocks.size ();
blocks_filter_count = block_processor.blocks_filter.size ();
forced_count = block_processor.forced.size ();
}
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "state_blocks", state_blocks_count, sizeof (decltype (block_processor.state_blocks)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", blocks_count, sizeof (decltype (block_processor.blocks)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks_filter", blocks_filter_count, sizeof (decltype (block_processor.blocks_filter)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "forced", forced_count, sizeof (decltype (block_processor.forced)::value_type) }));
composite->add_component (collect_container_info (block_processor.generator, "generator"));
return composite;
@ -938,6 +935,7 @@ void nano::node::bootstrap_wallet ()
void nano::node::unchecked_cleanup ()
{
std::vector<nano::uint128_t> digests;
std::deque<nano::unchecked_key> cleaning_list;
auto attempt (bootstrap_initiator.current_attempt ());
bool long_attempt (attempt != nullptr && std::chrono::duration_cast<std::chrono::seconds> (std::chrono::steady_clock::now () - attempt->attempt_start).count () > config.unchecked_cutoff_time.count ());
@ -953,6 +951,7 @@ void nano::node::unchecked_cleanup ()
nano::unchecked_info const & info (i->second);
if ((now - info.modified) > static_cast<uint64_t> (config.unchecked_cutoff_time.count ()))
{
digests.push_back (network.publish_filter.hash (info.block));
cleaning_list.push_back (key);
}
}
@ -978,6 +977,8 @@ void nano::node::unchecked_cleanup ()
}
}
}
// Delete from the duplicate filter
network.publish_filter.clear (digests);
}
void nano::node::ongoing_unchecked_cleanup ()

View file

@ -544,9 +544,17 @@ void nano::transport::udp_channels::receive_action (nano::message_buffer * data_
if (allowed_sender)
{
udp_message_visitor visitor (node, data_a->endpoint);
nano::message_parser parser (node.block_uniquer, node.vote_uniquer, visitor, node.work);
nano::message_parser parser (node.network.publish_filter, node.block_uniquer, node.vote_uniquer, visitor, node.work);
parser.deserialize_buffer (data_a->buffer, data_a->size);
if (parser.status != nano::message_parser::parse_status::success)
if (parser.status == nano::message_parser::parse_status::success)
{
node.stats.add (nano::stat::type::traffic_udp, nano::stat::dir::in, data_a->size);
}
else if (parser.status == nano::message_parser::parse_status::duplicate_publish_message)
{
node.stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_publish);
}
else
{
node.stats.inc (nano::stat::type::error);
@ -592,15 +600,12 @@ void nano::transport::udp_channels::receive_action (nano::message_buffer * data_
case nano::message_parser::parse_status::outdated_version:
node.stats.inc (nano::stat::type::udp, nano::stat::detail::outdated_version);
break;
case nano::message_parser::parse_status::duplicate_publish_message:
case nano::message_parser::parse_status::success:
/* Already checked, unreachable */
break;
}
}
else
{
node.stats.add (nano::stat::type::traffic_udp, nano::stat::dir::in, data_a->size);
}
}
else
{

View file

@ -40,6 +40,19 @@ void nano::network_filter::clear (nano::uint128_t const & digest_a)
}
}
void nano::network_filter::clear (std::vector<nano::uint128_t> const & digests_a)
{
nano::lock_guard<std::mutex> lock (mutex);
for (auto const & digest : digests_a)
{
auto & element (get_element (digest));
if (element == digest)
{
element = nano::uint128_t{ 0 };
}
}
}
void nano::network_filter::clear (uint8_t const * bytes_a, size_t count_a)
{
clear (hash (bytes_a, count_a));
@ -48,12 +61,7 @@ void nano::network_filter::clear (uint8_t const * bytes_a, size_t count_a)
template <typename OBJECT>
void nano::network_filter::clear (OBJECT const & object_a)
{
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
object_a->serialize (stream);
}
clear (bytes.data (), bytes.size ());
clear (hash (object_a));
}
void nano::network_filter::clear ()
@ -62,6 +70,17 @@ void nano::network_filter::clear ()
items.assign (items.size (), nano::uint128_t{ 0 });
}
template <typename OBJECT>
nano::uint128_t nano::network_filter::hash (OBJECT const & object_a) const
{
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
object_a->serialize (stream);
}
return hash (bytes.data (), bytes.size ());
}
nano::uint128_t & nano::network_filter::get_element (nano::uint128_t const & hash_a)
{
debug_assert (!mutex.try_lock ());
@ -77,3 +96,7 @@ nano::uint128_t nano::network_filter::hash (uint8_t const * bytes_a, size_t coun
siphash.CalculateDigest (digest.bytes.data (), bytes_a, count_a);
return digest.number ();
}
// Explicitly instantiate
template nano::uint128_t nano::network_filter::hash (std::shared_ptr<nano::block> const &) const;
template void nano::network_filter::clear (std::shared_ptr<nano::block> const &);

View file

@ -34,6 +34,11 @@ public:
**/
void clear (nano::uint128_t const & digest_a);
/**
* Clear many digests from the filter
**/
void clear (std::vector<nano::uint128_t> const &);
/**
* Reads \p count_a bytes starting from \p bytes_a and digests the contents.
* Then, sets the corresponding element in the filter to zero, if it matches the digest exactly.
@ -42,7 +47,7 @@ public:
void clear (uint8_t const * bytes_a, size_t count_a);
/**
* Serializes \p object_a and runs clears the resulting siphash digest.
* Serializes \p object_a and clears the resulting siphash digest from the filter.
* @return a boolean representing the previous existence of the hash in the filter.
**/
template <typename OBJECT>
@ -51,6 +56,12 @@ public:
/** Sets every element of the filter to zero, keeping its size and capacity. */
void clear ();
/**
* Serializes \p object_a and returns the resulting siphash digest
*/
template <typename OBJECT>
nano::uint128_t hash (OBJECT const & object_a) const;
private:
using siphash_t = CryptoPP::SipHash<2, 4, true>;