Rename republish_x to flood_x to better describe what we're doing. (#1825)
Flood messages through a common function instead of having their own code.
This commit is contained in:
parent
cae08d2ea5
commit
066724f6b8
10 changed files with 38 additions and 92 deletions
|
|
@ -88,7 +88,7 @@ TEST (gap_cache, gap_bootstrap)
|
|||
// The separate publish and vote system doesn't work very well here because it's instantly confirmed.
|
||||
// We help it get the block and vote out here.
|
||||
auto transaction (system.nodes[0]->store.tx_begin ());
|
||||
system.nodes[0]->network.republish_block (latest_block);
|
||||
system.nodes[0]->network.flood_block (latest_block);
|
||||
}
|
||||
while (system.nodes[1]->balance (nano::genesis_account) != nano::genesis_amount - 200)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -190,7 +190,7 @@ TEST (network, send_discarded_publish)
|
|||
nano::genesis genesis;
|
||||
{
|
||||
auto transaction (system.nodes[0]->store.tx_begin ());
|
||||
system.nodes[0]->network.republish_block (block);
|
||||
system.nodes[0]->network.flood_block (block);
|
||||
ASSERT_EQ (genesis.hash (), system.nodes[0]->ledger.latest (transaction, nano::test_genesis_key.pub));
|
||||
ASSERT_EQ (genesis.hash (), system.nodes[1]->latest (nano::test_genesis_key.pub));
|
||||
}
|
||||
|
|
@ -211,7 +211,7 @@ TEST (network, send_invalid_publish)
|
|||
auto block (std::make_shared<nano::send_block> (1, 1, 20, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (1)));
|
||||
{
|
||||
auto transaction (system.nodes[0]->store.tx_begin ());
|
||||
system.nodes[0]->network.republish_block (block);
|
||||
system.nodes[0]->network.flood_block (block);
|
||||
ASSERT_EQ (genesis.hash (), system.nodes[0]->ledger.latest (transaction, nano::test_genesis_key.pub));
|
||||
ASSERT_EQ (genesis.hash (), system.nodes[1]->latest (nano::test_genesis_key.pub));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -480,7 +480,7 @@ TEST (node, confirm_locked)
|
|||
auto transaction (system.nodes[0]->store.tx_begin ());
|
||||
system.wallet (0)->enter_password (transaction, "1");
|
||||
auto block (std::make_shared<nano::send_block> (0, 0, 0, nano::keypair ().prv, 0, 0));
|
||||
system.nodes[0]->network.republish_block (block);
|
||||
system.nodes[0]->network.flood_block (block);
|
||||
}
|
||||
|
||||
TEST (node_config, serialization)
|
||||
|
|
@ -2096,9 +2096,9 @@ TEST (node, fork_invalid_block_signature)
|
|||
}
|
||||
auto vote (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, send2));
|
||||
auto vote_corrupt (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, send2_corrupt));
|
||||
system.nodes[1]->network.republish_vote (vote_corrupt);
|
||||
system.nodes[1]->network.flood_vote (vote_corrupt);
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
system.nodes[1]->network.republish_vote (vote);
|
||||
system.nodes[1]->network.flood_vote (vote);
|
||||
while (system.nodes[0]->block (send1->hash ()))
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
|
|
|
|||
|
|
@ -348,7 +348,7 @@ void nano::block_processor::process_live (nano::block_hash const & hash_a, std::
|
|||
// Start collecting quorum on block
|
||||
node.active.start (block_a);
|
||||
// Announce block contents to the network
|
||||
node.network.republish_block (block_a);
|
||||
node.network.flood_block (block_a);
|
||||
if (node.config.enable_voting)
|
||||
{
|
||||
// Announce our weighted vote to the network
|
||||
|
|
|
|||
|
|
@ -221,28 +221,6 @@ void nano::network::send_node_id_handshake (nano::endpoint const & endpoint_a, b
|
|||
});
|
||||
}
|
||||
|
||||
void nano::network::republish (nano::block_hash const & hash_a, std::shared_ptr<std::vector<uint8_t>> buffer_a, nano::endpoint endpoint_a)
|
||||
{
|
||||
if (node.config.logging.network_publish_logging ())
|
||||
{
|
||||
node.logger.try_log (boost::str (boost::format ("Publishing %1% to %2%") % hash_a.to_string () % endpoint_a));
|
||||
}
|
||||
std::weak_ptr<nano::node> node_w (node.shared ());
|
||||
send_buffer (buffer_a->data (), buffer_a->size (), endpoint_a, [buffer_a, node_w, endpoint_a](boost::system::error_code const & ec, size_t size) {
|
||||
if (auto node_l = node_w.lock ())
|
||||
{
|
||||
if (ec && node_l->config.logging.network_logging ())
|
||||
{
|
||||
node_l->logger.try_log (boost::str (boost::format ("Error sending publish to %1%: %2%") % endpoint_a % ec.message ()));
|
||||
}
|
||||
else
|
||||
{
|
||||
node_l->stats.inc (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::out);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a, T & list_a, std::shared_ptr<nano::block> block_a, bool also_publish)
|
||||
{
|
||||
|
|
@ -288,7 +266,7 @@ bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a
|
|||
publish_bytes = publish.to_bytes ();
|
||||
for (auto j (list_a.begin ()), m (list_a.end ()); j != m; ++j)
|
||||
{
|
||||
node_a.network.republish (hash, publish_bytes, *j);
|
||||
node_a.network.send_buffer (publish_bytes->data (), publish_bytes->size (), *j, [publish_bytes](boost::system::error_code const &, size_t) {});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -337,73 +315,33 @@ bool nano::network::send_votes_cache (nano::block_hash const & hash_a, nano::end
|
|||
return result;
|
||||
}
|
||||
|
||||
void nano::network::republish_block (std::shared_ptr<nano::block> block)
|
||||
void nano::network::flood_message (nano::message const & message_a)
|
||||
{
|
||||
auto hash (block->hash ());
|
||||
auto list (node.peers.list_fanout ());
|
||||
nano::publish message (block);
|
||||
auto bytes = message.to_bytes ();
|
||||
for (auto i (list.begin ()), n (list.end ()); i != n; ++i)
|
||||
{
|
||||
republish (hash, bytes, *i);
|
||||
}
|
||||
if (node.config.logging.network_logging ())
|
||||
{
|
||||
node.logger.try_log (boost::str (boost::format ("Block %1% was republished to peers") % hash.to_string ()));
|
||||
auto buffer (message_a.to_bytes ());
|
||||
send_buffer (buffer->data (), buffer->size (), *i, [buffer](boost::system::error_code const &, size_t) {});
|
||||
}
|
||||
}
|
||||
|
||||
void nano::network::republish_block (std::shared_ptr<nano::block> block, nano::endpoint const & peer_a)
|
||||
{
|
||||
auto hash (block->hash ());
|
||||
nano::publish message (block);
|
||||
std::vector<uint8_t> bytes;
|
||||
{
|
||||
nano::vectorstream stream (bytes);
|
||||
message.serialize (stream);
|
||||
}
|
||||
republish (hash, std::make_shared<std::vector<uint8_t>> (bytes), peer_a);
|
||||
if (node.config.logging.network_logging ())
|
||||
{
|
||||
node.logger.try_log (boost::str (boost::format ("Block %1% was republished to peers") % hash.to_string ()));
|
||||
}
|
||||
}
|
||||
|
||||
void nano::network::republish_block_batch (std::deque<std::shared_ptr<nano::block>> blocks_a, unsigned delay_a)
|
||||
void nano::network::flood_block_batch (std::deque<std::shared_ptr<nano::block>> blocks_a, unsigned delay_a)
|
||||
{
|
||||
auto block (blocks_a.front ());
|
||||
blocks_a.pop_front ();
|
||||
republish_block (block);
|
||||
flood_block (block);
|
||||
if (!blocks_a.empty ())
|
||||
{
|
||||
std::weak_ptr<nano::node> node_w (node.shared ());
|
||||
node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, blocks_a, delay_a]() {
|
||||
if (auto node_l = node_w.lock ())
|
||||
{
|
||||
node_l->network.republish_block_batch (blocks_a, delay_a);
|
||||
node_l->network.flood_block_batch (blocks_a, delay_a);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// In order to rate limit network traffic we republish:
|
||||
// 1) Only if they are a non-replay vote of a block that's actively settling. Settling blocks are limited by block PoW
|
||||
// 2) The rep has a weight > Y to prevent creating a lot of small-weight accounts to send out votes
|
||||
// 3) Only if a vote for this block from this representative hasn't been received in the previous X second.
|
||||
// This prevents rapid publishing of votes with increasing sequence numbers.
|
||||
//
|
||||
// These rules are implemented by the caller, not this function.
|
||||
void nano::network::republish_vote (std::shared_ptr<nano::vote> vote_a)
|
||||
{
|
||||
nano::confirm_ack confirm (vote_a);
|
||||
auto bytes = confirm.to_bytes ();
|
||||
auto list (node.peers.list_fanout ());
|
||||
for (auto j (list.begin ()), m (list.end ()); j != m; ++j)
|
||||
{
|
||||
node.network.confirm_send (confirm, bytes, *j);
|
||||
}
|
||||
}
|
||||
|
||||
void nano::network::broadcast_confirm_req (std::shared_ptr<nano::block> block_a)
|
||||
{
|
||||
auto list (std::make_shared<std::vector<nano::endpoint>> (node.rep_crawler.representative_endpoints (std::numeric_limits<size_t>::max ())));
|
||||
|
|
@ -673,7 +611,9 @@ public:
|
|||
}
|
||||
auto successor_block (node.store.block_get (transaction, successor));
|
||||
assert (successor_block != nullptr);
|
||||
node.network.republish_block (std::move (successor_block), sender);
|
||||
nano::publish publish (successor_block);
|
||||
auto buffer (publish.to_bytes ());
|
||||
node.network.send_buffer (buffer->data (), buffer->size (), sender, [buffer](boost::system::error_code const &, size_t) {});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -3195,7 +3135,7 @@ bool nano::election::publish (std::shared_ptr<nano::block> block_a)
|
|||
{
|
||||
blocks.insert (std::make_pair (block_a->hash (), block_a));
|
||||
confirm_if_quorum (transaction);
|
||||
node.network.republish_block (block_a);
|
||||
node.network.flood_block (block_a);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -3386,7 +3326,7 @@ void nano::active_transactions::request_confirm (std::unique_lock<std::mutex> &
|
|||
// Rebroadcast unconfirmed blocks
|
||||
if (!rebroadcast_bundle.empty ())
|
||||
{
|
||||
node.network.republish_block_batch (rebroadcast_bundle);
|
||||
node.network.flood_block_batch (rebroadcast_bundle);
|
||||
}
|
||||
// Batch confirmation request
|
||||
if (!node.network_params.is_live_network () && !requests_bundle.empty ())
|
||||
|
|
@ -3518,7 +3458,7 @@ bool nano::active_transactions::vote (std::shared_ptr<nano::vote> vote_a, bool s
|
|||
}
|
||||
if (processed)
|
||||
{
|
||||
node.network.republish_vote (vote_a);
|
||||
node.network.flood_vote (vote_a);
|
||||
}
|
||||
return replay;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -327,13 +327,18 @@ public:
|
|||
void start ();
|
||||
void stop ();
|
||||
void receive_action (nano::udp_data *, nano::endpoint const &);
|
||||
void rpc_action (boost::system::error_code const &, size_t);
|
||||
void republish_vote (std::shared_ptr<nano::vote>);
|
||||
void republish_block (std::shared_ptr<nano::block>);
|
||||
void republish_block (std::shared_ptr<nano::block>, nano::endpoint const &);
|
||||
static unsigned const broadcast_interval_ms = 10;
|
||||
void republish_block_batch (std::deque<std::shared_ptr<nano::block>>, unsigned = broadcast_interval_ms);
|
||||
void republish (nano::block_hash const &, std::shared_ptr<std::vector<uint8_t>>, nano::endpoint);
|
||||
void flood_message (nano::message const &);
|
||||
void flood_vote (std::shared_ptr<nano::vote> vote_a)
|
||||
{
|
||||
nano::confirm_ack message (vote_a);
|
||||
flood_message (message);
|
||||
}
|
||||
void flood_block (std::shared_ptr<nano::block> block_a)
|
||||
{
|
||||
nano::publish publish (block_a);
|
||||
flood_message (publish);
|
||||
}
|
||||
void flood_block_batch (std::deque<std::shared_ptr<nano::block>>, unsigned = broadcast_interval_ms);
|
||||
void confirm_send (nano::confirm_ack const &, std::shared_ptr<std::vector<uint8_t>>, nano::endpoint const &);
|
||||
void merge_peers (std::array<nano::endpoint, 8> const &);
|
||||
void send_keepalive (nano::endpoint const &);
|
||||
|
|
@ -356,6 +361,7 @@ public:
|
|||
nano::node & node;
|
||||
static size_t const buffer_size = 512;
|
||||
static size_t const confirm_req_hashes_max = 6;
|
||||
static unsigned const broadcast_interval_ms = 10;
|
||||
};
|
||||
|
||||
class node_init
|
||||
|
|
|
|||
|
|
@ -3032,7 +3032,7 @@ void nano::rpc_handler::republish ()
|
|||
}
|
||||
hash = node.store.block_successor (transaction, hash);
|
||||
}
|
||||
node.network.republish_block_batch (republish_bundle, 25);
|
||||
node.network.flood_block_batch (republish_bundle, 25);
|
||||
response_l.put ("success", ""); // obsolete
|
||||
response_l.add_child ("blocks", blocks);
|
||||
}
|
||||
|
|
@ -4132,7 +4132,7 @@ void nano::rpc_handler::wallet_republish ()
|
|||
blocks.push_back (std::make_pair ("", entry));
|
||||
}
|
||||
}
|
||||
node.network.republish_block_batch (republish_bundle, 25);
|
||||
node.network.flood_block_batch (republish_bundle, 25);
|
||||
response_l.add_child ("blocks", blocks);
|
||||
}
|
||||
response_errors ();
|
||||
|
|
|
|||
|
|
@ -1053,7 +1053,7 @@ std::shared_ptr<nano::block> nano::wallet::send_action (nano::account const & so
|
|||
if (block != nullptr)
|
||||
{
|
||||
cached_block = true;
|
||||
wallets.node.network.republish_block (block);
|
||||
wallets.node.network.flood_block (block);
|
||||
}
|
||||
}
|
||||
else if (status != MDB_NOTFOUND)
|
||||
|
|
|
|||
|
|
@ -703,7 +703,7 @@ void nano_qt::block_viewer::rebroadcast_action (nano::uint256_union const & hash
|
|||
auto block (wallet.node.store.block_get (transaction, hash_a));
|
||||
if (block != nullptr)
|
||||
{
|
||||
wallet.node.network.republish_block (std::move (block));
|
||||
wallet.node.network.flood_block (std::move (block));
|
||||
auto successor (wallet.node.store.block_successor (transaction, hash_a));
|
||||
if (!successor.is_zero ())
|
||||
{
|
||||
|
|
|
|||
|
|
@ -193,7 +193,7 @@ TEST (node, fork_storm)
|
|||
auto open_result (system.nodes[i]->process (*open));
|
||||
ASSERT_EQ (nano::process_result::progress, open_result.code);
|
||||
auto transaction (system.nodes[i]->store.tx_begin ());
|
||||
system.nodes[i]->network.republish_block (open);
|
||||
system.nodes[i]->network.flood_block (open);
|
||||
}
|
||||
}
|
||||
auto again (true);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue