Merge pull request #3443 from nanocurrency/cleanup_establish_tcp
Remove callbacks passed through start_tcp
This commit is contained in:
		
				commit
				
					
						3c65a53f56
					
				
			
		
					 8 changed files with 46 additions and 98 deletions
				
			
		| 
						 | 
				
			
			@ -112,7 +112,7 @@ TEST (network, send_node_id_handshake_tcp)
 | 
			
		|||
	auto initial_node1 (node1->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in));
 | 
			
		||||
	auto initial_keepalive (node0->stats.count (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in));
 | 
			
		||||
	std::weak_ptr<nano::node> node_w (node0);
 | 
			
		||||
	node0->network.tcp_channels.start_tcp (node1->network.endpoint (), nano::keepalive_tcp_callback (*node1));
 | 
			
		||||
	node0->network.tcp_channels.start_tcp (node1->network.endpoint ());
 | 
			
		||||
	ASSERT_EQ (0, node0->network.size ());
 | 
			
		||||
	ASSERT_EQ (0, node1->network.size ());
 | 
			
		||||
	ASSERT_TIMELY (10s, node0->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in) >= initial + 2);
 | 
			
		||||
| 
						 | 
				
			
			@ -189,13 +189,13 @@ TEST (network, multi_keepalive)
 | 
			
		|||
	system.nodes.push_back (node1);
 | 
			
		||||
	ASSERT_EQ (0, node1->network.size ());
 | 
			
		||||
	ASSERT_EQ (0, node0->network.size ());
 | 
			
		||||
	node1->network.tcp_channels.start_tcp (node0->network.endpoint (), nano::keepalive_tcp_callback (*node1));
 | 
			
		||||
	node1->network.tcp_channels.start_tcp (node0->network.endpoint ());
 | 
			
		||||
	ASSERT_TIMELY (10s, node0->network.size () == 1 && node0->stats.count (nano::stat::type::message, nano::stat::detail::keepalive) >= 1);
 | 
			
		||||
	auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
 | 
			
		||||
	ASSERT_FALSE (node2->init_error ());
 | 
			
		||||
	node2->start ();
 | 
			
		||||
	system.nodes.push_back (node2);
 | 
			
		||||
	node2->network.tcp_channels.start_tcp (node0->network.endpoint (), nano::keepalive_tcp_callback (*node2));
 | 
			
		||||
	node2->network.tcp_channels.start_tcp (node0->network.endpoint ());
 | 
			
		||||
	ASSERT_TIMELY (10s, node1->network.size () == 2 && node0->network.size () == 2 && node2->network.size () == 2 && node0->stats.count (nano::stat::type::message, nano::stat::detail::keepalive) >= 2);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -1215,12 +1215,7 @@ TEST (network, cleanup_purge)
 | 
			
		|||
	ASSERT_EQ (0, node1.network.size ());
 | 
			
		||||
 | 
			
		||||
	std::weak_ptr<nano::node> node_w = node1.shared ();
 | 
			
		||||
	node1.network.tcp_channels.start_tcp (node2->network.endpoint (), [node_w] (std::shared_ptr<nano::transport::channel> const & channel_a) {
 | 
			
		||||
		if (auto node_l = node_w.lock ())
 | 
			
		||||
		{
 | 
			
		||||
			node_l->network.send_keepalive (channel_a);
 | 
			
		||||
		}
 | 
			
		||||
	});
 | 
			
		||||
	node1.network.tcp_channels.start_tcp (node2->network.endpoint ());
 | 
			
		||||
 | 
			
		||||
	ASSERT_TIMELY (3s, node1.network.size () == 1);
 | 
			
		||||
	node1.network.cleanup (test_start);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -262,7 +262,7 @@ TEST (node, node_receive_quorum)
 | 
			
		|||
 | 
			
		||||
	system2.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv);
 | 
			
		||||
	ASSERT_TRUE (node1.balance (key.pub).is_zero ());
 | 
			
		||||
	node1.network.tcp_channels.start_tcp (system2.nodes[0]->network.endpoint (), nano::keepalive_tcp_callback (node1));
 | 
			
		||||
	node1.network.tcp_channels.start_tcp (system2.nodes[0]->network.endpoint ());
 | 
			
		||||
	while (node1.balance (key.pub).is_zero ())
 | 
			
		||||
	{
 | 
			
		||||
		ASSERT_NO_ERROR (system.poll ());
 | 
			
		||||
| 
						 | 
				
			
			@ -2270,16 +2270,14 @@ TEST (node, rep_remove)
 | 
			
		|||
	auto vote2 = std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, 0, nano::dev::genesis);
 | 
			
		||||
	node.rep_crawler.response (channel1, vote2);
 | 
			
		||||
	ASSERT_TIMELY (10s, node.rep_crawler.representative_count () == 1);
 | 
			
		||||
	// Add inactive TCP representative channel
 | 
			
		||||
	auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), nano::node_config (nano::get_available_port (), system.logging), system.work));
 | 
			
		||||
	node2->start ();
 | 
			
		||||
	std::weak_ptr<nano::node> node_w (node.shared ());
 | 
			
		||||
	auto vote3 = std::make_shared<nano::vote> (keypair2.pub, keypair2.prv, 0, nano::dev::genesis);
 | 
			
		||||
	node.network.tcp_channels.start_tcp (node2->network.endpoint (), [node_w, &vote3] (std::shared_ptr<nano::transport::channel> const & channel2) {
 | 
			
		||||
		if (auto node_l = node_w.lock ())
 | 
			
		||||
		{
 | 
			
		||||
			ASSERT_FALSE (node_l->rep_crawler.response (channel2, vote3));
 | 
			
		||||
		}
 | 
			
		||||
	});
 | 
			
		||||
	node.network.tcp_channels.start_tcp (node2->network.endpoint ());
 | 
			
		||||
	std::shared_ptr<nano::transport::channel> channel2;
 | 
			
		||||
	ASSERT_TIMELY (10s, (channel2 = node.network.tcp_channels.find_channel (nano::transport::map_endpoint_to_tcp (node2->network.endpoint ()))) != nullptr);
 | 
			
		||||
	ASSERT_FALSE (node.rep_crawler.response (channel2, vote3));
 | 
			
		||||
	ASSERT_TIMELY (10s, node.rep_crawler.representative_count () == 2);
 | 
			
		||||
	node2->stop ();
 | 
			
		||||
	ASSERT_TIMELY (10s, node.rep_crawler.representative_count () == 1);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -575,12 +575,7 @@ void nano::network::merge_peer (nano::endpoint const & peer_a)
 | 
			
		|||
	if (!reachout (peer_a, node.config.allow_local_peers))
 | 
			
		||||
	{
 | 
			
		||||
		std::weak_ptr<nano::node> node_w (node.shared ());
 | 
			
		||||
		node.network.tcp_channels.start_tcp (peer_a, [node_w] (std::shared_ptr<nano::transport::channel> const & channel_a) {
 | 
			
		||||
			if (auto node_l = node_w.lock ())
 | 
			
		||||
			{
 | 
			
		||||
				node_l->network.send_keepalive (channel_a);
 | 
			
		||||
			}
 | 
			
		||||
		});
 | 
			
		||||
		node.network.tcp_channels.start_tcp (peer_a);
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -45,12 +45,7 @@ void nano::node::keepalive (std::string const & address_a, uint16_t port_a)
 | 
			
		|||
				auto channel (node_l->network.find_channel (endpoint));
 | 
			
		||||
				if (!channel)
 | 
			
		||||
				{
 | 
			
		||||
					node_l->network.tcp_channels.start_tcp (endpoint, [node_w] (std::shared_ptr<nano::transport::channel> const & channel_a) {
 | 
			
		||||
						if (auto node_l = node_w.lock ())
 | 
			
		||||
						{
 | 
			
		||||
							node_l->network.send_keepalive (channel_a);
 | 
			
		||||
						}
 | 
			
		||||
					});
 | 
			
		||||
					node_l->network.tcp_channels.start_tcp (endpoint);
 | 
			
		||||
				}
 | 
			
		||||
				else
 | 
			
		||||
				{
 | 
			
		||||
| 
						 | 
				
			
			@ -1219,17 +1214,7 @@ void nano::node::add_initial_peers ()
 | 
			
		|||
		nano::endpoint endpoint (boost::asio::ip::address_v6 (i->first.address_bytes ()), i->first.port ());
 | 
			
		||||
		if (!network.reachout (endpoint, config.allow_local_peers))
 | 
			
		||||
		{
 | 
			
		||||
			std::weak_ptr<nano::node> node_w (shared_from_this ());
 | 
			
		||||
			network.tcp_channels.start_tcp (endpoint, [node_w] (std::shared_ptr<nano::transport::channel> const & channel_a) {
 | 
			
		||||
				if (auto node_l = node_w.lock ())
 | 
			
		||||
				{
 | 
			
		||||
					node_l->network.send_keepalive (channel_a);
 | 
			
		||||
					if (!node_l->flags.disable_rep_crawler)
 | 
			
		||||
					{
 | 
			
		||||
						node_l->rep_crawler.query (channel_a);
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			});
 | 
			
		||||
			network.tcp_channels.start_tcp (endpoint);
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -516,11 +516,11 @@ void nano::transport::tcp_channels::update (nano::tcp_endpoint const & endpoint_
 | 
			
		|||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a, std::function<void (std::shared_ptr<nano::transport::channel> const &)> const & callback_a)
 | 
			
		||||
void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a)
 | 
			
		||||
{
 | 
			
		||||
	if (node.flags.disable_tcp_realtime)
 | 
			
		||||
	{
 | 
			
		||||
		node.network.tcp_channels.udp_fallback (endpoint_a, callback_a);
 | 
			
		||||
		node.network.tcp_channels.udp_fallback (endpoint_a);
 | 
			
		||||
		return;
 | 
			
		||||
	}
 | 
			
		||||
	auto socket = std::make_shared<nano::socket> (node, boost::none);
 | 
			
		||||
| 
						 | 
				
			
			@ -528,7 +528,7 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a
 | 
			
		|||
	auto channel (std::make_shared<nano::transport::channel_tcp> (node, socket_w));
 | 
			
		||||
	std::weak_ptr<nano::node> node_w (node.shared ());
 | 
			
		||||
	socket->async_connect (nano::transport::map_endpoint_to_tcp (endpoint_a),
 | 
			
		||||
	[node_w, channel, socket, endpoint_a, callback_a] (boost::system::error_code const & ec) {
 | 
			
		||||
	[node_w, channel, socket, endpoint_a] (boost::system::error_code const & ec) {
 | 
			
		||||
		if (auto node_l = node_w.lock ())
 | 
			
		||||
		{
 | 
			
		||||
			if (!ec && channel)
 | 
			
		||||
| 
						 | 
				
			
			@ -543,12 +543,12 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a
 | 
			
		|||
				channel->set_endpoint ();
 | 
			
		||||
				std::shared_ptr<std::vector<uint8_t>> receive_buffer (std::make_shared<std::vector<uint8_t>> ());
 | 
			
		||||
				receive_buffer->resize (256);
 | 
			
		||||
				channel->send (message, [node_w, channel, endpoint_a, receive_buffer, callback_a] (boost::system::error_code const & ec, size_t size_a) {
 | 
			
		||||
				channel->send (message, [node_w, channel, endpoint_a, receive_buffer] (boost::system::error_code const & ec, size_t size_a) {
 | 
			
		||||
					if (auto node_l = node_w.lock ())
 | 
			
		||||
					{
 | 
			
		||||
						if (!ec)
 | 
			
		||||
						{
 | 
			
		||||
							node_l->network.tcp_channels.start_tcp_receive_node_id (channel, endpoint_a, receive_buffer, callback_a);
 | 
			
		||||
							node_l->network.tcp_channels.start_tcp_receive_node_id (channel, endpoint_a, receive_buffer);
 | 
			
		||||
						}
 | 
			
		||||
						else
 | 
			
		||||
						{
 | 
			
		||||
| 
						 | 
				
			
			@ -560,25 +560,25 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a
 | 
			
		|||
							{
 | 
			
		||||
								node_l->logger.try_log (boost::str (boost::format ("Error sending node_id_handshake to %1%: %2%") % endpoint_a % ec.message ()));
 | 
			
		||||
							}
 | 
			
		||||
							node_l->network.tcp_channels.udp_fallback (endpoint_a, callback_a);
 | 
			
		||||
							node_l->network.tcp_channels.udp_fallback (endpoint_a);
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
				});
 | 
			
		||||
			}
 | 
			
		||||
			else
 | 
			
		||||
			{
 | 
			
		||||
				node_l->network.tcp_channels.udp_fallback (endpoint_a, callback_a);
 | 
			
		||||
				node_l->network.tcp_channels.udp_fallback (endpoint_a);
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	});
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<nano::transport::channel_tcp> const & channel_a, nano::endpoint const & endpoint_a, std::shared_ptr<std::vector<uint8_t>> const & receive_buffer_a, std::function<void (std::shared_ptr<nano::transport::channel> const &)> const & callback_a)
 | 
			
		||||
void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<nano::transport::channel_tcp> const & channel_a, nano::endpoint const & endpoint_a, std::shared_ptr<std::vector<uint8_t>> const & receive_buffer_a)
 | 
			
		||||
{
 | 
			
		||||
	std::weak_ptr<nano::node> node_w (node.shared ());
 | 
			
		||||
	if (auto socket_l = channel_a->socket.lock ())
 | 
			
		||||
	{
 | 
			
		||||
		auto cleanup_node_id_handshake_socket = [socket_w = channel_a->socket, node_w] (nano::endpoint const & endpoint_a, std::function<void (std::shared_ptr<nano::transport::channel>)> const & callback_a) {
 | 
			
		||||
		auto cleanup_node_id_handshake_socket = [socket_w = channel_a->socket, node_w] (nano::endpoint const & endpoint_a) {
 | 
			
		||||
			if (auto node_l = node_w.lock ())
 | 
			
		||||
			{
 | 
			
		||||
				if (auto socket_l = socket_w.lock ())
 | 
			
		||||
| 
						 | 
				
			
			@ -588,15 +588,15 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n
 | 
			
		|||
			}
 | 
			
		||||
		};
 | 
			
		||||
 | 
			
		||||
		auto cleanup_and_udp_fallback = [socket_w = channel_a->socket, node_w, cleanup_node_id_handshake_socket] (nano::endpoint const & endpoint_a, std::function<void (std::shared_ptr<nano::transport::channel>)> const & callback_a) {
 | 
			
		||||
		auto cleanup_and_udp_fallback = [socket_w = channel_a->socket, node_w, cleanup_node_id_handshake_socket] (nano::endpoint const & endpoint_a) {
 | 
			
		||||
			if (auto node_l = node_w.lock ())
 | 
			
		||||
			{
 | 
			
		||||
				node_l->network.tcp_channels.udp_fallback (endpoint_a, callback_a);
 | 
			
		||||
				cleanup_node_id_handshake_socket (endpoint_a, callback_a);
 | 
			
		||||
				node_l->network.tcp_channels.udp_fallback (endpoint_a);
 | 
			
		||||
				cleanup_node_id_handshake_socket (endpoint_a);
 | 
			
		||||
			}
 | 
			
		||||
		};
 | 
			
		||||
 | 
			
		||||
		socket_l->async_read (receive_buffer_a, 8 + sizeof (nano::account) + sizeof (nano::account) + sizeof (nano::signature), [node_w, channel_a, endpoint_a, receive_buffer_a, callback_a, cleanup_and_udp_fallback, cleanup_node_id_handshake_socket] (boost::system::error_code const & ec, size_t size_a) {
 | 
			
		||||
		socket_l->async_read (receive_buffer_a, 8 + sizeof (nano::account) + sizeof (nano::account) + sizeof (nano::signature), [node_w, channel_a, endpoint_a, receive_buffer_a, cleanup_and_udp_fallback, cleanup_node_id_handshake_socket] (boost::system::error_code const & ec, size_t size_a) {
 | 
			
		||||
			if (auto node_l = node_w.lock ())
 | 
			
		||||
			{
 | 
			
		||||
				if (!ec && channel_a)
 | 
			
		||||
| 
						 | 
				
			
			@ -635,7 +635,7 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n
 | 
			
		|||
									{
 | 
			
		||||
										node_l->logger.try_log (boost::str (boost::format ("Node ID handshake response sent with node ID %1% to %2%: query %3%") % node_l->node_id.pub.to_node_id () % endpoint_a % (*message.query).to_string ()));
 | 
			
		||||
									}
 | 
			
		||||
									channel_a->send (response_message, [node_w, channel_a, endpoint_a, callback_a, cleanup_and_udp_fallback] (boost::system::error_code const & ec, size_t size_a) {
 | 
			
		||||
									channel_a->send (response_message, [node_w, channel_a, endpoint_a, cleanup_and_udp_fallback] (boost::system::error_code const & ec, size_t size_a) {
 | 
			
		||||
										if (auto node_l = node_w.lock ())
 | 
			
		||||
										{
 | 
			
		||||
											if (!ec && channel_a)
 | 
			
		||||
| 
						 | 
				
			
			@ -646,10 +646,6 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n
 | 
			
		|||
													channel_a->set_last_packet_sent (std::chrono::steady_clock::now ());
 | 
			
		||||
													auto response_server = std::make_shared<nano::bootstrap_server> (socket_l, node_l);
 | 
			
		||||
													node_l->network.tcp_channels.insert (channel_a, socket_l, response_server);
 | 
			
		||||
													if (callback_a)
 | 
			
		||||
													{
 | 
			
		||||
														callback_a (channel_a);
 | 
			
		||||
													}
 | 
			
		||||
													// Listen for possible responses
 | 
			
		||||
													response_server->socket->type_set (nano::socket::type_t::realtime_response_server);
 | 
			
		||||
													response_server->remote_node_id = channel_a->get_node_id ();
 | 
			
		||||
| 
						 | 
				
			
			@ -669,7 +665,7 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n
 | 
			
		|||
												{
 | 
			
		||||
													node_l->logger.try_log (boost::str (boost::format ("Error sending node_id_handshake to %1%: %2%") % endpoint_a % ec.message ()));
 | 
			
		||||
												}
 | 
			
		||||
												cleanup_and_udp_fallback (endpoint_a, callback_a);
 | 
			
		||||
												cleanup_and_udp_fallback (endpoint_a);
 | 
			
		||||
											}
 | 
			
		||||
										}
 | 
			
		||||
									});
 | 
			
		||||
| 
						 | 
				
			
			@ -677,13 +673,13 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n
 | 
			
		|||
							}
 | 
			
		||||
							else
 | 
			
		||||
							{
 | 
			
		||||
								cleanup_and_udp_fallback (endpoint_a, callback_a);
 | 
			
		||||
								cleanup_and_udp_fallback (endpoint_a);
 | 
			
		||||
							}
 | 
			
		||||
						}
 | 
			
		||||
						else
 | 
			
		||||
						{
 | 
			
		||||
							// Version of channel is not high enough, just abort. Don't fallback to udp, instead cleanup attempt
 | 
			
		||||
							cleanup_node_id_handshake_socket (endpoint_a, callback_a);
 | 
			
		||||
							cleanup_node_id_handshake_socket (endpoint_a);
 | 
			
		||||
							{
 | 
			
		||||
								nano::lock_guard<nano::mutex> lock (node_l->network.tcp_channels.mutex);
 | 
			
		||||
								node_l->network.tcp_channels.attempts.get<endpoint_tag> ().erase (nano::transport::map_endpoint_to_tcp (endpoint_a));
 | 
			
		||||
| 
						 | 
				
			
			@ -692,7 +688,7 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n
 | 
			
		|||
					}
 | 
			
		||||
					else
 | 
			
		||||
					{
 | 
			
		||||
						cleanup_and_udp_fallback (endpoint_a, callback_a);
 | 
			
		||||
						cleanup_and_udp_fallback (endpoint_a);
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
				else
 | 
			
		||||
| 
						 | 
				
			
			@ -701,22 +697,22 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n
 | 
			
		|||
					{
 | 
			
		||||
						node_l->logger.try_log (boost::str (boost::format ("Error reading node_id_handshake from %1%: %2%") % endpoint_a % ec.message ()));
 | 
			
		||||
					}
 | 
			
		||||
					cleanup_and_udp_fallback (endpoint_a, callback_a);
 | 
			
		||||
					cleanup_and_udp_fallback (endpoint_a);
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		});
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void nano::transport::tcp_channels::udp_fallback (nano::endpoint const & endpoint_a, std::function<void (std::shared_ptr<nano::transport::channel> const &)> const & callback_a)
 | 
			
		||||
void nano::transport::tcp_channels::udp_fallback (nano::endpoint const & endpoint_a)
 | 
			
		||||
{
 | 
			
		||||
	{
 | 
			
		||||
		nano::lock_guard<nano::mutex> lock (mutex);
 | 
			
		||||
		attempts.get<endpoint_tag> ().erase (nano::transport::map_endpoint_to_tcp (endpoint_a));
 | 
			
		||||
	}
 | 
			
		||||
	if (callback_a && !node.flags.disable_udp)
 | 
			
		||||
	if (!node.flags.disable_udp)
 | 
			
		||||
	{
 | 
			
		||||
		auto channel_udp (node.network.udp_channels.create (endpoint_a));
 | 
			
		||||
		callback_a (channel_udp);
 | 
			
		||||
		auto channel_udp = node.network.udp_channels.create (endpoint_a);
 | 
			
		||||
		node.network.send_keepalive (channel_udp);
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -102,9 +102,9 @@ namespace transport
 | 
			
		|||
		void modify (std::shared_ptr<nano::transport::channel_tcp> const &, std::function<void (std::shared_ptr<nano::transport::channel_tcp> const &)>);
 | 
			
		||||
		void update (nano::tcp_endpoint const &);
 | 
			
		||||
		// Connection start
 | 
			
		||||
		void start_tcp (nano::endpoint const &, std::function<void (std::shared_ptr<nano::transport::channel> const &)> const & = nullptr);
 | 
			
		||||
		void start_tcp_receive_node_id (std::shared_ptr<nano::transport::channel_tcp> const &, nano::endpoint const &, std::shared_ptr<std::vector<uint8_t>> const &, std::function<void (std::shared_ptr<nano::transport::channel> const &)> const &);
 | 
			
		||||
		void udp_fallback (nano::endpoint const &, std::function<void (std::shared_ptr<nano::transport::channel> const &)> const &);
 | 
			
		||||
		void start_tcp (nano::endpoint const &);
 | 
			
		||||
		void start_tcp_receive_node_id (std::shared_ptr<nano::transport::channel_tcp> const &, nano::endpoint const &, std::shared_ptr<std::vector<uint8_t>> const &);
 | 
			
		||||
		void udp_fallback (nano::endpoint const &);
 | 
			
		||||
		nano::node & node;
 | 
			
		||||
 | 
			
		||||
	private:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -8,36 +8,18 @@
 | 
			
		|||
#include <chrono>
 | 
			
		||||
#include <future>
 | 
			
		||||
 | 
			
		||||
using namespace std::chrono_literals;
 | 
			
		||||
 | 
			
		||||
std::shared_ptr<nano::transport::channel_tcp> nano::establish_tcp (nano::system & system, nano::node & node, nano::endpoint const & endpoint)
 | 
			
		||||
{
 | 
			
		||||
	using namespace std::chrono_literals;
 | 
			
		||||
	debug_assert (node.network.endpoint () != endpoint && "Establishing TCP to self is not allowed");
 | 
			
		||||
 | 
			
		||||
	std::shared_ptr<nano::transport::channel_tcp> result;
 | 
			
		||||
	debug_assert (!node.flags.disable_tcp_realtime);
 | 
			
		||||
	std::promise<std::shared_ptr<nano::transport::channel>> promise;
 | 
			
		||||
	auto callback = [&promise] (std::shared_ptr<nano::transport::channel> channel_a) { promise.set_value (channel_a); };
 | 
			
		||||
	auto future = promise.get_future ();
 | 
			
		||||
	node.network.tcp_channels.start_tcp (endpoint, callback);
 | 
			
		||||
	auto error = system.poll_until_true (2s, [&future] { return future.wait_for (0s) == std::future_status::ready; });
 | 
			
		||||
	if (!error)
 | 
			
		||||
	{
 | 
			
		||||
		auto channel = future.get ();
 | 
			
		||||
		EXPECT_NE (nullptr, channel);
 | 
			
		||||
		if (channel)
 | 
			
		||||
		{
 | 
			
		||||
			result = node.network.tcp_channels.find_channel (channel->get_tcp_endpoint ());
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	node.network.tcp_channels.start_tcp (endpoint);
 | 
			
		||||
	auto error = system.poll_until_true (2s, [&result, &node, &endpoint] {
 | 
			
		||||
		result = node.network.tcp_channels.find_channel (nano::transport::map_endpoint_to_tcp (endpoint));
 | 
			
		||||
		return result != nullptr;
 | 
			
		||||
	});
 | 
			
		||||
	return result;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
std::function<void (std::shared_ptr<nano::transport::channel> channel_a)> nano::keepalive_tcp_callback (nano::node & node_a)
 | 
			
		||||
{
 | 
			
		||||
	return [node_w = std::weak_ptr<nano::node> (node_a.shared ())] (std::shared_ptr<nano::transport::channel> channel_a) {
 | 
			
		||||
		if (auto node_l = node_w.lock ())
 | 
			
		||||
		{
 | 
			
		||||
			node_l->network.send_keepalive (channel_a);
 | 
			
		||||
		};
 | 
			
		||||
	};
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -15,7 +15,4 @@ namespace transport
 | 
			
		|||
 | 
			
		||||
/** Waits until a TCP connection is established and returns the TCP channel on success*/
 | 
			
		||||
std::shared_ptr<nano::transport::channel_tcp> establish_tcp (nano::system &, nano::node &, nano::endpoint const &);
 | 
			
		||||
 | 
			
		||||
/** Returns a callback to be used for start_tcp to send a keepalive*/
 | 
			
		||||
std::function<void (std::shared_ptr<nano::transport::channel> channel_a)> keepalive_tcp_callback (nano::node &);
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue