Rework handshake visitor (#4504)

* Rework tcp_server handshake visitor

* Simplify message visitor

* Fixes
This commit is contained in:
Piotr Wójcik 2024-03-20 10:39:17 +01:00 committed by GitHub
commit 5e7e7fc02e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 194 additions and 104 deletions

View file

@ -722,7 +722,7 @@ TEST (tcp_listener, tcp_listener_timeout_node_id_handshake)
ASSERT_FALSE (ec);
});
});
ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake) != 0);
ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake) != 0);
{
nano::lock_guard<nano::mutex> guard (node0->tcp_listener->mutex);
ASSERT_EQ (node0->tcp_listener->connections.size (), 1);

View file

@ -235,6 +235,15 @@ enum class detail : uint8_t
tcp_read_error,
tcp_write_error,
// tcp_server
handshake,
handshake_abort,
handshake_error,
handshake_network_error,
handshake_initiate,
handshake_response,
handshake_response_invalid,
// ipc
invocations,

View file

@ -301,7 +301,7 @@ nano::transport::tcp_server::~tcp_server ()
return;
}
node->logger.debug (nano::log::type::tcp_server, "Exiting TCP server ({})", nano::util::to_str (remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Exiting TCP server ({})", fmt::streamed (remote_endpoint));
if (socket->type () == nano::transport::socket::type_t::bootstrap)
{
@ -341,7 +341,7 @@ void nano::transport::tcp_server::start ()
return;
}
node->logger.debug (nano::log::type::tcp_server, "Starting TCP server ({})", nano::util::to_str (remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Starting TCP server ({})", fmt::streamed (remote_endpoint));
receive_message ();
}
@ -374,7 +374,7 @@ void nano::transport::tcp_server::receive_message ()
node->logger.debug (nano::log::type::tcp_server, "Error reading message: {}, status: {} ({})",
ec.message (),
to_string (this_l->message_deserializer->status),
nano::util::to_str (this_l->remote_endpoint));
fmt::streamed (this_l->remote_endpoint));
this_l->stop ();
}
@ -392,10 +392,11 @@ void nano::transport::tcp_server::received_message (std::unique_ptr<nano::messag
{
return;
}
bool should_continue = true;
process_result result = process_result::progress;
if (message)
{
should_continue = process_message (std::move (message));
result = process_message (std::move (message));
}
else
{
@ -403,33 +404,49 @@ void nano::transport::tcp_server::received_message (std::unique_ptr<nano::messag
debug_assert (message_deserializer->status != transport::parse_status::success);
node->stats.inc (nano::stat::type::error, to_stat_detail (message_deserializer->status));
// Avoid too much noise about `duplicate_publish_message` errors
if (message_deserializer->status == transport::parse_status::duplicate_publish_message)
{
node->stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message);
}
else
{
// Avoid too much noise about `duplicate_publish_message` errors
node->logger.debug (nano::log::type::tcp_server, "Error deserializing message: {} ({})",
to_string (message_deserializer->status),
nano::util::to_str (remote_endpoint));
fmt::streamed (remote_endpoint));
}
}
if (should_continue)
switch (result)
{
receive_message ();
case process_result::progress:
{
receive_message ();
}
break;
case process_result::abort:
{
stop ();
}
break;
case process_result::pause:
{
// Do nothing
}
break;
}
}
bool nano::transport::tcp_server::process_message (std::unique_ptr<nano::message> message)
auto nano::transport::tcp_server::process_message (std::unique_ptr<nano::message> message) -> process_result
{
auto node = this->node.lock ();
if (!node)
{
return false;
return process_result::abort;
}
node->stats.inc (nano::stat::type::tcp_server, to_stat_detail (message->header.type), nano::stat::dir::in);
node->stats.inc (nano::stat::type::tcp_server, to_stat_detail (message->type ()), nano::stat::dir::in);
debug_assert (is_undefined_connection () || is_realtime_connection () || is_bootstrap_connection ());
@ -446,50 +463,68 @@ bool nano::transport::tcp_server::process_message (std::unique_ptr<nano::message
*/
if (is_undefined_connection ())
{
handshake_message_visitor handshake_visitor{ shared_from_this () };
handshake_message_visitor handshake_visitor{ *this };
message->visit (handshake_visitor);
if (handshake_visitor.process)
{
queue_realtime (std::move (message));
return true;
}
else if (handshake_visitor.bootstrap)
{
if (!to_bootstrap_connection ())
{
stop ();
return false;
}
}
else
{
// Neither handshake nor bootstrap received when in handshake mode
node->logger.debug (nano::log::type::tcp_server, "Neither handshake nor bootstrap received when in handshake mode: {} ({})",
nano::to_string (message->header.type),
nano::util::to_str (remote_endpoint));
return true;
switch (handshake_visitor.result)
{
case handshake_status::abort:
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_abort);
node->logger.debug (nano::log::type::tcp_server, "Aborting handshake: {} ({})", to_string (message->type ()), fmt::streamed (remote_endpoint));
return process_result::abort;
}
case handshake_status::handshake:
{
return process_result::progress; // Continue handshake
}
case handshake_status::realtime:
{
queue_realtime (std::move (message));
return process_result::progress; // Continue receiving new messages
}
case handshake_status::bootstrap:
{
bool success = to_bootstrap_connection ();
if (!success)
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Error switching to bootstrap mode: {} ({})", to_string (message->type ()), fmt::streamed (remote_endpoint));
return process_result::abort; // Switch failed, abort
}
else
{
// Fall through to process the bootstrap message
}
}
}
}
else if (is_realtime_connection ())
{
realtime_message_visitor realtime_visitor{ *this };
message->visit (realtime_visitor);
if (realtime_visitor.process)
{
queue_realtime (std::move (message));
}
return true;
return process_result::progress;
}
// the server will switch to bootstrap mode immediately after processing the first bootstrap message, thus no `else if`
// The server will switch to bootstrap mode immediately after processing the first bootstrap message, thus no `else if`
if (is_bootstrap_connection ())
{
bootstrap_message_visitor bootstrap_visitor{ shared_from_this () };
message->visit (bootstrap_visitor);
return !bootstrap_visitor.processed; // Stop receiving new messages if bootstrap serving started
// Pause receiving new messages if bootstrap serving started
return bootstrap_visitor.processed ? process_result::pause : process_result::progress;
}
debug_assert (false);
return true; // Continue receiving new messages
return process_result::abort;
}
void nano::transport::tcp_server::queue_realtime (std::unique_ptr<nano::message> message)
@ -502,63 +537,74 @@ void nano::transport::tcp_server::queue_realtime (std::unique_ptr<nano::message>
node->network.tcp_channels.message_manager.put_message (nano::tcp_message_item{ std::move (message), remote_endpoint, remote_node_id, socket });
}
/*
* Handshake
*/
nano::transport::tcp_server::handshake_message_visitor::handshake_message_visitor (std::shared_ptr<tcp_server> server) :
server{ std::move (server) }
auto nano::transport::tcp_server::process_handshake (nano::node_id_handshake const & message) -> handshake_status
{
}
void nano::transport::tcp_server::handshake_message_visitor::node_id_handshake (nano::node_id_handshake const & message)
{
auto node = server->node.lock ();
auto node = this->node.lock ();
if (!node)
{
return;
return handshake_status::abort;
}
if (node->flags.disable_tcp_realtime)
{
node->logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime TCP ({})", nano::util::to_str (server->remote_endpoint));
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime TCP ({})", fmt::streamed (remote_endpoint));
// Stop invalid handshake
server->stop ();
return;
return handshake_status::abort;
}
if (message.query && server->handshake_query_received)
if (!message.query && !message.response)
{
node->logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", nano::util::to_str (server->remote_endpoint));
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Invalid handshake message received ({})", fmt::streamed (remote_endpoint));
// Stop invalid handshake
server->stop ();
return;
return handshake_status::abort;
}
if (message.query && handshake_received) // Second handshake message should be a response only
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", fmt::streamed (remote_endpoint));
return handshake_status::abort;
}
server->handshake_query_received = true;
handshake_received = true;
node->logger.debug (nano::log::type::tcp_server, "Handshake query received ({})", nano::util::to_str (server->remote_endpoint));
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake, nano::stat::dir::in);
node->logger.debug (nano::log::type::tcp_server, "Handshake message received ({})", fmt::streamed (remote_endpoint));
if (message.query)
{
server->send_handshake_response (*message.query, message.is_v2 ());
// Sends response + our own query
send_handshake_response (*message.query, message.is_v2 ());
// Fall through and continue handshake
}
if (message.response)
{
if (node->network.verify_handshake_response (*message.response, nano::transport::map_tcp_to_endpoint (server->remote_endpoint)))
if (node->network.verify_handshake_response (*message.response, nano::transport::map_tcp_to_endpoint (remote_endpoint)))
{
server->to_realtime_connection (message.response->node_id);
bool success = to_realtime_connection (message.response->node_id);
if (success)
{
return handshake_status::realtime; // Switched to realtime
}
else
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Error switching to realtime mode ({})", fmt::streamed (remote_endpoint));
return handshake_status::abort;
}
}
else
{
// Stop invalid handshake
server->stop ();
return;
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response_invalid);
node->logger.debug (nano::log::type::tcp_server, "Invalid handshake response received ({})", fmt::streamed (remote_endpoint));
return handshake_status::abort;
}
}
process = true;
return handshake_status::handshake; // Handshake is in progress
}
void nano::transport::tcp_server::send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2)
@ -568,11 +614,13 @@ void nano::transport::tcp_server::send_handshake_response (nano::node_id_handsha
{
return;
}
auto response = node->network.prepare_handshake_response (query, v2);
auto own_query = node->network.prepare_handshake_query (nano::transport::map_tcp_to_endpoint (remote_endpoint));
nano::node_id_handshake handshake_response{ node->network_params.network, own_query, response };
// TODO: Use channel
node->logger.debug (nano::log::type::tcp_server, "Responding to handshake ({})", fmt::streamed (remote_endpoint));
auto shared_const_buffer = handshake_response.to_shared_const_buffer ();
socket->async_write (shared_const_buffer, [this_l = shared_from_this ()] (boost::system::error_code const & ec, std::size_t size_a) {
auto node = this_l->node.lock ();
@ -582,47 +630,53 @@ void nano::transport::tcp_server::send_handshake_response (nano::node_id_handsha
}
if (ec)
{
node->logger.debug (nano::log::type::tcp_server, "Error sending handshake response: {} ({})", ec.message (), nano::util::to_str (this_l->remote_endpoint));
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_network_error);
node->logger.debug (nano::log::type::tcp_server, "Error sending handshake response: {} ({})", ec.message (), fmt::streamed (this_l->remote_endpoint));
// Stop invalid handshake
this_l->stop ();
}
else
{
node->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::out);
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake, nano::stat::dir::out);
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response, nano::stat::dir::out);
}
});
}
/*
* handshake_message_visitor
*/
void nano::transport::tcp_server::handshake_message_visitor::node_id_handshake (const nano::node_id_handshake & message)
{
result = server.process_handshake (message);
}
void nano::transport::tcp_server::handshake_message_visitor::bulk_pull (const nano::bulk_pull & message)
{
bootstrap = true;
result = handshake_status::bootstrap;
}
void nano::transport::tcp_server::handshake_message_visitor::bulk_pull_account (const nano::bulk_pull_account & message)
{
bootstrap = true;
result = handshake_status::bootstrap;
}
void nano::transport::tcp_server::handshake_message_visitor::bulk_push (const nano::bulk_push & message)
{
bootstrap = true;
result = handshake_status::bootstrap;
}
void nano::transport::tcp_server::handshake_message_visitor::frontier_req (const nano::frontier_req & message)
{
bootstrap = true;
result = handshake_status::bootstrap;
}
/*
* Realtime
* realtime_message_visitor
*/
nano::transport::tcp_server::realtime_message_visitor::realtime_message_visitor (nano::transport::tcp_server & server_a) :
server{ server_a }
{
}
void nano::transport::tcp_server::realtime_message_visitor::keepalive (const nano::keepalive & message)
{
process = true;
@ -684,7 +738,7 @@ void nano::transport::tcp_server::realtime_message_visitor::asc_pull_ack (const
}
/*
* Bootstrap
* bootstrap_message_visitor
*/
nano::transport::tcp_server::bootstrap_message_visitor::bootstrap_message_visitor (std::shared_ptr<tcp_server> server) :
@ -769,6 +823,10 @@ void nano::transport::tcp_server::bootstrap_message_visitor::frontier_req (const
processed = true;
}
/*
*
*/
// TODO: We could periodically call this (from a dedicated timeout thread for eg.) but socket already handles timeouts,
// and since we only ever store tcp_server as weak_ptr, socket timeout will automatically trigger tcp_server cleanup
void nano::transport::tcp_server::timeout ()
@ -780,7 +838,7 @@ void nano::transport::tcp_server::timeout ()
}
if (socket->has_timed_out ())
{
node->logger.debug (nano::log::type::tcp_server, "Closing TCP server due to timeout ({})", nano::util::to_str (remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Closing TCP server due to timeout ({})", fmt::streamed (remote_endpoint));
{
nano::lock_guard<nano::mutex> lock{ node->tcp_listener->mutex };
@ -834,7 +892,7 @@ bool nano::transport::tcp_server::to_bootstrap_connection ()
++node->tcp_listener->bootstrap_count;
socket->type_set (nano::transport::socket::type_t::bootstrap);
node->logger.debug (nano::log::type::tcp_server, "Switched to bootstrap mode ({})", nano::util::to_str (remote_endpoint));
node->logger.debug (nano::log::type::tcp_server, "Switched to bootstrap mode ({})", fmt::streamed (remote_endpoint));
return true;
}
@ -846,17 +904,22 @@ bool nano::transport::tcp_server::to_realtime_connection (nano::account const &
{
return false;
}
if (socket->type () == nano::transport::socket::type_t::undefined && !node->flags.disable_tcp_realtime)
if (node->flags.disable_tcp_realtime)
{
remote_node_id = node_id;
++node->tcp_listener->realtime_count;
socket->type_set (nano::transport::socket::type_t::realtime);
node->logger.debug (nano::log::type::tcp_server, "Switched to realtime mode ({})", nano::util::to_str (remote_endpoint));
return true;
return false;
}
return false;
if (socket->type () != nano::transport::socket::type_t::undefined)
{
return false;
}
remote_node_id = node_id;
++node->tcp_listener->realtime_count;
socket->type_set (nano::transport::socket::type_t::realtime);
node->logger.debug (nano::log::type::tcp_server, "Switched to realtime mode ({})", fmt::streamed (remote_endpoint));
return true;
}
bool nano::transport::tcp_server::is_undefined_connection () const

View file

@ -69,19 +69,23 @@ public:
std::weak_ptr<nano::node> const node;
nano::mutex mutex;
std::atomic<bool> stopped{ false };
std::atomic<bool> handshake_query_received{ false };
std::atomic<bool> handshake_received{ false };
// Remote enpoint used to remove response channel even after socket closing
nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 };
nano::account remote_node_id{};
std::chrono::steady_clock::time_point last_telemetry_req{};
private:
void send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2);
enum class process_result
{
abort,
progress,
pause,
};
void receive_message ();
void received_message (std::unique_ptr<nano::message> message);
bool process_message (std::unique_ptr<nano::message> message);
process_result process_message (std::unique_ptr<nano::message> message);
void queue_realtime (std::unique_ptr<nano::message> message);
bool to_bootstrap_connection ();
@ -90,19 +94,30 @@ private:
bool is_bootstrap_connection () const;
bool is_realtime_connection () const;
enum class handshake_status
{
abort,
handshake,
realtime,
bootstrap,
};
handshake_status process_handshake (nano::node_id_handshake const & message);
void send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2);
private:
bool const allow_bootstrap;
std::shared_ptr<nano::transport::message_deserializer> message_deserializer;
std::optional<nano::keepalive> last_keepalive;
private:
private: // Visitors
class handshake_message_visitor : public nano::message_visitor
{
public:
bool process{ false };
bool bootstrap{ false };
handshake_status result{ handshake_status::abort };
explicit handshake_message_visitor (std::shared_ptr<tcp_server>);
explicit handshake_message_visitor (tcp_server & server) :
server{ server } {};
void node_id_handshake (nano::node_id_handshake const &) override;
void bulk_pull (nano::bulk_pull const &) override;
@ -111,7 +126,7 @@ private:
void frontier_req (nano::frontier_req const &) override;
private:
std::shared_ptr<tcp_server> server;
tcp_server & server;
};
class realtime_message_visitor : public nano::message_visitor
@ -119,7 +134,8 @@ private:
public:
bool process{ false };
explicit realtime_message_visitor (tcp_server &);
explicit realtime_message_visitor (tcp_server & server) :
server{ server } {};
void keepalive (nano::keepalive const &) override;
void publish (nano::publish const &) override;
@ -150,5 +166,7 @@ private:
private:
std::shared_ptr<tcp_server> server;
};
friend class handshake_message_visitor;
};
}