Allow rep crawler targets with ephemeral ports (#2527)

* Allow rep crawler targets with ephemeral ports
Representatives behind NAT or with blocked incoming TCP connections
* Update test to not check self confirmation as not required
* Better explanation for temporary channels
This commit is contained in:
Sergey Kroshnin 2020-02-04 03:51:48 +03:00 committed by GitHub
commit e32b6b83e7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 51 additions and 26 deletions

View file

@ -3428,16 +3428,15 @@ TEST (node, dont_write_lock_node)
finished_promise.set_value ();
}
// Test is unstable on github actions for windows, disable if CI detected
#if (defined(_WIN32) && CI)
TEST (node, DISABLED_bidirectional_tcp)
#else
TEST (node, bidirectional_tcp)
#endif
{
nano::system system;
nano::node_flags node_flags;
node_flags.disable_udp = true; // Disable UDP connections
// Disable bootstrap to start elections for new blocks
node_flags.disable_legacy_bootstrap = true;
node_flags.disable_lazy_bootstrap = true;
node_flags.disable_wallet_bootstrap = true;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto node1 = system.add_node (node_config, node_flags);
@ -3466,17 +3465,32 @@ TEST (node, bidirectional_tcp)
{
ASSERT_NO_ERROR (system.poll ());
}
// Test block confirmation from node 1
// Test block confirmation from node 1 (add representative to node 1)
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
// Wait to find new reresentative
system.deadline_set (10s);
while (node2->rep_crawler.representative_count () == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
/* Wait for confirmation
To check connection we need only node 2 confirmation status
Node 1 election can be unconfirmed because representative private key was inserted after election start (and node 2 isn't flooding new votes to principal representatives) */
bool confirmed (false);
system.deadline_set (10s);
while (!confirmed)
{
auto transaction1 (node1->store.tx_begin_read ());
auto transaction2 (node2->store.tx_begin_read ());
confirmed = node1->ledger.block_confirmed (transaction1, send1->hash ()) && node2->ledger.block_confirmed (transaction2, send1->hash ());
confirmed = node2->ledger.block_confirmed (transaction2, send1->hash ());
ASSERT_NO_ERROR (system.poll ());
}
// Test block propagation & confirmation from node 2 (remove representative from node 1)
{
auto transaction (system.wallet (0)->wallets.tx_begin_write ());
system.wallet (0)->store.erase (transaction, nano::test_genesis_key.pub);
}
/* Test block propagation from node 2
Node 2 has only ephemeral TCP port open. Node 1 cannot establish connection to node 2 listening port */
auto send2 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, send1->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 2 * nano::Gxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node1->work_generate_blocking (send1->hash ())));
node2->process_active (send2);
node2->block_processor.flush ();
@ -3485,14 +3499,23 @@ TEST (node, bidirectional_tcp)
{
ASSERT_NO_ERROR (system.poll ());
}
// Test block confirmation from node 2
// Test block confirmation from node 2 (add representative to node 2)
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
// Wait to find changed reresentative
system.deadline_set (10s);
while (node1->rep_crawler.representative_count () == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
/* Wait for confirmation
To check connection we need only node 1 confirmation status
Node 2 election can be unconfirmed because representative private key was inserted after election start (and node 1 isn't flooding new votes to principal representatives) */
confirmed = false;
system.deadline_set (20s);
while (!confirmed)
{
auto transaction1 (node1->store.tx_begin_read ());
auto transaction2 (node2->store.tx_begin_read ());
confirmed = node1->ledger.block_confirmed (transaction1, send2->hash ()) && node2->ledger.block_confirmed (transaction2, send2->hash ());
confirmed = node1->ledger.block_confirmed (transaction1, send2->hash ());
ASSERT_NO_ERROR (system.poll ());
}
}

View file

@ -118,7 +118,7 @@ nano::bootstrap_server::~bootstrap_server ()
auto exisiting_response_channel (node->network.tcp_channels.find_channel (remote_endpoint));
if (exisiting_response_channel != nullptr)
{
exisiting_response_channel->server = false;
exisiting_response_channel->temporary = false;
node->network.tcp_channels.erase (remote_endpoint);
}
}

View file

@ -667,9 +667,9 @@ size_t nano::network::fanout (float scale) const
return static_cast<size_t> (std::ceil (scale * size_sqrt ()));
}
std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::network::random_set (size_t count_a, uint8_t min_version_a) const
std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::network::random_set (size_t count_a, uint8_t min_version_a, bool include_temporary_channels_a) const
{
std::unordered_set<std::shared_ptr<nano::transport::channel>> result (tcp_channels.random_set (count_a, min_version_a));
std::unordered_set<std::shared_ptr<nano::transport::channel>> result (tcp_channels.random_set (count_a, min_version_a, include_temporary_channels_a));
std::unordered_set<std::shared_ptr<nano::transport::channel>> udp_random (udp_channels.random_set (count_a, min_version_a));
for (auto i (udp_random.begin ()), n (udp_random.end ()); i != n && result.size () < count_a * 1.5; ++i)
{
@ -684,7 +684,7 @@ std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::network::ran
void nano::network::random_fill (std::array<nano::endpoint, 8> & target_a) const
{
auto peers (random_set (target_a.size ()));
auto peers (random_set (target_a.size (), 0, false)); // Don't include channels with ephemeral remote ports
assert (peers.size () <= target_a.size ());
auto endpoint (nano::endpoint (boost::asio::ip::address_v6{}, 0));
assert (endpoint.address ().is_v6 ());

View file

@ -137,7 +137,7 @@ public:
// Desired fanout for a given scale
size_t fanout (float scale = 1.0f) const;
void random_fill (std::array<nano::endpoint, 8> &) const;
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (size_t, uint8_t = 0) const;
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (size_t, uint8_t = 0, bool = false) const;
// Get the next peer for attempting a tcp bootstrap connection
nano::tcp_endpoint bootstrap_peer (bool = false);
nano::endpoint endpoint ();

View file

@ -139,7 +139,7 @@ std::vector<std::shared_ptr<nano::transport::channel>> nano::rep_crawler::get_cr
required_peer_count += required_peer_count / 2;
// The rest of the endpoints are picked randomly
auto random_peers (node.network.random_set (required_peer_count));
auto random_peers (node.network.random_set (required_peer_count, 0, true)); // Include channels with ephemeral remote ports
std::vector<std::shared_ptr<nano::transport::channel>> result;
result.insert (result.end (), random_peers.begin (), random_peers.end ());
return result;

View file

@ -46,7 +46,7 @@ void nano::telemetry::add (nano::telemetry_data const & telemetry_data_a, nano::
void nano::telemetry::get_metrics_random_peers_async (std::function<void(telemetry_data_responses const &)> const & callback_a)
{
// These peers will only be used if there isn't an already ongoing batch telemetry request round
auto random_peers = network.random_set (network.size_sqrt (), network_params.protocol.telemetry_protocol_version_min);
auto random_peers = network.random_set (network.size_sqrt (), network_params.protocol.telemetry_protocol_version_min, true);
nano::lock_guard<std::mutex> guard (mutex);
if (!stopped && !random_peers.empty ())
{

View file

@ -16,7 +16,7 @@ nano::transport::channel_tcp::~channel_tcp ()
// Close socket. Exception: socket is used by bootstrap_server
if (auto socket_l = socket.lock ())
{
if (!server)
if (!temporary)
{
socket_l->close ();
}
@ -111,7 +111,7 @@ bool nano::transport::tcp_channels::insert (std::shared_ptr<nano::transport::cha
if (existing == channels.get<endpoint_tag> ().end ())
{
auto node_id (channel_a->get_node_id ());
if (!channel_a->server)
if (!channel_a->temporary)
{
channels.get<node_id_tag> ().erase (node_id);
}
@ -152,7 +152,7 @@ std::shared_ptr<nano::transport::channel_tcp> nano::transport::tcp_channels::fin
return result;
}
std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::transport::tcp_channels::random_set (size_t count_a, uint8_t min_version) const
std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::transport::tcp_channels::random_set (size_t count_a, uint8_t min_version, bool include_temporary_channels_a) const
{
std::unordered_set<std::shared_ptr<nano::transport::channel>> result;
result.reserve (count_a);
@ -169,7 +169,7 @@ std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::transport::t
auto index (nano::random_pool::generate_word32 (0, static_cast<CryptoPP::word32> (peers_size - 1)));
auto channel = channels.get<random_access_tag> ()[index].channel;
if (channel->get_network_version () >= min_version && !channel->server)
if (channel->get_network_version () >= min_version && (include_temporary_channels_a || !channel->temporary))
{
result.insert (channel);
}
@ -284,7 +284,7 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa
temporary_channel->set_network_version (message_a.header.version_using);
temporary_channel->set_last_packet_received (std::chrono::steady_clock::now ());
temporary_channel->set_last_packet_sent (std::chrono::steady_clock::now ());
temporary_channel->server = true;
temporary_channel->temporary = true;
assert (type_a == nano::bootstrap_server_type::realtime || type_a == nano::bootstrap_server_type::realtime_response_server);
// Don't insert temporary channels for response_server
if (type_a == nano::bootstrap_server_type::realtime)
@ -603,7 +603,7 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n
auto existing_channel (node_l->network.tcp_channels.find_node_id (node_id));
if (existing_channel)
{
process = existing_channel->server;
process = existing_channel->temporary;
}
}
if (process)

View file

@ -40,7 +40,9 @@ namespace transport
}
std::weak_ptr<nano::socket> socket;
std::weak_ptr<nano::bootstrap_server> response_server;
bool server{ false };
/* Mark for temporary channels. Usually remote ports of these channels are ephemeral and received from incoming connections to server.
If remote part has open listening port, temporary channel will be replaced with direct connection to listening port soon. But if other side is behing NAT or firewall this connection can be pemanent. */
std::atomic<bool> temporary{ false };
nano::endpoint get_endpoint () const override
{
@ -84,7 +86,7 @@ namespace transport
size_t size () const;
std::shared_ptr<nano::transport::channel_tcp> find_channel (nano::tcp_endpoint const &) const;
void random_fill (std::array<nano::endpoint, 8> &) const;
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (size_t, uint8_t = 0) const;
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (size_t, uint8_t = 0, bool = false) const;
bool store_all (bool = true);
std::shared_ptr<nano::transport::channel_tcp> find_node_id (nano::account const &);
// Get the next peer for attempting a tcp connection