From 066724f6b8fdd381686c12e3a2622598835fa3d5 Mon Sep 17 00:00:00 2001 From: clemahieu Date: Thu, 14 Mar 2019 00:14:25 +0000 Subject: [PATCH] Rename republish_x to flood_x to better describe what we're doing. (#1825) Flood messages through a common function instead of having their own code. --- nano/core_test/gap_cache.cpp | 2 +- nano/core_test/network.cpp | 4 +- nano/core_test/node.cpp | 6 +-- nano/node/blockprocessor.cpp | 2 +- nano/node/node.cpp | 86 ++++++------------------------------ nano/node/node.hpp | 20 ++++++--- nano/node/rpc.cpp | 4 +- nano/node/wallet.cpp | 2 +- nano/qt/qt.cpp | 2 +- nano/slow_test/node.cpp | 2 +- 10 files changed, 38 insertions(+), 92 deletions(-) diff --git a/nano/core_test/gap_cache.cpp b/nano/core_test/gap_cache.cpp index 11717deb5..6610cc4f6 100644 --- a/nano/core_test/gap_cache.cpp +++ b/nano/core_test/gap_cache.cpp @@ -88,7 +88,7 @@ TEST (gap_cache, gap_bootstrap) // The separate publish and vote system doesn't work very well here because it's instantly confirmed. // We help it get the block and vote out here. auto transaction (system.nodes[0]->store.tx_begin ()); - system.nodes[0]->network.republish_block (latest_block); + system.nodes[0]->network.flood_block (latest_block); } while (system.nodes[1]->balance (nano::genesis_account) != nano::genesis_amount - 200) { diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 00c835218..3d520d647 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -190,7 +190,7 @@ TEST (network, send_discarded_publish) nano::genesis genesis; { auto transaction (system.nodes[0]->store.tx_begin ()); - system.nodes[0]->network.republish_block (block); + system.nodes[0]->network.flood_block (block); ASSERT_EQ (genesis.hash (), system.nodes[0]->ledger.latest (transaction, nano::test_genesis_key.pub)); ASSERT_EQ (genesis.hash (), system.nodes[1]->latest (nano::test_genesis_key.pub)); } @@ -211,7 +211,7 @@ TEST (network, send_invalid_publish) auto block (std::make_shared (1, 1, 20, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (1))); { auto transaction (system.nodes[0]->store.tx_begin ()); - system.nodes[0]->network.republish_block (block); + system.nodes[0]->network.flood_block (block); ASSERT_EQ (genesis.hash (), system.nodes[0]->ledger.latest (transaction, nano::test_genesis_key.pub)); ASSERT_EQ (genesis.hash (), system.nodes[1]->latest (nano::test_genesis_key.pub)); } diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index e827098a1..0c2f745bd 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -480,7 +480,7 @@ TEST (node, confirm_locked) auto transaction (system.nodes[0]->store.tx_begin ()); system.wallet (0)->enter_password (transaction, "1"); auto block (std::make_shared (0, 0, 0, nano::keypair ().prv, 0, 0)); - system.nodes[0]->network.republish_block (block); + system.nodes[0]->network.flood_block (block); } TEST (node_config, serialization) @@ -2096,9 +2096,9 @@ TEST (node, fork_invalid_block_signature) } auto vote (std::make_shared (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, send2)); auto vote_corrupt (std::make_shared (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, send2_corrupt)); - system.nodes[1]->network.republish_vote (vote_corrupt); + system.nodes[1]->network.flood_vote (vote_corrupt); ASSERT_NO_ERROR (system.poll ()); - system.nodes[1]->network.republish_vote (vote); + system.nodes[1]->network.flood_vote (vote); while (system.nodes[0]->block (send1->hash ())) { ASSERT_NO_ERROR (system.poll ()); diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index cbdf1d677..1a0d1ec0e 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -348,7 +348,7 @@ void nano::block_processor::process_live (nano::block_hash const & hash_a, std:: // Start collecting quorum on block node.active.start (block_a); // Announce block contents to the network - node.network.republish_block (block_a); + node.network.flood_block (block_a); if (node.config.enable_voting) { // Announce our weighted vote to the network diff --git a/nano/node/node.cpp b/nano/node/node.cpp index d38156f8b..f28c77cfc 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -221,28 +221,6 @@ void nano::network::send_node_id_handshake (nano::endpoint const & endpoint_a, b }); } -void nano::network::republish (nano::block_hash const & hash_a, std::shared_ptr> buffer_a, nano::endpoint endpoint_a) -{ - if (node.config.logging.network_publish_logging ()) - { - node.logger.try_log (boost::str (boost::format ("Publishing %1% to %2%") % hash_a.to_string () % endpoint_a)); - } - std::weak_ptr node_w (node.shared ()); - send_buffer (buffer_a->data (), buffer_a->size (), endpoint_a, [buffer_a, node_w, endpoint_a](boost::system::error_code const & ec, size_t size) { - if (auto node_l = node_w.lock ()) - { - if (ec && node_l->config.logging.network_logging ()) - { - node_l->logger.try_log (boost::str (boost::format ("Error sending publish to %1%: %2%") % endpoint_a % ec.message ())); - } - else - { - node_l->stats.inc (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::out); - } - } - }); -} - template bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a, T & list_a, std::shared_ptr block_a, bool also_publish) { @@ -288,7 +266,7 @@ bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a publish_bytes = publish.to_bytes (); for (auto j (list_a.begin ()), m (list_a.end ()); j != m; ++j) { - node_a.network.republish (hash, publish_bytes, *j); + node_a.network.send_buffer (publish_bytes->data (), publish_bytes->size (), *j, [publish_bytes](boost::system::error_code const &, size_t) {}); } } } @@ -337,73 +315,33 @@ bool nano::network::send_votes_cache (nano::block_hash const & hash_a, nano::end return result; } -void nano::network::republish_block (std::shared_ptr block) +void nano::network::flood_message (nano::message const & message_a) { - auto hash (block->hash ()); auto list (node.peers.list_fanout ()); - nano::publish message (block); - auto bytes = message.to_bytes (); for (auto i (list.begin ()), n (list.end ()); i != n; ++i) { - republish (hash, bytes, *i); - } - if (node.config.logging.network_logging ()) - { - node.logger.try_log (boost::str (boost::format ("Block %1% was republished to peers") % hash.to_string ())); + auto buffer (message_a.to_bytes ()); + send_buffer (buffer->data (), buffer->size (), *i, [buffer](boost::system::error_code const &, size_t) {}); } } -void nano::network::republish_block (std::shared_ptr block, nano::endpoint const & peer_a) -{ - auto hash (block->hash ()); - nano::publish message (block); - std::vector bytes; - { - nano::vectorstream stream (bytes); - message.serialize (stream); - } - republish (hash, std::make_shared> (bytes), peer_a); - if (node.config.logging.network_logging ()) - { - node.logger.try_log (boost::str (boost::format ("Block %1% was republished to peers") % hash.to_string ())); - } -} - -void nano::network::republish_block_batch (std::deque> blocks_a, unsigned delay_a) +void nano::network::flood_block_batch (std::deque> blocks_a, unsigned delay_a) { auto block (blocks_a.front ()); blocks_a.pop_front (); - republish_block (block); + flood_block (block); if (!blocks_a.empty ()) { std::weak_ptr node_w (node.shared ()); node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, blocks_a, delay_a]() { if (auto node_l = node_w.lock ()) { - node_l->network.republish_block_batch (blocks_a, delay_a); + node_l->network.flood_block_batch (blocks_a, delay_a); } }); } } -// In order to rate limit network traffic we republish: -// 1) Only if they are a non-replay vote of a block that's actively settling. Settling blocks are limited by block PoW -// 2) The rep has a weight > Y to prevent creating a lot of small-weight accounts to send out votes -// 3) Only if a vote for this block from this representative hasn't been received in the previous X second. -// This prevents rapid publishing of votes with increasing sequence numbers. -// -// These rules are implemented by the caller, not this function. -void nano::network::republish_vote (std::shared_ptr vote_a) -{ - nano::confirm_ack confirm (vote_a); - auto bytes = confirm.to_bytes (); - auto list (node.peers.list_fanout ()); - for (auto j (list.begin ()), m (list.end ()); j != m; ++j) - { - node.network.confirm_send (confirm, bytes, *j); - } -} - void nano::network::broadcast_confirm_req (std::shared_ptr block_a) { auto list (std::make_shared> (node.rep_crawler.representative_endpoints (std::numeric_limits::max ()))); @@ -673,7 +611,9 @@ public: } auto successor_block (node.store.block_get (transaction, successor)); assert (successor_block != nullptr); - node.network.republish_block (std::move (successor_block), sender); + nano::publish publish (successor_block); + auto buffer (publish.to_bytes ()); + node.network.send_buffer (buffer->data (), buffer->size (), sender, [buffer](boost::system::error_code const &, size_t) {}); } } } @@ -3195,7 +3135,7 @@ bool nano::election::publish (std::shared_ptr block_a) { blocks.insert (std::make_pair (block_a->hash (), block_a)); confirm_if_quorum (transaction); - node.network.republish_block (block_a); + node.network.flood_block (block_a); } } } @@ -3386,7 +3326,7 @@ void nano::active_transactions::request_confirm (std::unique_lock & // Rebroadcast unconfirmed blocks if (!rebroadcast_bundle.empty ()) { - node.network.republish_block_batch (rebroadcast_bundle); + node.network.flood_block_batch (rebroadcast_bundle); } // Batch confirmation request if (!node.network_params.is_live_network () && !requests_bundle.empty ()) @@ -3518,7 +3458,7 @@ bool nano::active_transactions::vote (std::shared_ptr vote_a, bool s } if (processed) { - node.network.republish_vote (vote_a); + node.network.flood_vote (vote_a); } return replay; } diff --git a/nano/node/node.hpp b/nano/node/node.hpp index ad9c9b9e5..9d5a02159 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -327,13 +327,18 @@ public: void start (); void stop (); void receive_action (nano::udp_data *, nano::endpoint const &); - void rpc_action (boost::system::error_code const &, size_t); - void republish_vote (std::shared_ptr); - void republish_block (std::shared_ptr); - void republish_block (std::shared_ptr, nano::endpoint const &); - static unsigned const broadcast_interval_ms = 10; - void republish_block_batch (std::deque>, unsigned = broadcast_interval_ms); - void republish (nano::block_hash const &, std::shared_ptr>, nano::endpoint); + void flood_message (nano::message const &); + void flood_vote (std::shared_ptr vote_a) + { + nano::confirm_ack message (vote_a); + flood_message (message); + } + void flood_block (std::shared_ptr block_a) + { + nano::publish publish (block_a); + flood_message (publish); + } + void flood_block_batch (std::deque>, unsigned = broadcast_interval_ms); void confirm_send (nano::confirm_ack const &, std::shared_ptr>, nano::endpoint const &); void merge_peers (std::array const &); void send_keepalive (nano::endpoint const &); @@ -356,6 +361,7 @@ public: nano::node & node; static size_t const buffer_size = 512; static size_t const confirm_req_hashes_max = 6; + static unsigned const broadcast_interval_ms = 10; }; class node_init diff --git a/nano/node/rpc.cpp b/nano/node/rpc.cpp index 693c2c136..7c85b26df 100644 --- a/nano/node/rpc.cpp +++ b/nano/node/rpc.cpp @@ -3032,7 +3032,7 @@ void nano::rpc_handler::republish () } hash = node.store.block_successor (transaction, hash); } - node.network.republish_block_batch (republish_bundle, 25); + node.network.flood_block_batch (republish_bundle, 25); response_l.put ("success", ""); // obsolete response_l.add_child ("blocks", blocks); } @@ -4132,7 +4132,7 @@ void nano::rpc_handler::wallet_republish () blocks.push_back (std::make_pair ("", entry)); } } - node.network.republish_block_batch (republish_bundle, 25); + node.network.flood_block_batch (republish_bundle, 25); response_l.add_child ("blocks", blocks); } response_errors (); diff --git a/nano/node/wallet.cpp b/nano/node/wallet.cpp index 435459811..b923e6411 100644 --- a/nano/node/wallet.cpp +++ b/nano/node/wallet.cpp @@ -1053,7 +1053,7 @@ std::shared_ptr nano::wallet::send_action (nano::account const & so if (block != nullptr) { cached_block = true; - wallets.node.network.republish_block (block); + wallets.node.network.flood_block (block); } } else if (status != MDB_NOTFOUND) diff --git a/nano/qt/qt.cpp b/nano/qt/qt.cpp index f2bda9a19..5441e02c2 100644 --- a/nano/qt/qt.cpp +++ b/nano/qt/qt.cpp @@ -703,7 +703,7 @@ void nano_qt::block_viewer::rebroadcast_action (nano::uint256_union const & hash auto block (wallet.node.store.block_get (transaction, hash_a)); if (block != nullptr) { - wallet.node.network.republish_block (std::move (block)); + wallet.node.network.flood_block (std::move (block)); auto successor (wallet.node.store.block_successor (transaction, hash_a)); if (!successor.is_zero ()) { diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index 9a06735c7..d5412a13e 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -193,7 +193,7 @@ TEST (node, fork_storm) auto open_result (system.nodes[i]->process (*open)); ASSERT_EQ (nano::process_result::progress, open_result.code); auto transaction (system.nodes[i]->store.tx_begin ()); - system.nodes[i]->network.republish_block (open); + system.nodes[i]->network.flood_block (open); } } auto again (true);