Aggressive flooding for local blocks (#2549)

* Aggressive flooding for local blocks

Floods locally produced blocks (going through the work watcher, or process_local - wallet, RPC process) such that they are sent to all PRs and random subset of other peers.

This allows more protection against Sybil attacks, and more streamlined elections as there are less occasions of votes arriving before blocks.

As a consequence, the average amount of echoes will increase by 1 for PRs. However, with this change in place and used by most of the network, a future enhancement would be to reduce the republishing fanout.

This change is based on a proposal by @Srayman for more information see the original proposal at https://medium.com/nanocurrency/proposal-for-nano-node-network-optimizations-21003e79cdba

* Add flag to disable republishing in block processor and a test for aggressive flooding

* Adjust timings for sanitizer builds

* Wrap in a lambda to simplify test, and adjust it
This commit is contained in:
Guilherme Lawless 2020-02-27 08:22:49 +00:00 committed by GitHub
commit ff88902d19
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 117 additions and 13 deletions

View file

@ -3613,6 +3613,84 @@ TEST (node, bandwidth_limiter)
node.stop ();
}
// Tests that local blocks are flooded to all principal representatives
TEST (node, aggressive_flooding)
{
nano::system system;
nano::node_flags node_flags;
node_flags.disable_request_loop = true;
node_flags.disable_block_processor_republishing = true;
auto & node1 (*system.add_node (node_flags));
auto & wallet1 (*system.wallet (0));
wallet1.insert_adhoc (nano::test_genesis_key.prv);
std::array<std::pair<std::shared_ptr<nano::node>, std::shared_ptr<nano::wallet>>, 5> nodes_wallets{};
std::generate (nodes_wallets.begin (), nodes_wallets.end (), [&system, node_flags]() {
nano::node_config node_config;
node_config.peering_port = nano::get_available_port ();
auto node (system.add_node (node_config, node_flags));
return std::make_pair (node, system.wallet (system.nodes.size () - 1));
});
auto large_amount = (nano::genesis_amount / 2) / nodes_wallets.size ();
for (auto & node_wallet : nodes_wallets)
{
nano::keypair keypair;
node_wallet.second->store.representative_set (node_wallet.first->wallets.tx_begin_write (), keypair.pub);
node_wallet.second->insert_adhoc (keypair.prv);
wallet1.send_action (nano::test_genesis_key.pub, keypair.pub, large_amount);
}
// Wait until all nodes have a representative
system.deadline_set (!is_sanitizer_build ? 5s : 15s);
while (node1.rep_crawler.principal_representatives ().size () != nodes_wallets.size ())
{
ASSERT_NO_ERROR (system.poll ());
}
// Generate blocks and ensure they are sent to all representatives
nano::block_builder builder;
std::shared_ptr<nano::state_block> block{};
{
auto transaction (node1.store.tx_begin_read ());
block = builder.state ()
.account (nano::test_genesis_key.pub)
.representative (nano::test_genesis_key.pub)
.previous (node1.ledger.latest (transaction, nano::test_genesis_key.pub))
.balance (node1.ledger.account_balance (transaction, nano::test_genesis_key.pub) - 1)
.link (nano::test_genesis_key.pub)
.sign (nano::test_genesis_key.prv, nano::test_genesis_key.pub)
.work (*node1.work_generate_blocking (node1.ledger.latest (transaction, nano::test_genesis_key.pub)))
.build ();
}
// Processing locally goes through the aggressive block flooding path
node1.process_local (block, false);
auto all_have_block = [&nodes_wallets](nano::block_hash const & hash_a) {
return std::all_of (nodes_wallets.begin (), nodes_wallets.end (), [hash = hash_a](auto const & node_wallet) {
return node_wallet.first->block (hash) != nullptr;
});
};
system.deadline_set (!is_sanitizer_build ? 3s : 10s);
while (!all_have_block (block->hash ()))
{
ASSERT_NO_ERROR (system.poll ());
}
// Do the same for a wallet block
auto wallet_block = wallet1.send_sync (nano::test_genesis_key.pub, nano::test_genesis_key.pub, 10);
system.deadline_set (!is_sanitizer_build ? 3s : 10s);
while (!all_have_block (wallet_block))
{
ASSERT_NO_ERROR (system.poll ());
}
// Wait until the main node has all blocks: genesis + (send+open) for each representative + 2 local blocks
// The main node only sees all blocks if other nodes are flooding their PR's open block to all other PRs
system.deadline_set (5s);
while (node1.ledger.cache.block_count < 1 + 2 * nodes_wallets.size () + 2)
{
ASSERT_NO_ERROR (system.poll ());
}
}
TEST (active_difficulty, recalculate_work)
{
nano::system system;

View file

@ -361,7 +361,7 @@ void nano::block_processor::process_batch (nano::unique_lock<std::mutex> & lock_
}
}
void nano::block_processor::process_live (nano::block_hash const & hash_a, std::shared_ptr<nano::block> block_a, const bool watch_work_a)
void nano::block_processor::process_live (nano::block_hash const & hash_a, std::shared_ptr<nano::block> block_a, const bool watch_work_a, const bool initial_publish_a)
{
// Add to work watcher to prevent dropping the election
if (watch_work_a)
@ -373,7 +373,14 @@ void nano::block_processor::process_live (nano::block_hash const & hash_a, std::
node.active.insert (block_a, false);
// Announce block contents to the network
node.network.flood_block (block_a, nano::buffer_drop_policy::no_limiter_drop);
if (initial_publish_a)
{
node.network.flood_block_initial (block_a);
}
else if (!node.flags.disable_block_processor_republishing)
{
node.network.flood_block (block_a, nano::buffer_drop_policy::no_limiter_drop);
}
if (node.config.enable_voting && node.wallets.rep_counts ().voting > 0)
{
// Announce our weighted vote to the network
@ -381,7 +388,7 @@ void nano::block_processor::process_live (nano::block_hash const & hash_a, std::
}
}
nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, nano::unchecked_info info_a, const bool watch_work_a)
nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, nano::unchecked_info info_a, const bool watch_work_a, const bool first_publish_a)
{
nano::process_return result;
auto hash (info_a.block->hash ());
@ -399,7 +406,7 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction
}
if (info_a.modified > nano::seconds_since_epoch () - 300 && node.block_arrival.recent (hash))
{
process_live (hash, info_a.block, watch_work_a);
process_live (hash, info_a.block, watch_work_a, first_publish_a);
}
queue_unchecked (transaction_a, hash);
break;

View file

@ -42,7 +42,7 @@ public:
bool should_log (bool);
bool have_blocks ();
void process_blocks ();
nano::process_return process_one (nano::write_transaction const &, nano::unchecked_info, const bool = false);
nano::process_return process_one (nano::write_transaction const &, nano::unchecked_info, const bool = false, const bool = false);
nano::process_return process_one (nano::write_transaction const &, std::shared_ptr<nano::block>, const bool = false);
nano::vote_generator generator;
// Delay required for average network propagartion before requesting confirmation
@ -52,7 +52,7 @@ private:
void queue_unchecked (nano::write_transaction const &, nano::block_hash const &);
void verify_state_blocks (nano::unique_lock<std::mutex> &, size_t = std::numeric_limits<size_t>::max ());
void process_batch (nano::unique_lock<std::mutex> &);
void process_live (nano::block_hash const &, std::shared_ptr<nano::block>, const bool = false);
void process_live (nano::block_hash const &, std::shared_ptr<nano::block>, const bool = false, const bool = false);
void requeue_invalid (nano::block_hash const &, nano::unchecked_info const &);
bool stopped;
bool active;

View file

@ -154,6 +154,25 @@ void nano::network::flood_message (nano::message const & message_a, nano::buffer
}
}
void nano::network::flood_block (std::shared_ptr<nano::block> const & block_a, nano::buffer_drop_policy const drop_policy_a)
{
nano::publish message (block_a);
flood_message (message, drop_policy_a);
}
void nano::network::flood_block_initial (std::shared_ptr<nano::block> const & block_a)
{
nano::publish message (block_a);
for (auto const & i : node.rep_crawler.principal_representatives ())
{
i.channel->send (message, nullptr, nano::buffer_drop_policy::no_limiter_drop);
}
for (auto & i : list_non_pr (fanout (1.0)))
{
i->send (message, nullptr, nano::buffer_drop_policy::no_limiter_drop);
}
}
void nano::network::flood_vote (std::shared_ptr<nano::vote> const & vote_a, float scale)
{
nano::confirm_ack message (vote_a);

View file

@ -107,12 +107,10 @@ public:
}
void flood_vote (std::shared_ptr<nano::vote> const &, float scale);
void flood_vote_pr (std::shared_ptr<nano::vote> const &);
void flood_block (std::shared_ptr<nano::block> block_a, nano::buffer_drop_policy drop_policy_a = nano::buffer_drop_policy::limiter)
{
nano::publish publish (block_a);
flood_message (publish, drop_policy_a);
}
// Flood block to all PRs and a random selection of non-PRs
void flood_block_initial (std::shared_ptr<nano::block> const &);
// Flood block to a random selection of peers
void flood_block (std::shared_ptr<nano::block> const &, nano::buffer_drop_policy const = nano::buffer_drop_policy::limiter);
void flood_block_many (std::deque<std::shared_ptr<nano::block>>, std::function<void()> = nullptr, unsigned = broadcast_interval_ms);
void merge_peers (std::array<nano::endpoint, 8> const &);
void merge_peer (nano::endpoint const &);

View file

@ -627,7 +627,7 @@ nano::process_return nano::node::process_local (std::shared_ptr<nano::block> blo
block_processor.wait_write ();
// Process block
auto transaction (store.tx_begin_write ({ tables::accounts, tables::cached_counts, tables::change_blocks, tables::frontiers, tables::open_blocks, tables::pending, tables::receive_blocks, tables::representation, tables::send_blocks, tables::state_blocks }, { tables::confirmation_height }));
return block_processor.process_one (transaction, info, work_watcher_a);
return block_processor.process_one (transaction, info, work_watcher_a, true);
}
void nano::node::start ()

View file

@ -126,6 +126,7 @@ public:
bool disable_unchecked_drop{ true };
bool disable_providing_telemetry_metrics{ false };
bool disable_block_processor_unchecked_deletion{ false };
bool disable_block_processor_republishing{ false };
bool disable_ongoing_telemetry_requests{ false };
bool fast_bootstrap{ false };
bool read_only{ false };

View file

@ -1453,6 +1453,7 @@ void nano::work_watcher::watching (nano::qualified_root const & root_a, std::sha
if (!ec)
{
watcher_l->node.network.flood_block_initial (block);
watcher_l->node.active.update_difficulty (block);
watcher_l->update (root_a, block);
updated_l = true;