diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index e3170a993..3f85788ad 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -824,33 +824,28 @@ TEST (network, replace_port) node_flags.disable_udp = false; node_flags.disable_ongoing_telemetry_requests = true; node_flags.disable_initial_telemetry_requests = true; - auto node0 = system.add_node (node_flags); + nano::node_config node0_config (nano::get_available_port (), system.logging); + node0_config.io_threads = 8; + auto node0 = system.add_node (node0_config, node_flags); ASSERT_EQ (0, node0->network.size ()); - auto node1 (std::make_shared (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work, node_flags)); + nano::node_config node1_config (nano::get_available_port (), system.logging); + node1_config.io_threads = 1; + auto node1 (std::make_shared (system.io_ctx, nano::unique_path (), system.alarm, node1_config, system.work, node_flags)); node1->start (); system.nodes.push_back (node1); - { - auto channel (node0->network.udp_channels.insert (nano::endpoint (node1->network.endpoint ().address (), 23000), node1->network_params.protocol.protocol_version)); - if (channel) - { - channel->set_node_id (node1->node_id.pub); - } - } + auto wrong_endpoint = nano::endpoint (node1->network.endpoint ().address (), nano::get_available_port ()); + auto channel0 (node0->network.udp_channels.insert (wrong_endpoint, node1->network_params.protocol.protocol_version)); + ASSERT_NE (nullptr, channel0); + node0->network.udp_channels.modify (channel0, [&node1](std::shared_ptr channel_a) { + channel_a->set_node_id (node1->node_id.pub); + }); auto peers_list (node0->network.list (std::numeric_limits::max ())); ASSERT_EQ (peers_list[0]->get_node_id (), node1->node_id.pub); - auto channel (std::make_shared (node0->network.udp_channels, node1->network.endpoint (), node1->network_params.protocol.protocol_version)); - node0->network.send_keepalive (channel); - ASSERT_TIMELY (5s, node0->network.udp_channels.channel (node1->network.endpoint ())); - ASSERT_TIMELY (5s, node0->network.udp_channels.size () <= 1); + auto channel1 (std::make_shared (node0->network.udp_channels, node1->network.endpoint (), node1->network_params.protocol.protocol_version)); ASSERT_EQ (node0->network.udp_channels.size (), 1); - auto list1 (node0->network.list (1)); - ASSERT_EQ (node1->network.endpoint (), list1[0]->get_endpoint ()); - auto list2 (node1->network.list (1)); - ASSERT_EQ (node0->network.endpoint (), list2[0]->get_endpoint ()); - // Remove correct peer (same node ID) - node0->network.udp_channels.clean_node_id (nano::endpoint (node1->network.endpoint ().address (), 23000), node1->node_id.pub); - ASSERT_TIMELY (5s, node0->network.udp_channels.size () <= 1); - node1->stop (); + node0->network.send_keepalive (channel1); + // On handshake, the channel is replaced + ASSERT_TIMELY (5s, !node0->network.udp_channels.channel (wrong_endpoint) && node0->network.udp_channels.channel (node1->network.endpoint ())); } TEST (network, peer_max_tcp_attempts) @@ -1111,3 +1106,42 @@ TEST (network, tcp_message_manager) } } } + +TEST (network, cleanup_purge) +{ + auto test_start = std::chrono::steady_clock::now (); + + nano::system system (1); + auto & node1 (*system.nodes[0]); + + auto node2 (std::make_shared (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work)); + node2->start (); + system.nodes.push_back (node2); + + ASSERT_EQ (0, node1.network.size ()); + node1.network.cleanup (test_start); + ASSERT_EQ (0, node1.network.size ()); + + node1.network.udp_channels.insert (node2->network.endpoint (), node1.network_params.protocol.protocol_version); + ASSERT_EQ (1, node1.network.size ()); + node1.network.cleanup (test_start); + ASSERT_EQ (1, node1.network.size ()); + + node1.network.cleanup (std::chrono::steady_clock::now ()); + ASSERT_EQ (0, node1.network.size ()); + + std::weak_ptr node_w = node1.shared (); + node1.network.tcp_channels.start_tcp (node2->network.endpoint (), [node_w](std::shared_ptr channel_a) { + if (auto node_l = node_w.lock ()) + { + node_l->network.send_keepalive (channel_a); + } + }); + + ASSERT_TIMELY (3s, node1.network.size () == 1); + node1.network.cleanup (test_start); + ASSERT_EQ (1, node1.network.size ()); + + node1.network.cleanup (std::chrono::steady_clock::now ()); + ASSERT_EQ (0, node1.network.size ()); +} diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 2ef6b31fe..77be7e83a 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -2205,17 +2205,22 @@ TEST (node, rep_remove) // Add inactive UDP representative channel nano::endpoint endpoint0 (boost::asio::ip::address_v6::loopback (), nano::get_available_port ()); std::shared_ptr channel0 (std::make_shared (node.network.udp_channels, endpoint0, node.network_params.protocol.protocol_version)); - nano::amount amount100 (100); - node.network.udp_channels.insert (endpoint0, node.network_params.protocol.protocol_version); + auto channel_udp = node.network.udp_channels.insert (endpoint0, node.network_params.protocol.protocol_version); auto vote1 = std::make_shared (keypair1.pub, keypair1.prv, 0, genesis.open); - node.rep_crawler.response (channel0, vote1); + ASSERT_FALSE (node.rep_crawler.response (channel0, vote1)); ASSERT_TIMELY (5s, node.rep_crawler.representative_count () == 1); auto reps (node.rep_crawler.representatives (1)); ASSERT_EQ (1, reps.size ()); ASSERT_EQ (node.minimum_principal_weight () * 2, reps[0].weight.number ()); ASSERT_EQ (keypair1.pub, reps[0].account); ASSERT_EQ (*channel0, reps[0].channel_ref ()); + // Modify last_packet_received so the channel is removed faster + std::chrono::steady_clock::time_point fake_timepoint{}; + node.network.udp_channels.modify (channel_udp, [fake_timepoint](std::shared_ptr channel_a) { + channel_a->set_last_packet_received (fake_timepoint); + }); // This UDP channel is not reachable and should timeout + ASSERT_EQ (1, node.rep_crawler.representative_count ()); ASSERT_TIMELY (10s, node.rep_crawler.representative_count () == 0); // Add working representative auto node1 = system.add_node (nano::node_config (nano::get_available_port (), system.logging)); @@ -2227,20 +2232,16 @@ TEST (node, rep_remove) ASSERT_TIMELY (10s, node.rep_crawler.representative_count () == 1); // Add inactive TCP representative channel auto node2 (std::make_shared (system.io_ctx, nano::unique_path (), system.alarm, nano::node_config (nano::get_available_port (), system.logging), system.work)); - std::atomic done{ false }; std::weak_ptr node_w (node.shared ()); auto vote3 = std::make_shared (keypair2.pub, keypair2.prv, 0, genesis.open); - node.network.tcp_channels.start_tcp (node2->network.endpoint (), [node_w, &done, &vote3, &system](std::shared_ptr channel2) { + node.network.tcp_channels.start_tcp (node2->network.endpoint (), [node_w, &vote3](std::shared_ptr channel2) { if (auto node_l = node_w.lock ()) { - node_l->rep_crawler.response (channel2, vote3); - ASSERT_TIMELY (10s, node_l->rep_crawler.representative_count () == 2); - done = true; + ASSERT_FALSE (node_l->rep_crawler.response (channel2, vote3)); } }); - ASSERT_TIMELY (10s, done); + ASSERT_TIMELY (10s, node.rep_crawler.representative_count () == 2); node2->stop (); - // Remove inactive representatives ASSERT_TIMELY (10s, node.rep_crawler.representative_count () == 1); reps = node.rep_crawler.representatives (1); ASSERT_EQ (nano::dev_genesis_key.pub, reps[0].account); diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index fe41f74da..9a0351e15 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -311,8 +311,6 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa debug_assert (endpoint_a == temporary_channel->get_tcp_endpoint ()); temporary_channel->set_node_id (node_id_a); temporary_channel->set_network_version (message_a.header.version_using); - temporary_channel->set_last_packet_received (std::chrono::steady_clock::now ()); - temporary_channel->set_last_packet_sent (std::chrono::steady_clock::now ()); temporary_channel->temporary = true; debug_assert (type_a == nano::bootstrap_server_type::realtime || type_a == nano::bootstrap_server_type::realtime_response_server); // Don't insert temporary channels for response_server diff --git a/nano/node/transport/transport.hpp b/nano/node/transport/transport.hpp index 95e82754d..6011fd0c6 100644 --- a/nano/node/transport/transport.hpp +++ b/nano/node/transport/transport.hpp @@ -123,8 +123,8 @@ namespace transport private: std::chrono::steady_clock::time_point last_bootstrap_attempt{ std::chrono::steady_clock::time_point () }; - std::chrono::steady_clock::time_point last_packet_received{ std::chrono::steady_clock::time_point () }; - std::chrono::steady_clock::time_point last_packet_sent{ std::chrono::steady_clock::time_point () }; + std::chrono::steady_clock::time_point last_packet_received{ std::chrono::steady_clock::now () }; + std::chrono::steady_clock::time_point last_packet_sent{ std::chrono::steady_clock::now () }; boost::optional node_id{ boost::none }; std::atomic network_version{ 0 }; diff --git a/nano/node/transport/udp.cpp b/nano/node/transport/udp.cpp index b5dc826c5..2c936cf04 100644 --- a/nano/node/transport/udp.cpp +++ b/nano/node/transport/udp.cpp @@ -480,7 +480,6 @@ public: { node.network.udp_channels.modify (new_channel, [&message_a](std::shared_ptr channel_a) { channel_a->set_node_id (message_a.response->first); - channel_a->set_last_packet_received (std::chrono::steady_clock::now ()); }); } }