Prevent immediate cleanup of new channels (#2957)

* Do not purge new channels on network cleanup

* Make node.rep_remove test more consistent and faster by forcing an earlier timeout for UDP channel

* Speed up network.replace_port which also relies on removing an old channel

* Release channel0 after modified.

* Disabling tcp_realtime to prevent channel promotion udp->tcp.

* New version of network.replace_port that avoids network cleanup, the port should be replaced on handshake instead

* Ensure the channel wrapper is always updated correctly with the node id

* Use more io threads in the test

Co-authored-by: clemahieu <clemahieu@gmail.com>
This commit is contained in:
Guilherme Lawless 2020-09-23 14:19:01 +01:00 committed by GitHub
commit ec163548e4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 68 additions and 36 deletions

View file

@ -824,33 +824,28 @@ TEST (network, replace_port)
node_flags.disable_udp = false;
node_flags.disable_ongoing_telemetry_requests = true;
node_flags.disable_initial_telemetry_requests = true;
auto node0 = system.add_node (node_flags);
nano::node_config node0_config (nano::get_available_port (), system.logging);
node0_config.io_threads = 8;
auto node0 = system.add_node (node0_config, node_flags);
ASSERT_EQ (0, node0->network.size ());
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work, node_flags));
nano::node_config node1_config (nano::get_available_port (), system.logging);
node1_config.io_threads = 1;
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, node1_config, system.work, node_flags));
node1->start ();
system.nodes.push_back (node1);
{
auto channel (node0->network.udp_channels.insert (nano::endpoint (node1->network.endpoint ().address (), 23000), node1->network_params.protocol.protocol_version));
if (channel)
{
channel->set_node_id (node1->node_id.pub);
}
}
auto wrong_endpoint = nano::endpoint (node1->network.endpoint ().address (), nano::get_available_port ());
auto channel0 (node0->network.udp_channels.insert (wrong_endpoint, node1->network_params.protocol.protocol_version));
ASSERT_NE (nullptr, channel0);
node0->network.udp_channels.modify (channel0, [&node1](std::shared_ptr<nano::transport::channel> channel_a) {
channel_a->set_node_id (node1->node_id.pub);
});
auto peers_list (node0->network.list (std::numeric_limits<size_t>::max ()));
ASSERT_EQ (peers_list[0]->get_node_id (), node1->node_id.pub);
auto channel (std::make_shared<nano::transport::channel_udp> (node0->network.udp_channels, node1->network.endpoint (), node1->network_params.protocol.protocol_version));
node0->network.send_keepalive (channel);
ASSERT_TIMELY (5s, node0->network.udp_channels.channel (node1->network.endpoint ()));
ASSERT_TIMELY (5s, node0->network.udp_channels.size () <= 1);
auto channel1 (std::make_shared<nano::transport::channel_udp> (node0->network.udp_channels, node1->network.endpoint (), node1->network_params.protocol.protocol_version));
ASSERT_EQ (node0->network.udp_channels.size (), 1);
auto list1 (node0->network.list (1));
ASSERT_EQ (node1->network.endpoint (), list1[0]->get_endpoint ());
auto list2 (node1->network.list (1));
ASSERT_EQ (node0->network.endpoint (), list2[0]->get_endpoint ());
// Remove correct peer (same node ID)
node0->network.udp_channels.clean_node_id (nano::endpoint (node1->network.endpoint ().address (), 23000), node1->node_id.pub);
ASSERT_TIMELY (5s, node0->network.udp_channels.size () <= 1);
node1->stop ();
node0->network.send_keepalive (channel1);
// On handshake, the channel is replaced
ASSERT_TIMELY (5s, !node0->network.udp_channels.channel (wrong_endpoint) && node0->network.udp_channels.channel (node1->network.endpoint ()));
}
TEST (network, peer_max_tcp_attempts)
@ -1111,3 +1106,42 @@ TEST (network, tcp_message_manager)
}
}
}
TEST (network, cleanup_purge)
{
auto test_start = std::chrono::steady_clock::now ();
nano::system system (1);
auto & node1 (*system.nodes[0]);
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
node2->start ();
system.nodes.push_back (node2);
ASSERT_EQ (0, node1.network.size ());
node1.network.cleanup (test_start);
ASSERT_EQ (0, node1.network.size ());
node1.network.udp_channels.insert (node2->network.endpoint (), node1.network_params.protocol.protocol_version);
ASSERT_EQ (1, node1.network.size ());
node1.network.cleanup (test_start);
ASSERT_EQ (1, node1.network.size ());
node1.network.cleanup (std::chrono::steady_clock::now ());
ASSERT_EQ (0, node1.network.size ());
std::weak_ptr<nano::node> node_w = node1.shared ();
node1.network.tcp_channels.start_tcp (node2->network.endpoint (), [node_w](std::shared_ptr<nano::transport::channel> channel_a) {
if (auto node_l = node_w.lock ())
{
node_l->network.send_keepalive (channel_a);
}
});
ASSERT_TIMELY (3s, node1.network.size () == 1);
node1.network.cleanup (test_start);
ASSERT_EQ (1, node1.network.size ());
node1.network.cleanup (std::chrono::steady_clock::now ());
ASSERT_EQ (0, node1.network.size ());
}

View file

@ -2205,17 +2205,22 @@ TEST (node, rep_remove)
// Add inactive UDP representative channel
nano::endpoint endpoint0 (boost::asio::ip::address_v6::loopback (), nano::get_available_port ());
std::shared_ptr<nano::transport::channel> channel0 (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, endpoint0, node.network_params.protocol.protocol_version));
nano::amount amount100 (100);
node.network.udp_channels.insert (endpoint0, node.network_params.protocol.protocol_version);
auto channel_udp = node.network.udp_channels.insert (endpoint0, node.network_params.protocol.protocol_version);
auto vote1 = std::make_shared<nano::vote> (keypair1.pub, keypair1.prv, 0, genesis.open);
node.rep_crawler.response (channel0, vote1);
ASSERT_FALSE (node.rep_crawler.response (channel0, vote1));
ASSERT_TIMELY (5s, node.rep_crawler.representative_count () == 1);
auto reps (node.rep_crawler.representatives (1));
ASSERT_EQ (1, reps.size ());
ASSERT_EQ (node.minimum_principal_weight () * 2, reps[0].weight.number ());
ASSERT_EQ (keypair1.pub, reps[0].account);
ASSERT_EQ (*channel0, reps[0].channel_ref ());
// Modify last_packet_received so the channel is removed faster
std::chrono::steady_clock::time_point fake_timepoint{};
node.network.udp_channels.modify (channel_udp, [fake_timepoint](std::shared_ptr<nano::transport::channel_udp> channel_a) {
channel_a->set_last_packet_received (fake_timepoint);
});
// This UDP channel is not reachable and should timeout
ASSERT_EQ (1, node.rep_crawler.representative_count ());
ASSERT_TIMELY (10s, node.rep_crawler.representative_count () == 0);
// Add working representative
auto node1 = system.add_node (nano::node_config (nano::get_available_port (), system.logging));
@ -2227,20 +2232,16 @@ TEST (node, rep_remove)
ASSERT_TIMELY (10s, node.rep_crawler.representative_count () == 1);
// Add inactive TCP representative channel
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, nano::node_config (nano::get_available_port (), system.logging), system.work));
std::atomic<bool> done{ false };
std::weak_ptr<nano::node> node_w (node.shared ());
auto vote3 = std::make_shared<nano::vote> (keypair2.pub, keypair2.prv, 0, genesis.open);
node.network.tcp_channels.start_tcp (node2->network.endpoint (), [node_w, &done, &vote3, &system](std::shared_ptr<nano::transport::channel> channel2) {
node.network.tcp_channels.start_tcp (node2->network.endpoint (), [node_w, &vote3](std::shared_ptr<nano::transport::channel> channel2) {
if (auto node_l = node_w.lock ())
{
node_l->rep_crawler.response (channel2, vote3);
ASSERT_TIMELY (10s, node_l->rep_crawler.representative_count () == 2);
done = true;
ASSERT_FALSE (node_l->rep_crawler.response (channel2, vote3));
}
});
ASSERT_TIMELY (10s, done);
ASSERT_TIMELY (10s, node.rep_crawler.representative_count () == 2);
node2->stop ();
// Remove inactive representatives
ASSERT_TIMELY (10s, node.rep_crawler.representative_count () == 1);
reps = node.rep_crawler.representatives (1);
ASSERT_EQ (nano::dev_genesis_key.pub, reps[0].account);

View file

@ -311,8 +311,6 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa
debug_assert (endpoint_a == temporary_channel->get_tcp_endpoint ());
temporary_channel->set_node_id (node_id_a);
temporary_channel->set_network_version (message_a.header.version_using);
temporary_channel->set_last_packet_received (std::chrono::steady_clock::now ());
temporary_channel->set_last_packet_sent (std::chrono::steady_clock::now ());
temporary_channel->temporary = true;
debug_assert (type_a == nano::bootstrap_server_type::realtime || type_a == nano::bootstrap_server_type::realtime_response_server);
// Don't insert temporary channels for response_server

View file

@ -123,8 +123,8 @@ namespace transport
private:
std::chrono::steady_clock::time_point last_bootstrap_attempt{ std::chrono::steady_clock::time_point () };
std::chrono::steady_clock::time_point last_packet_received{ std::chrono::steady_clock::time_point () };
std::chrono::steady_clock::time_point last_packet_sent{ std::chrono::steady_clock::time_point () };
std::chrono::steady_clock::time_point last_packet_received{ std::chrono::steady_clock::now () };
std::chrono::steady_clock::time_point last_packet_sent{ std::chrono::steady_clock::now () };
boost::optional<nano::account> node_id{ boost::none };
std::atomic<uint8_t> network_version{ 0 };

View file

@ -480,7 +480,6 @@ public:
{
node.network.udp_channels.modify (new_channel, [&message_a](std::shared_ptr<nano::transport::channel_udp> channel_a) {
channel_a->set_node_id (message_a.response->first);
channel_a->set_last_packet_received (std::chrono::steady_clock::now ());
});
}
}