Respond to keepalive messages within a TCP socket (#1742)
* Respond to keepalive messages within a TCP socket so we can determine version in both directions. * Fixing logging copy-pasta from frontier_req body. * Fixing double-counting incoming keepalives.
This commit is contained in:
		
					parent
					
						
							
								d3f3459339
							
						
					
				
			
			
				commit
				
					
						685ffc94ab
					
				
			
		
					 5 changed files with 93 additions and 3 deletions
				
			
		| 
						 | 
					@ -1392,3 +1392,31 @@ TEST (bulk_pull_account, basics)
 | 
				
			||||||
		ASSERT_EQ (nullptr, block_data.second.get ());
 | 
							ASSERT_EQ (nullptr, block_data.second.get ());
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					TEST (bootstrap, keepalive)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						nano::system system (24000, 1);
 | 
				
			||||||
 | 
						auto socket (std::make_shared<nano::socket> (system.nodes[0]));
 | 
				
			||||||
 | 
						nano::keepalive keepalive;
 | 
				
			||||||
 | 
						auto input (keepalive.to_bytes ());
 | 
				
			||||||
 | 
						socket->async_connect (system.nodes[0]->bootstrap.endpoint (), [&input, socket](boost::system::error_code const & ec) {
 | 
				
			||||||
 | 
							ASSERT_FALSE (ec);
 | 
				
			||||||
 | 
							socket->async_write (input, [&input](boost::system::error_code const & ec, size_t size_a) {
 | 
				
			||||||
 | 
								ASSERT_FALSE (ec);
 | 
				
			||||||
 | 
								ASSERT_EQ (input->size (), size_a);
 | 
				
			||||||
 | 
							});
 | 
				
			||||||
 | 
						});
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						auto output (keepalive.to_bytes ());
 | 
				
			||||||
 | 
						bool done (false);
 | 
				
			||||||
 | 
						socket->async_read (output, output->size (), [&output, &done](boost::system::error_code const & ec, size_t size_a) {
 | 
				
			||||||
 | 
							ASSERT_FALSE (ec);
 | 
				
			||||||
 | 
							ASSERT_EQ (output->size (), size_a);
 | 
				
			||||||
 | 
							done = true;
 | 
				
			||||||
 | 
						});
 | 
				
			||||||
 | 
						system.deadline_set (std::chrono::seconds (5));
 | 
				
			||||||
 | 
						while (!done)
 | 
				
			||||||
 | 
						{
 | 
				
			||||||
 | 
							ASSERT_NO_ERROR (system.poll ());
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -2062,7 +2062,7 @@ receive_buffer (std::make_shared<std::vector<uint8_t>> ()),
 | 
				
			||||||
socket (socket_a),
 | 
					socket (socket_a),
 | 
				
			||||||
node (node_a)
 | 
					node (node_a)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	receive_buffer->resize (128);
 | 
						receive_buffer->resize (512);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
void nano::bootstrap_server::receive ()
 | 
					void nano::bootstrap_server::receive ()
 | 
				
			||||||
| 
						 | 
					@ -2119,6 +2119,14 @@ void nano::bootstrap_server::receive_header_action (boost::system::error_code co
 | 
				
			||||||
					add_request (std::unique_ptr<nano::message> (new nano::bulk_push (header)));
 | 
										add_request (std::unique_ptr<nano::message> (new nano::bulk_push (header)));
 | 
				
			||||||
					break;
 | 
										break;
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
 | 
									case nano::message_type::keepalive:
 | 
				
			||||||
 | 
									{
 | 
				
			||||||
 | 
										auto this_l (shared_from_this ());
 | 
				
			||||||
 | 
										socket->async_read (receive_buffer, header.payload_length_bytes (), [this_l, header](boost::system::error_code const & ec, size_t size_a) {
 | 
				
			||||||
 | 
											this_l->receive_keepalive_action (ec, size_a, header);
 | 
				
			||||||
 | 
										});
 | 
				
			||||||
 | 
										break;
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
				default:
 | 
									default:
 | 
				
			||||||
				{
 | 
									{
 | 
				
			||||||
					if (node->config.logging.network_logging ())
 | 
										if (node->config.logging.network_logging ())
 | 
				
			||||||
| 
						 | 
					@ -2178,6 +2186,28 @@ void nano::bootstrap_server::receive_bulk_pull_account_action (boost::system::er
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					void nano::bootstrap_server::receive_keepalive_action (boost::system::error_code const & ec, size_t size_a, nano::message_header const & header_a)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
						if (!ec)
 | 
				
			||||||
 | 
						{
 | 
				
			||||||
 | 
							auto error (false);
 | 
				
			||||||
 | 
							nano::bufferstream stream (receive_buffer->data (), header_a.payload_length_bytes ());
 | 
				
			||||||
 | 
							std::unique_ptr<nano::keepalive> request (new nano::keepalive (error, stream, header_a));
 | 
				
			||||||
 | 
							if (!error)
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								add_request (std::unique_ptr<nano::message> (request.release ()));
 | 
				
			||||||
 | 
								receive ();
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						else
 | 
				
			||||||
 | 
						{
 | 
				
			||||||
 | 
							if (node->config.logging.network_keepalive_logging ())
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								BOOST_LOG (node->log) << boost::str (boost::format ("Error receiving keepalive from: %1%") % ec.message ());
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
void nano::bootstrap_server::receive_frontier_req_action (boost::system::error_code const & ec, size_t size_a, nano::message_header const & header_a)
 | 
					void nano::bootstrap_server::receive_frontier_req_action (boost::system::error_code const & ec, size_t size_a, nano::message_header const & header_a)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	if (!ec)
 | 
						if (!ec)
 | 
				
			||||||
| 
						 | 
					@ -2235,9 +2265,35 @@ public:
 | 
				
			||||||
	{
 | 
						{
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	virtual ~request_response_visitor () = default;
 | 
						virtual ~request_response_visitor () = default;
 | 
				
			||||||
	void keepalive (nano::keepalive const &) override
 | 
						void keepalive (nano::keepalive const & message_a) override
 | 
				
			||||||
	{
 | 
						{
 | 
				
			||||||
		assert (false);
 | 
							if (connection->node->config.logging.network_keepalive_logging ())
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								BOOST_LOG (connection->node->log) << boost::str (boost::format ("Received keepalive message from %1%") % connection->socket->remote_endpoint ());
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							connection->node->stats.inc (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in);
 | 
				
			||||||
 | 
							connection->node->network.merge_peers (message_a.peers);
 | 
				
			||||||
 | 
							nano::keepalive message;
 | 
				
			||||||
 | 
							connection->node->peers.random_fill (message.peers);
 | 
				
			||||||
 | 
							auto bytes = message.to_bytes ();
 | 
				
			||||||
 | 
							if (connection->node->config.logging.network_keepalive_logging ())
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								BOOST_LOG (connection->node->log) << boost::str (boost::format ("Keepalive req sent to %1%") % connection->socket->remote_endpoint ());
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							connection->socket->async_write (bytes, [connection = connection](boost::system::error_code const & ec, size_t size_a) {
 | 
				
			||||||
 | 
								if (ec)
 | 
				
			||||||
 | 
								{
 | 
				
			||||||
 | 
									if (connection->node->config.logging.network_keepalive_logging ())
 | 
				
			||||||
 | 
									{
 | 
				
			||||||
 | 
										BOOST_LOG (connection->node->log) << boost::str (boost::format ("Error sending keepalive to %1%: %2%") % connection->socket->remote_endpoint () % ec.message ());
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								else
 | 
				
			||||||
 | 
								{
 | 
				
			||||||
 | 
									connection->node->stats.inc (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::out);
 | 
				
			||||||
 | 
									connection->finish_request ();
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							});
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	void publish (nano::publish const &) override
 | 
						void publish (nano::publish const &) override
 | 
				
			||||||
	{
 | 
						{
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -284,6 +284,7 @@ public:
 | 
				
			||||||
	void receive_bulk_pull_action (boost::system::error_code const &, size_t, nano::message_header const &);
 | 
						void receive_bulk_pull_action (boost::system::error_code const &, size_t, nano::message_header const &);
 | 
				
			||||||
	void receive_bulk_pull_account_action (boost::system::error_code const &, size_t, nano::message_header const &);
 | 
						void receive_bulk_pull_account_action (boost::system::error_code const &, size_t, nano::message_header const &);
 | 
				
			||||||
	void receive_frontier_req_action (boost::system::error_code const &, size_t, nano::message_header const &);
 | 
						void receive_frontier_req_action (boost::system::error_code const &, size_t, nano::message_header const &);
 | 
				
			||||||
 | 
						void receive_keepalive_action (boost::system::error_code const &, size_t, nano::message_header const &);
 | 
				
			||||||
	void add_request (std::unique_ptr<nano::message>);
 | 
						void add_request (std::unique_ptr<nano::message>);
 | 
				
			||||||
	void finish_request ();
 | 
						void finish_request ();
 | 
				
			||||||
	void run_next ();
 | 
						void run_next ();
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -119,6 +119,10 @@ size_t nano::message_header::payload_length_bytes () const
 | 
				
			||||||
		{
 | 
							{
 | 
				
			||||||
			return nano::bulk_pull_account::size;
 | 
								return nano::bulk_pull_account::size;
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							case nano::message_type::keepalive:
 | 
				
			||||||
 | 
							{
 | 
				
			||||||
 | 
								return nano::keepalive::size;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		// Add realtime network messages once they get framing support; currently the
 | 
							// Add realtime network messages once they get framing support; currently the
 | 
				
			||||||
		// realtime messages all fit in a datagram from which they're deserialized.
 | 
							// realtime messages all fit in a datagram from which they're deserialized.
 | 
				
			||||||
		default:
 | 
							default:
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -280,6 +280,7 @@ public:
 | 
				
			||||||
	bool deserialize (nano::stream &);
 | 
						bool deserialize (nano::stream &);
 | 
				
			||||||
	bool operator== (nano::keepalive const &) const;
 | 
						bool operator== (nano::keepalive const &) const;
 | 
				
			||||||
	std::array<nano::endpoint, 8> peers;
 | 
						std::array<nano::endpoint, 8> peers;
 | 
				
			||||||
 | 
						static size_t constexpr size = 8 * (16 + 2);
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
class publish : public message
 | 
					class publish : public message
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue