From ff88902d190e4a256c0a1567b90c4bef313858e7 Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Thu, 27 Feb 2020 08:22:49 +0000 Subject: [PATCH] Aggressive flooding for local blocks (#2549) * Aggressive flooding for local blocks Floods locally produced blocks (going through the work watcher, or process_local - wallet, RPC process) such that they are sent to all PRs and random subset of other peers. This allows more protection against Sybil attacks, and more streamlined elections as there are less occasions of votes arriving before blocks. As a consequence, the average amount of echoes will increase by 1 for PRs. However, with this change in place and used by most of the network, a future enhancement would be to reduce the republishing fanout. This change is based on a proposal by @Srayman for more information see the original proposal at https://medium.com/nanocurrency/proposal-for-nano-node-network-optimizations-21003e79cdba * Add flag to disable republishing in block processor and a test for aggressive flooding * Adjust timings for sanitizer builds * Wrap in a lambda to simplify test, and adjust it --- nano/core_test/node.cpp | 78 ++++++++++++++++++++++++++++++++++++ nano/node/blockprocessor.cpp | 15 +++++-- nano/node/blockprocessor.hpp | 4 +- nano/node/network.cpp | 19 +++++++++ nano/node/network.hpp | 10 ++--- nano/node/node.cpp | 2 +- nano/node/nodeconfig.hpp | 1 + nano/node/wallet.cpp | 1 + 8 files changed, 117 insertions(+), 13 deletions(-) diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 9f5a53b4..60537aef 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -3613,6 +3613,84 @@ TEST (node, bandwidth_limiter) node.stop (); } +// Tests that local blocks are flooded to all principal representatives +TEST (node, aggressive_flooding) +{ + nano::system system; + nano::node_flags node_flags; + node_flags.disable_request_loop = true; + node_flags.disable_block_processor_republishing = true; + auto & node1 (*system.add_node (node_flags)); + auto & wallet1 (*system.wallet (0)); + wallet1.insert_adhoc (nano::test_genesis_key.prv); + std::array, std::shared_ptr>, 5> nodes_wallets{}; + std::generate (nodes_wallets.begin (), nodes_wallets.end (), [&system, node_flags]() { + nano::node_config node_config; + node_config.peering_port = nano::get_available_port (); + auto node (system.add_node (node_config, node_flags)); + return std::make_pair (node, system.wallet (system.nodes.size () - 1)); + }); + auto large_amount = (nano::genesis_amount / 2) / nodes_wallets.size (); + for (auto & node_wallet : nodes_wallets) + { + nano::keypair keypair; + node_wallet.second->store.representative_set (node_wallet.first->wallets.tx_begin_write (), keypair.pub); + node_wallet.second->insert_adhoc (keypair.prv); + wallet1.send_action (nano::test_genesis_key.pub, keypair.pub, large_amount); + } + // Wait until all nodes have a representative + system.deadline_set (!is_sanitizer_build ? 5s : 15s); + while (node1.rep_crawler.principal_representatives ().size () != nodes_wallets.size ()) + { + ASSERT_NO_ERROR (system.poll ()); + } + // Generate blocks and ensure they are sent to all representatives + nano::block_builder builder; + std::shared_ptr block{}; + { + auto transaction (node1.store.tx_begin_read ()); + block = builder.state () + .account (nano::test_genesis_key.pub) + .representative (nano::test_genesis_key.pub) + .previous (node1.ledger.latest (transaction, nano::test_genesis_key.pub)) + .balance (node1.ledger.account_balance (transaction, nano::test_genesis_key.pub) - 1) + .link (nano::test_genesis_key.pub) + .sign (nano::test_genesis_key.prv, nano::test_genesis_key.pub) + .work (*node1.work_generate_blocking (node1.ledger.latest (transaction, nano::test_genesis_key.pub))) + .build (); + } + // Processing locally goes through the aggressive block flooding path + node1.process_local (block, false); + + auto all_have_block = [&nodes_wallets](nano::block_hash const & hash_a) { + return std::all_of (nodes_wallets.begin (), nodes_wallets.end (), [hash = hash_a](auto const & node_wallet) { + return node_wallet.first->block (hash) != nullptr; + }); + }; + + system.deadline_set (!is_sanitizer_build ? 3s : 10s); + while (!all_have_block (block->hash ())) + { + ASSERT_NO_ERROR (system.poll ()); + } + + // Do the same for a wallet block + auto wallet_block = wallet1.send_sync (nano::test_genesis_key.pub, nano::test_genesis_key.pub, 10); + system.deadline_set (!is_sanitizer_build ? 3s : 10s); + while (!all_have_block (wallet_block)) + { + ASSERT_NO_ERROR (system.poll ()); + } + + // Wait until the main node has all blocks: genesis + (send+open) for each representative + 2 local blocks + // The main node only sees all blocks if other nodes are flooding their PR's open block to all other PRs + system.deadline_set (5s); + while (node1.ledger.cache.block_count < 1 + 2 * nodes_wallets.size () + 2) + { + ASSERT_NO_ERROR (system.poll ()); + } +} + TEST (active_difficulty, recalculate_work) { nano::system system; diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index d7ec060f..8227a394 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -361,7 +361,7 @@ void nano::block_processor::process_batch (nano::unique_lock & lock_ } } -void nano::block_processor::process_live (nano::block_hash const & hash_a, std::shared_ptr block_a, const bool watch_work_a) +void nano::block_processor::process_live (nano::block_hash const & hash_a, std::shared_ptr block_a, const bool watch_work_a, const bool initial_publish_a) { // Add to work watcher to prevent dropping the election if (watch_work_a) @@ -373,7 +373,14 @@ void nano::block_processor::process_live (nano::block_hash const & hash_a, std:: node.active.insert (block_a, false); // Announce block contents to the network - node.network.flood_block (block_a, nano::buffer_drop_policy::no_limiter_drop); + if (initial_publish_a) + { + node.network.flood_block_initial (block_a); + } + else if (!node.flags.disable_block_processor_republishing) + { + node.network.flood_block (block_a, nano::buffer_drop_policy::no_limiter_drop); + } if (node.config.enable_voting && node.wallets.rep_counts ().voting > 0) { // Announce our weighted vote to the network @@ -381,7 +388,7 @@ void nano::block_processor::process_live (nano::block_hash const & hash_a, std:: } } -nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, nano::unchecked_info info_a, const bool watch_work_a) +nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, nano::unchecked_info info_a, const bool watch_work_a, const bool first_publish_a) { nano::process_return result; auto hash (info_a.block->hash ()); @@ -399,7 +406,7 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction } if (info_a.modified > nano::seconds_since_epoch () - 300 && node.block_arrival.recent (hash)) { - process_live (hash, info_a.block, watch_work_a); + process_live (hash, info_a.block, watch_work_a, first_publish_a); } queue_unchecked (transaction_a, hash); break; diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 15614d42..9563a213 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -42,7 +42,7 @@ public: bool should_log (bool); bool have_blocks (); void process_blocks (); - nano::process_return process_one (nano::write_transaction const &, nano::unchecked_info, const bool = false); + nano::process_return process_one (nano::write_transaction const &, nano::unchecked_info, const bool = false, const bool = false); nano::process_return process_one (nano::write_transaction const &, std::shared_ptr, const bool = false); nano::vote_generator generator; // Delay required for average network propagartion before requesting confirmation @@ -52,7 +52,7 @@ private: void queue_unchecked (nano::write_transaction const &, nano::block_hash const &); void verify_state_blocks (nano::unique_lock &, size_t = std::numeric_limits::max ()); void process_batch (nano::unique_lock &); - void process_live (nano::block_hash const &, std::shared_ptr, const bool = false); + void process_live (nano::block_hash const &, std::shared_ptr, const bool = false, const bool = false); void requeue_invalid (nano::block_hash const &, nano::unchecked_info const &); bool stopped; bool active; diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 3483bd47..26a4c98b 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -154,6 +154,25 @@ void nano::network::flood_message (nano::message const & message_a, nano::buffer } } +void nano::network::flood_block (std::shared_ptr const & block_a, nano::buffer_drop_policy const drop_policy_a) +{ + nano::publish message (block_a); + flood_message (message, drop_policy_a); +} + +void nano::network::flood_block_initial (std::shared_ptr const & block_a) +{ + nano::publish message (block_a); + for (auto const & i : node.rep_crawler.principal_representatives ()) + { + i.channel->send (message, nullptr, nano::buffer_drop_policy::no_limiter_drop); + } + for (auto & i : list_non_pr (fanout (1.0))) + { + i->send (message, nullptr, nano::buffer_drop_policy::no_limiter_drop); + } +} + void nano::network::flood_vote (std::shared_ptr const & vote_a, float scale) { nano::confirm_ack message (vote_a); diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 546c8fff..99c68bbc 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -107,12 +107,10 @@ public: } void flood_vote (std::shared_ptr const &, float scale); void flood_vote_pr (std::shared_ptr const &); - void flood_block (std::shared_ptr block_a, nano::buffer_drop_policy drop_policy_a = nano::buffer_drop_policy::limiter) - { - nano::publish publish (block_a); - flood_message (publish, drop_policy_a); - } - + // Flood block to all PRs and a random selection of non-PRs + void flood_block_initial (std::shared_ptr const &); + // Flood block to a random selection of peers + void flood_block (std::shared_ptr const &, nano::buffer_drop_policy const = nano::buffer_drop_policy::limiter); void flood_block_many (std::deque>, std::function = nullptr, unsigned = broadcast_interval_ms); void merge_peers (std::array const &); void merge_peer (nano::endpoint const &); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index e16822d6..9ac038f9 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -627,7 +627,7 @@ nano::process_return nano::node::process_local (std::shared_ptr blo block_processor.wait_write (); // Process block auto transaction (store.tx_begin_write ({ tables::accounts, tables::cached_counts, tables::change_blocks, tables::frontiers, tables::open_blocks, tables::pending, tables::receive_blocks, tables::representation, tables::send_blocks, tables::state_blocks }, { tables::confirmation_height })); - return block_processor.process_one (transaction, info, work_watcher_a); + return block_processor.process_one (transaction, info, work_watcher_a, true); } void nano::node::start () diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index f242cadd..f3600dea 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -126,6 +126,7 @@ public: bool disable_unchecked_drop{ true }; bool disable_providing_telemetry_metrics{ false }; bool disable_block_processor_unchecked_deletion{ false }; + bool disable_block_processor_republishing{ false }; bool disable_ongoing_telemetry_requests{ false }; bool fast_bootstrap{ false }; bool read_only{ false }; diff --git a/nano/node/wallet.cpp b/nano/node/wallet.cpp index 08e3a9dd..6541dcf2 100644 --- a/nano/node/wallet.cpp +++ b/nano/node/wallet.cpp @@ -1453,6 +1453,7 @@ void nano::work_watcher::watching (nano::qualified_root const & root_a, std::sha if (!ec) { + watcher_l->node.network.flood_block_initial (block); watcher_l->node.active.update_difficulty (block); watcher_l->update (root_a, block); updated_l = true;