diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 9f5a53b4..60537aef 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -3613,6 +3613,84 @@ TEST (node, bandwidth_limiter) node.stop (); } +// Tests that local blocks are flooded to all principal representatives +TEST (node, aggressive_flooding) +{ + nano::system system; + nano::node_flags node_flags; + node_flags.disable_request_loop = true; + node_flags.disable_block_processor_republishing = true; + auto & node1 (*system.add_node (node_flags)); + auto & wallet1 (*system.wallet (0)); + wallet1.insert_adhoc (nano::test_genesis_key.prv); + std::array, std::shared_ptr>, 5> nodes_wallets{}; + std::generate (nodes_wallets.begin (), nodes_wallets.end (), [&system, node_flags]() { + nano::node_config node_config; + node_config.peering_port = nano::get_available_port (); + auto node (system.add_node (node_config, node_flags)); + return std::make_pair (node, system.wallet (system.nodes.size () - 1)); + }); + auto large_amount = (nano::genesis_amount / 2) / nodes_wallets.size (); + for (auto & node_wallet : nodes_wallets) + { + nano::keypair keypair; + node_wallet.second->store.representative_set (node_wallet.first->wallets.tx_begin_write (), keypair.pub); + node_wallet.second->insert_adhoc (keypair.prv); + wallet1.send_action (nano::test_genesis_key.pub, keypair.pub, large_amount); + } + // Wait until all nodes have a representative + system.deadline_set (!is_sanitizer_build ? 5s : 15s); + while (node1.rep_crawler.principal_representatives ().size () != nodes_wallets.size ()) + { + ASSERT_NO_ERROR (system.poll ()); + } + // Generate blocks and ensure they are sent to all representatives + nano::block_builder builder; + std::shared_ptr block{}; + { + auto transaction (node1.store.tx_begin_read ()); + block = builder.state () + .account (nano::test_genesis_key.pub) + .representative (nano::test_genesis_key.pub) + .previous (node1.ledger.latest (transaction, nano::test_genesis_key.pub)) + .balance (node1.ledger.account_balance (transaction, nano::test_genesis_key.pub) - 1) + .link (nano::test_genesis_key.pub) + .sign (nano::test_genesis_key.prv, nano::test_genesis_key.pub) + .work (*node1.work_generate_blocking (node1.ledger.latest (transaction, nano::test_genesis_key.pub))) + .build (); + } + // Processing locally goes through the aggressive block flooding path + node1.process_local (block, false); + + auto all_have_block = [&nodes_wallets](nano::block_hash const & hash_a) { + return std::all_of (nodes_wallets.begin (), nodes_wallets.end (), [hash = hash_a](auto const & node_wallet) { + return node_wallet.first->block (hash) != nullptr; + }); + }; + + system.deadline_set (!is_sanitizer_build ? 3s : 10s); + while (!all_have_block (block->hash ())) + { + ASSERT_NO_ERROR (system.poll ()); + } + + // Do the same for a wallet block + auto wallet_block = wallet1.send_sync (nano::test_genesis_key.pub, nano::test_genesis_key.pub, 10); + system.deadline_set (!is_sanitizer_build ? 3s : 10s); + while (!all_have_block (wallet_block)) + { + ASSERT_NO_ERROR (system.poll ()); + } + + // Wait until the main node has all blocks: genesis + (send+open) for each representative + 2 local blocks + // The main node only sees all blocks if other nodes are flooding their PR's open block to all other PRs + system.deadline_set (5s); + while (node1.ledger.cache.block_count < 1 + 2 * nodes_wallets.size () + 2) + { + ASSERT_NO_ERROR (system.poll ()); + } +} + TEST (active_difficulty, recalculate_work) { nano::system system; diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index d7ec060f..8227a394 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -361,7 +361,7 @@ void nano::block_processor::process_batch (nano::unique_lock & lock_ } } -void nano::block_processor::process_live (nano::block_hash const & hash_a, std::shared_ptr block_a, const bool watch_work_a) +void nano::block_processor::process_live (nano::block_hash const & hash_a, std::shared_ptr block_a, const bool watch_work_a, const bool initial_publish_a) { // Add to work watcher to prevent dropping the election if (watch_work_a) @@ -373,7 +373,14 @@ void nano::block_processor::process_live (nano::block_hash const & hash_a, std:: node.active.insert (block_a, false); // Announce block contents to the network - node.network.flood_block (block_a, nano::buffer_drop_policy::no_limiter_drop); + if (initial_publish_a) + { + node.network.flood_block_initial (block_a); + } + else if (!node.flags.disable_block_processor_republishing) + { + node.network.flood_block (block_a, nano::buffer_drop_policy::no_limiter_drop); + } if (node.config.enable_voting && node.wallets.rep_counts ().voting > 0) { // Announce our weighted vote to the network @@ -381,7 +388,7 @@ void nano::block_processor::process_live (nano::block_hash const & hash_a, std:: } } -nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, nano::unchecked_info info_a, const bool watch_work_a) +nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, nano::unchecked_info info_a, const bool watch_work_a, const bool first_publish_a) { nano::process_return result; auto hash (info_a.block->hash ()); @@ -399,7 +406,7 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction } if (info_a.modified > nano::seconds_since_epoch () - 300 && node.block_arrival.recent (hash)) { - process_live (hash, info_a.block, watch_work_a); + process_live (hash, info_a.block, watch_work_a, first_publish_a); } queue_unchecked (transaction_a, hash); break; diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 15614d42..9563a213 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -42,7 +42,7 @@ public: bool should_log (bool); bool have_blocks (); void process_blocks (); - nano::process_return process_one (nano::write_transaction const &, nano::unchecked_info, const bool = false); + nano::process_return process_one (nano::write_transaction const &, nano::unchecked_info, const bool = false, const bool = false); nano::process_return process_one (nano::write_transaction const &, std::shared_ptr, const bool = false); nano::vote_generator generator; // Delay required for average network propagartion before requesting confirmation @@ -52,7 +52,7 @@ private: void queue_unchecked (nano::write_transaction const &, nano::block_hash const &); void verify_state_blocks (nano::unique_lock &, size_t = std::numeric_limits::max ()); void process_batch (nano::unique_lock &); - void process_live (nano::block_hash const &, std::shared_ptr, const bool = false); + void process_live (nano::block_hash const &, std::shared_ptr, const bool = false, const bool = false); void requeue_invalid (nano::block_hash const &, nano::unchecked_info const &); bool stopped; bool active; diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 3483bd47..26a4c98b 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -154,6 +154,25 @@ void nano::network::flood_message (nano::message const & message_a, nano::buffer } } +void nano::network::flood_block (std::shared_ptr const & block_a, nano::buffer_drop_policy const drop_policy_a) +{ + nano::publish message (block_a); + flood_message (message, drop_policy_a); +} + +void nano::network::flood_block_initial (std::shared_ptr const & block_a) +{ + nano::publish message (block_a); + for (auto const & i : node.rep_crawler.principal_representatives ()) + { + i.channel->send (message, nullptr, nano::buffer_drop_policy::no_limiter_drop); + } + for (auto & i : list_non_pr (fanout (1.0))) + { + i->send (message, nullptr, nano::buffer_drop_policy::no_limiter_drop); + } +} + void nano::network::flood_vote (std::shared_ptr const & vote_a, float scale) { nano::confirm_ack message (vote_a); diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 546c8fff..99c68bbc 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -107,12 +107,10 @@ public: } void flood_vote (std::shared_ptr const &, float scale); void flood_vote_pr (std::shared_ptr const &); - void flood_block (std::shared_ptr block_a, nano::buffer_drop_policy drop_policy_a = nano::buffer_drop_policy::limiter) - { - nano::publish publish (block_a); - flood_message (publish, drop_policy_a); - } - + // Flood block to all PRs and a random selection of non-PRs + void flood_block_initial (std::shared_ptr const &); + // Flood block to a random selection of peers + void flood_block (std::shared_ptr const &, nano::buffer_drop_policy const = nano::buffer_drop_policy::limiter); void flood_block_many (std::deque>, std::function = nullptr, unsigned = broadcast_interval_ms); void merge_peers (std::array const &); void merge_peer (nano::endpoint const &); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index e16822d6..9ac038f9 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -627,7 +627,7 @@ nano::process_return nano::node::process_local (std::shared_ptr blo block_processor.wait_write (); // Process block auto transaction (store.tx_begin_write ({ tables::accounts, tables::cached_counts, tables::change_blocks, tables::frontiers, tables::open_blocks, tables::pending, tables::receive_blocks, tables::representation, tables::send_blocks, tables::state_blocks }, { tables::confirmation_height })); - return block_processor.process_one (transaction, info, work_watcher_a); + return block_processor.process_one (transaction, info, work_watcher_a, true); } void nano::node::start () diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index f242cadd..f3600dea 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -126,6 +126,7 @@ public: bool disable_unchecked_drop{ true }; bool disable_providing_telemetry_metrics{ false }; bool disable_block_processor_unchecked_deletion{ false }; + bool disable_block_processor_republishing{ false }; bool disable_ongoing_telemetry_requests{ false }; bool fast_bootstrap{ false }; bool read_only{ false }; diff --git a/nano/node/wallet.cpp b/nano/node/wallet.cpp index 08e3a9dd..6541dcf2 100644 --- a/nano/node/wallet.cpp +++ b/nano/node/wallet.cpp @@ -1453,6 +1453,7 @@ void nano::work_watcher::watching (nano::qualified_root const & root_a, std::sha if (!ec) { + watcher_l->node.network.flood_block_initial (block); watcher_l->node.active.update_difficulty (block); watcher_l->update (root_a, block); updated_l = true;