diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 5d6040f3..e5bfd2cd 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -112,7 +112,7 @@ TEST (network, send_node_id_handshake_tcp) auto initial_node1 (node1->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in)); auto initial_keepalive (node0->stats.count (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in)); std::weak_ptr node_w (node0); - node0->network.tcp_channels.start_tcp (node1->network.endpoint (), nano::keepalive_tcp_callback (*node1)); + node0->network.tcp_channels.start_tcp (node1->network.endpoint ()); ASSERT_EQ (0, node0->network.size ()); ASSERT_EQ (0, node1->network.size ()); ASSERT_TIMELY (10s, node0->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in) >= initial + 2); @@ -189,13 +189,13 @@ TEST (network, multi_keepalive) system.nodes.push_back (node1); ASSERT_EQ (0, node1->network.size ()); ASSERT_EQ (0, node0->network.size ()); - node1->network.tcp_channels.start_tcp (node0->network.endpoint (), nano::keepalive_tcp_callback (*node1)); + node1->network.tcp_channels.start_tcp (node0->network.endpoint ()); ASSERT_TIMELY (10s, node0->network.size () == 1 && node0->stats.count (nano::stat::type::message, nano::stat::detail::keepalive) >= 1); auto node2 (std::make_shared (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work)); ASSERT_FALSE (node2->init_error ()); node2->start (); system.nodes.push_back (node2); - node2->network.tcp_channels.start_tcp (node0->network.endpoint (), nano::keepalive_tcp_callback (*node2)); + node2->network.tcp_channels.start_tcp (node0->network.endpoint ()); ASSERT_TIMELY (10s, node1->network.size () == 2 && node0->network.size () == 2 && node2->network.size () == 2 && node0->stats.count (nano::stat::type::message, nano::stat::detail::keepalive) >= 2); } @@ -1215,12 +1215,7 @@ TEST (network, cleanup_purge) ASSERT_EQ (0, node1.network.size ()); std::weak_ptr node_w = node1.shared (); - node1.network.tcp_channels.start_tcp (node2->network.endpoint (), [node_w] (std::shared_ptr const & channel_a) { - if (auto node_l = node_w.lock ()) - { - node_l->network.send_keepalive (channel_a); - } - }); + node1.network.tcp_channels.start_tcp (node2->network.endpoint ()); ASSERT_TIMELY (3s, node1.network.size () == 1); node1.network.cleanup (test_start); diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index bdf36925..0180db06 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -262,7 +262,7 @@ TEST (node, node_receive_quorum) system2.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv); ASSERT_TRUE (node1.balance (key.pub).is_zero ()); - node1.network.tcp_channels.start_tcp (system2.nodes[0]->network.endpoint (), nano::keepalive_tcp_callback (node1)); + node1.network.tcp_channels.start_tcp (system2.nodes[0]->network.endpoint ()); while (node1.balance (key.pub).is_zero ()) { ASSERT_NO_ERROR (system.poll ()); @@ -2270,16 +2270,14 @@ TEST (node, rep_remove) auto vote2 = std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, 0, nano::dev::genesis); node.rep_crawler.response (channel1, vote2); ASSERT_TIMELY (10s, node.rep_crawler.representative_count () == 1); - // Add inactive TCP representative channel auto node2 (std::make_shared (system.io_ctx, nano::unique_path (), nano::node_config (nano::get_available_port (), system.logging), system.work)); + node2->start (); std::weak_ptr node_w (node.shared ()); auto vote3 = std::make_shared (keypair2.pub, keypair2.prv, 0, nano::dev::genesis); - node.network.tcp_channels.start_tcp (node2->network.endpoint (), [node_w, &vote3] (std::shared_ptr const & channel2) { - if (auto node_l = node_w.lock ()) - { - ASSERT_FALSE (node_l->rep_crawler.response (channel2, vote3)); - } - }); + node.network.tcp_channels.start_tcp (node2->network.endpoint ()); + std::shared_ptr channel2; + ASSERT_TIMELY (10s, (channel2 = node.network.tcp_channels.find_channel (nano::transport::map_endpoint_to_tcp (node2->network.endpoint ()))) != nullptr); + ASSERT_FALSE (node.rep_crawler.response (channel2, vote3)); ASSERT_TIMELY (10s, node.rep_crawler.representative_count () == 2); node2->stop (); ASSERT_TIMELY (10s, node.rep_crawler.representative_count () == 1); diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 12951314..a564c42b 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -575,12 +575,7 @@ void nano::network::merge_peer (nano::endpoint const & peer_a) if (!reachout (peer_a, node.config.allow_local_peers)) { std::weak_ptr node_w (node.shared ()); - node.network.tcp_channels.start_tcp (peer_a, [node_w] (std::shared_ptr const & channel_a) { - if (auto node_l = node_w.lock ()) - { - node_l->network.send_keepalive (channel_a); - } - }); + node.network.tcp_channels.start_tcp (peer_a); } } diff --git a/nano/node/node.cpp b/nano/node/node.cpp index aeefd89e..c3767f46 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -45,12 +45,7 @@ void nano::node::keepalive (std::string const & address_a, uint16_t port_a) auto channel (node_l->network.find_channel (endpoint)); if (!channel) { - node_l->network.tcp_channels.start_tcp (endpoint, [node_w] (std::shared_ptr const & channel_a) { - if (auto node_l = node_w.lock ()) - { - node_l->network.send_keepalive (channel_a); - } - }); + node_l->network.tcp_channels.start_tcp (endpoint); } else { @@ -1219,17 +1214,7 @@ void nano::node::add_initial_peers () nano::endpoint endpoint (boost::asio::ip::address_v6 (i->first.address_bytes ()), i->first.port ()); if (!network.reachout (endpoint, config.allow_local_peers)) { - std::weak_ptr node_w (shared_from_this ()); - network.tcp_channels.start_tcp (endpoint, [node_w] (std::shared_ptr const & channel_a) { - if (auto node_l = node_w.lock ()) - { - node_l->network.send_keepalive (channel_a); - if (!node_l->flags.disable_rep_crawler) - { - node_l->rep_crawler.query (channel_a); - } - } - }); + network.tcp_channels.start_tcp (endpoint); } } } diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 378f432d..d6c628ad 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -516,11 +516,11 @@ void nano::transport::tcp_channels::update (nano::tcp_endpoint const & endpoint_ } } -void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a, std::function const &)> const & callback_a) +void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a) { if (node.flags.disable_tcp_realtime) { - node.network.tcp_channels.udp_fallback (endpoint_a, callback_a); + node.network.tcp_channels.udp_fallback (endpoint_a); return; } auto socket = std::make_shared (node, boost::none); @@ -528,7 +528,7 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a auto channel (std::make_shared (node, socket_w)); std::weak_ptr node_w (node.shared ()); socket->async_connect (nano::transport::map_endpoint_to_tcp (endpoint_a), - [node_w, channel, socket, endpoint_a, callback_a] (boost::system::error_code const & ec) { + [node_w, channel, socket, endpoint_a] (boost::system::error_code const & ec) { if (auto node_l = node_w.lock ()) { if (!ec && channel) @@ -543,12 +543,12 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a channel->set_endpoint (); std::shared_ptr> receive_buffer (std::make_shared> ()); receive_buffer->resize (256); - channel->send (message, [node_w, channel, endpoint_a, receive_buffer, callback_a] (boost::system::error_code const & ec, size_t size_a) { + channel->send (message, [node_w, channel, endpoint_a, receive_buffer] (boost::system::error_code const & ec, size_t size_a) { if (auto node_l = node_w.lock ()) { if (!ec) { - node_l->network.tcp_channels.start_tcp_receive_node_id (channel, endpoint_a, receive_buffer, callback_a); + node_l->network.tcp_channels.start_tcp_receive_node_id (channel, endpoint_a, receive_buffer); } else { @@ -560,25 +560,25 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a { node_l->logger.try_log (boost::str (boost::format ("Error sending node_id_handshake to %1%: %2%") % endpoint_a % ec.message ())); } - node_l->network.tcp_channels.udp_fallback (endpoint_a, callback_a); + node_l->network.tcp_channels.udp_fallback (endpoint_a); } } }); } else { - node_l->network.tcp_channels.udp_fallback (endpoint_a, callback_a); + node_l->network.tcp_channels.udp_fallback (endpoint_a); } } }); } -void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr const & channel_a, nano::endpoint const & endpoint_a, std::shared_ptr> const & receive_buffer_a, std::function const &)> const & callback_a) +void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr const & channel_a, nano::endpoint const & endpoint_a, std::shared_ptr> const & receive_buffer_a) { std::weak_ptr node_w (node.shared ()); if (auto socket_l = channel_a->socket.lock ()) { - auto cleanup_node_id_handshake_socket = [socket_w = channel_a->socket, node_w] (nano::endpoint const & endpoint_a, std::function)> const & callback_a) { + auto cleanup_node_id_handshake_socket = [socket_w = channel_a->socket, node_w] (nano::endpoint const & endpoint_a) { if (auto node_l = node_w.lock ()) { if (auto socket_l = socket_w.lock ()) @@ -588,15 +588,15 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptrsocket, node_w, cleanup_node_id_handshake_socket] (nano::endpoint const & endpoint_a, std::function)> const & callback_a) { + auto cleanup_and_udp_fallback = [socket_w = channel_a->socket, node_w, cleanup_node_id_handshake_socket] (nano::endpoint const & endpoint_a) { if (auto node_l = node_w.lock ()) { - node_l->network.tcp_channels.udp_fallback (endpoint_a, callback_a); - cleanup_node_id_handshake_socket (endpoint_a, callback_a); + node_l->network.tcp_channels.udp_fallback (endpoint_a); + cleanup_node_id_handshake_socket (endpoint_a); } }; - socket_l->async_read (receive_buffer_a, 8 + sizeof (nano::account) + sizeof (nano::account) + sizeof (nano::signature), [node_w, channel_a, endpoint_a, receive_buffer_a, callback_a, cleanup_and_udp_fallback, cleanup_node_id_handshake_socket] (boost::system::error_code const & ec, size_t size_a) { + socket_l->async_read (receive_buffer_a, 8 + sizeof (nano::account) + sizeof (nano::account) + sizeof (nano::signature), [node_w, channel_a, endpoint_a, receive_buffer_a, cleanup_and_udp_fallback, cleanup_node_id_handshake_socket] (boost::system::error_code const & ec, size_t size_a) { if (auto node_l = node_w.lock ()) { if (!ec && channel_a) @@ -635,7 +635,7 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptrlogger.try_log (boost::str (boost::format ("Node ID handshake response sent with node ID %1% to %2%: query %3%") % node_l->node_id.pub.to_node_id () % endpoint_a % (*message.query).to_string ())); } - channel_a->send (response_message, [node_w, channel_a, endpoint_a, callback_a, cleanup_and_udp_fallback] (boost::system::error_code const & ec, size_t size_a) { + channel_a->send (response_message, [node_w, channel_a, endpoint_a, cleanup_and_udp_fallback] (boost::system::error_code const & ec, size_t size_a) { if (auto node_l = node_w.lock ()) { if (!ec && channel_a) @@ -646,10 +646,6 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptrset_last_packet_sent (std::chrono::steady_clock::now ()); auto response_server = std::make_shared (socket_l, node_l); node_l->network.tcp_channels.insert (channel_a, socket_l, response_server); - if (callback_a) - { - callback_a (channel_a); - } // Listen for possible responses response_server->socket->type_set (nano::socket::type_t::realtime_response_server); response_server->remote_node_id = channel_a->get_node_id (); @@ -669,7 +665,7 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptrlogger.try_log (boost::str (boost::format ("Error sending node_id_handshake to %1%: %2%") % endpoint_a % ec.message ())); } - cleanup_and_udp_fallback (endpoint_a, callback_a); + cleanup_and_udp_fallback (endpoint_a); } } }); @@ -677,13 +673,13 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr lock (node_l->network.tcp_channels.mutex); node_l->network.tcp_channels.attempts.get ().erase (nano::transport::map_endpoint_to_tcp (endpoint_a)); @@ -692,7 +688,7 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptrlogger.try_log (boost::str (boost::format ("Error reading node_id_handshake from %1%: %2%") % endpoint_a % ec.message ())); } - cleanup_and_udp_fallback (endpoint_a, callback_a); + cleanup_and_udp_fallback (endpoint_a); } } }); } } -void nano::transport::tcp_channels::udp_fallback (nano::endpoint const & endpoint_a, std::function const &)> const & callback_a) +void nano::transport::tcp_channels::udp_fallback (nano::endpoint const & endpoint_a) { { nano::lock_guard lock (mutex); attempts.get ().erase (nano::transport::map_endpoint_to_tcp (endpoint_a)); } - if (callback_a && !node.flags.disable_udp) + if (!node.flags.disable_udp) { - auto channel_udp (node.network.udp_channels.create (endpoint_a)); - callback_a (channel_udp); + auto channel_udp = node.network.udp_channels.create (endpoint_a); + node.network.send_keepalive (channel_udp); } } diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index f33bd922..9e7fe011 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -102,9 +102,9 @@ namespace transport void modify (std::shared_ptr const &, std::function const &)>); void update (nano::tcp_endpoint const &); // Connection start - void start_tcp (nano::endpoint const &, std::function const &)> const & = nullptr); - void start_tcp_receive_node_id (std::shared_ptr const &, nano::endpoint const &, std::shared_ptr> const &, std::function const &)> const &); - void udp_fallback (nano::endpoint const &, std::function const &)> const &); + void start_tcp (nano::endpoint const &); + void start_tcp_receive_node_id (std::shared_ptr const &, nano::endpoint const &, std::shared_ptr> const &); + void udp_fallback (nano::endpoint const &); nano::node & node; private: diff --git a/nano/test_common/network.cpp b/nano/test_common/network.cpp index 316218a9..1ef0faed 100644 --- a/nano/test_common/network.cpp +++ b/nano/test_common/network.cpp @@ -8,36 +8,18 @@ #include #include +using namespace std::chrono_literals; + std::shared_ptr nano::establish_tcp (nano::system & system, nano::node & node, nano::endpoint const & endpoint) { - using namespace std::chrono_literals; debug_assert (node.network.endpoint () != endpoint && "Establishing TCP to self is not allowed"); std::shared_ptr result; debug_assert (!node.flags.disable_tcp_realtime); - std::promise> promise; - auto callback = [&promise] (std::shared_ptr channel_a) { promise.set_value (channel_a); }; - auto future = promise.get_future (); - node.network.tcp_channels.start_tcp (endpoint, callback); - auto error = system.poll_until_true (2s, [&future] { return future.wait_for (0s) == std::future_status::ready; }); - if (!error) - { - auto channel = future.get (); - EXPECT_NE (nullptr, channel); - if (channel) - { - result = node.network.tcp_channels.find_channel (channel->get_tcp_endpoint ()); - } - } + node.network.tcp_channels.start_tcp (endpoint); + auto error = system.poll_until_true (2s, [&result, &node, &endpoint] { + result = node.network.tcp_channels.find_channel (nano::transport::map_endpoint_to_tcp (endpoint)); + return result != nullptr; + }); return result; } - -std::function channel_a)> nano::keepalive_tcp_callback (nano::node & node_a) -{ - return [node_w = std::weak_ptr (node_a.shared ())] (std::shared_ptr channel_a) { - if (auto node_l = node_w.lock ()) - { - node_l->network.send_keepalive (channel_a); - }; - }; -} diff --git a/nano/test_common/network.hpp b/nano/test_common/network.hpp index 316330a2..4eeea69a 100644 --- a/nano/test_common/network.hpp +++ b/nano/test_common/network.hpp @@ -15,7 +15,4 @@ namespace transport /** Waits until a TCP connection is established and returns the TCP channel on success*/ std::shared_ptr establish_tcp (nano::system &, nano::node &, nano::endpoint const &); - -/** Returns a callback to be used for start_tcp to send a keepalive*/ -std::function channel_a)> keepalive_tcp_callback (nano::node &); }