Merge pull request #3951 from thsfs/simplify-by-guard-clauses-start-tcp-receive-node-id
Simplify start_tcp_receive_node_id() by adding guard clauses
This commit is contained in:
commit
84348f70e4
1 changed files with 123 additions and 127 deletions
|
@ -595,141 +595,137 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a
|
|||
void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<nano::transport::channel_tcp> const & channel_a, nano::endpoint const & endpoint_a, std::shared_ptr<std::vector<uint8_t>> const & receive_buffer_a)
|
||||
{
|
||||
std::weak_ptr<nano::node> node_w (node.shared ());
|
||||
if (auto socket_l = channel_a->socket.lock ())
|
||||
auto socket_l = channel_a->socket.lock ();
|
||||
if (!socket_l)
|
||||
{
|
||||
auto cleanup_node_id_handshake_socket = [socket_w = channel_a->socket, node_w] (nano::endpoint const & endpoint_a) {
|
||||
if (auto node_l = node_w.lock ())
|
||||
return;
|
||||
}
|
||||
auto cleanup_node_id_handshake_socket = [socket_w = channel_a->socket, node_w] (nano::endpoint const & endpoint_a) {
|
||||
if (auto node_l = node_w.lock ())
|
||||
{
|
||||
if (auto socket_l = socket_w.lock ())
|
||||
{
|
||||
if (auto socket_l = socket_w.lock ())
|
||||
{
|
||||
socket_l->close ();
|
||||
}
|
||||
socket_l->close ();
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
socket_l->async_read (receive_buffer_a, 8 + sizeof (nano::account) + sizeof (nano::account) + sizeof (nano::signature), [node_w, channel_a, endpoint_a, receive_buffer_a, cleanup_node_id_handshake_socket] (boost::system::error_code const & ec, std::size_t size_a) {
|
||||
if (auto node_l = node_w.lock ())
|
||||
socket_l->async_read (receive_buffer_a, 8 + sizeof (nano::account) + sizeof (nano::account) + sizeof (nano::signature), [node_w, channel_a, endpoint_a, receive_buffer_a, cleanup_node_id_handshake_socket] (boost::system::error_code const & ec, std::size_t size_a) {
|
||||
auto node_l = node_w.lock ();
|
||||
if (!node_l)
|
||||
{
|
||||
return;
|
||||
}
|
||||
if (ec || !channel_a)
|
||||
{
|
||||
if (node_l->config.logging.network_node_id_handshake_logging ())
|
||||
{
|
||||
if (!ec && channel_a)
|
||||
{
|
||||
node_l->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in);
|
||||
auto error (false);
|
||||
nano::bufferstream stream (receive_buffer_a->data (), size_a);
|
||||
nano::message_header header (error, stream);
|
||||
// the header type should in principle be checked after checking the network bytes and the version numbers, I will not change it here since the benefits do not outweight the difficulties
|
||||
if (!error && header.type == nano::message_type::node_id_handshake)
|
||||
{
|
||||
if (header.network == node_l->network_params.network.current_network && header.version_using >= node_l->network_params.network.protocol_version_min)
|
||||
{
|
||||
nano::node_id_handshake message (error, stream, header);
|
||||
if (!error && message.response && message.query)
|
||||
{
|
||||
channel_a->set_network_version (header.version_using);
|
||||
auto node_id (message.response->first);
|
||||
bool process (!node_l->network.syn_cookies.validate (endpoint_a, node_id, message.response->second) && node_id != node_l->node_id.pub);
|
||||
if (process)
|
||||
{
|
||||
/* If node ID is known, don't establish new connection
|
||||
Exception: temporary channels from bootstrap_server */
|
||||
auto existing_channel (node_l->network.tcp_channels.find_node_id (node_id));
|
||||
if (existing_channel)
|
||||
{
|
||||
process = existing_channel->temporary;
|
||||
}
|
||||
}
|
||||
if (process)
|
||||
{
|
||||
channel_a->set_node_id (node_id);
|
||||
channel_a->set_last_packet_received (std::chrono::steady_clock::now ());
|
||||
boost::optional<std::pair<nano::account, nano::signature>> response (std::make_pair (node_l->node_id.pub, nano::sign_message (node_l->node_id.prv, node_l->node_id.pub, *message.query)));
|
||||
nano::node_id_handshake response_message (node_l->network_params.network, boost::none, response);
|
||||
if (node_l->config.logging.network_node_id_handshake_logging ())
|
||||
{
|
||||
node_l->logger.try_log (boost::str (boost::format ("Node ID handshake response sent with node ID %1% to %2%: query %3%") % node_l->node_id.pub.to_node_id () % endpoint_a % (*message.query).to_string ()));
|
||||
}
|
||||
channel_a->send (response_message, [node_w, channel_a, endpoint_a, cleanup_node_id_handshake_socket] (boost::system::error_code const & ec, std::size_t size_a) {
|
||||
if (auto node_l = node_w.lock ())
|
||||
{
|
||||
if (!ec && channel_a)
|
||||
{
|
||||
// Insert new node ID connection
|
||||
if (auto socket_l = channel_a->socket.lock ())
|
||||
{
|
||||
channel_a->set_last_packet_sent (std::chrono::steady_clock::now ());
|
||||
auto response_server = std::make_shared<nano::bootstrap_server> (socket_l, node_l);
|
||||
node_l->network.tcp_channels.insert (channel_a, socket_l, response_server);
|
||||
// Listen for possible responses
|
||||
response_server->socket->type_set (nano::socket::type_t::realtime_response_server);
|
||||
response_server->remote_node_id = channel_a->get_node_id ();
|
||||
response_server->start ();
|
||||
node_l->logger.try_log (boost::str (boost::format ("Error reading node_id_handshake from %1%: %2%") % endpoint_a % ec.message ()));
|
||||
}
|
||||
cleanup_node_id_handshake_socket (endpoint_a);
|
||||
return;
|
||||
}
|
||||
node_l->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in);
|
||||
auto error (false);
|
||||
nano::bufferstream stream (receive_buffer_a->data (), size_a);
|
||||
nano::message_header header (error, stream);
|
||||
// the header type should in principle be checked after checking the network bytes and the version numbers, I will not change it here since the benefits do not outweight the difficulties
|
||||
if (error || header.type != nano::message_type::node_id_handshake)
|
||||
{
|
||||
if (node_l->config.logging.network_node_id_handshake_logging ())
|
||||
{
|
||||
node_l->logger.try_log (boost::str (boost::format ("Error reading node_id_handshake message header from %1%") % endpoint_a));
|
||||
}
|
||||
cleanup_node_id_handshake_socket (endpoint_a);
|
||||
return;
|
||||
}
|
||||
if (header.network != node_l->network_params.network.current_network || header.version_using < node_l->network_params.network.protocol_version_min)
|
||||
{
|
||||
// error handling, either the networks bytes or the version is wrong
|
||||
if (header.network == node_l->network_params.network.current_network)
|
||||
{
|
||||
node_l->stats.inc (nano::stat::type::message, nano::stat::detail::invalid_network);
|
||||
}
|
||||
else
|
||||
{
|
||||
node_l->stats.inc (nano::stat::type::message, nano::stat::detail::outdated_version);
|
||||
}
|
||||
|
||||
if (!node_l->flags.disable_initial_telemetry_requests)
|
||||
{
|
||||
node_l->telemetry->get_metrics_single_peer_async (channel_a, [] (nano::telemetry_data_response const &) {
|
||||
// Intentionally empty, starts the telemetry request cycle to more quickly disconnect from invalid peers
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (node_l->config.logging.network_node_id_handshake_logging ())
|
||||
{
|
||||
node_l->logger.try_log (boost::str (boost::format ("Error sending node_id_handshake to %1%: %2%") % endpoint_a % ec.message ()));
|
||||
}
|
||||
cleanup_node_id_handshake_socket (endpoint_a);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (node_l->config.logging.network_node_id_handshake_logging ())
|
||||
{
|
||||
node_l->logger.try_log (boost::str (boost::format ("Error reading node_id_handshake from %1%") % endpoint_a));
|
||||
}
|
||||
cleanup_node_id_handshake_socket (endpoint_a);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// error handling, either the networks bytes or the version is wrong
|
||||
if (header.network == node_l->network_params.network.current_network)
|
||||
{
|
||||
node_l->stats.inc (nano::stat::type::message, nano::stat::detail::invalid_network);
|
||||
}
|
||||
else
|
||||
{
|
||||
node_l->stats.inc (nano::stat::type::message, nano::stat::detail::outdated_version);
|
||||
}
|
||||
|
||||
cleanup_node_id_handshake_socket (endpoint_a);
|
||||
// Cleanup attempt
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock (node_l->network.tcp_channels.mutex);
|
||||
node_l->network.tcp_channels.attempts.get<endpoint_tag> ().erase (nano::transport::map_endpoint_to_tcp (endpoint_a));
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (node_l->config.logging.network_node_id_handshake_logging ())
|
||||
{
|
||||
node_l->logger.try_log (boost::str (boost::format ("Error reading node_id_handshake message header from %1%") % endpoint_a));
|
||||
}
|
||||
cleanup_node_id_handshake_socket (endpoint_a);
|
||||
}
|
||||
}
|
||||
else
|
||||
cleanup_node_id_handshake_socket (endpoint_a);
|
||||
// Cleanup attempt
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock (node_l->network.tcp_channels.mutex);
|
||||
node_l->network.tcp_channels.attempts.get<endpoint_tag> ().erase (nano::transport::map_endpoint_to_tcp (endpoint_a));
|
||||
}
|
||||
return;
|
||||
}
|
||||
nano::node_id_handshake message (error, stream, header);
|
||||
if (error || !message.response || !message.query)
|
||||
{
|
||||
if (node_l->config.logging.network_node_id_handshake_logging ())
|
||||
{
|
||||
node_l->logger.try_log (boost::str (boost::format ("Error reading node_id_handshake from %1%") % endpoint_a));
|
||||
}
|
||||
cleanup_node_id_handshake_socket (endpoint_a);
|
||||
return;
|
||||
}
|
||||
channel_a->set_network_version (header.version_using);
|
||||
auto node_id (message.response->first);
|
||||
bool process (!node_l->network.syn_cookies.validate (endpoint_a, node_id, message.response->second) && node_id != node_l->node_id.pub);
|
||||
if (!process)
|
||||
{
|
||||
return;
|
||||
}
|
||||
/* If node ID is known, don't establish new connection
|
||||
Exception: temporary channels from bootstrap_server */
|
||||
auto existing_channel (node_l->network.tcp_channels.find_node_id (node_id));
|
||||
if (existing_channel && !existing_channel->temporary)
|
||||
{
|
||||
return;
|
||||
}
|
||||
channel_a->set_node_id (node_id);
|
||||
channel_a->set_last_packet_received (std::chrono::steady_clock::now ());
|
||||
boost::optional<std::pair<nano::account, nano::signature>> response (std::make_pair (node_l->node_id.pub, nano::sign_message (node_l->node_id.prv, node_l->node_id.pub, *message.query)));
|
||||
nano::node_id_handshake response_message (node_l->network_params.network, boost::none, response);
|
||||
if (node_l->config.logging.network_node_id_handshake_logging ())
|
||||
{
|
||||
node_l->logger.try_log (boost::str (boost::format ("Node ID handshake response sent with node ID %1% to %2%: query %3%") % node_l->node_id.pub.to_node_id () % endpoint_a % (*message.query).to_string ()));
|
||||
}
|
||||
channel_a->send (response_message, [node_w, channel_a, endpoint_a, cleanup_node_id_handshake_socket] (boost::system::error_code const & ec, std::size_t size_a) {
|
||||
auto node_l = node_w.lock ();
|
||||
if (!node_l)
|
||||
{
|
||||
return;
|
||||
}
|
||||
if (ec || !channel_a)
|
||||
{
|
||||
if (node_l->config.logging.network_node_id_handshake_logging ())
|
||||
{
|
||||
if (node_l->config.logging.network_node_id_handshake_logging ())
|
||||
{
|
||||
node_l->logger.try_log (boost::str (boost::format ("Error reading node_id_handshake from %1%: %2%") % endpoint_a % ec.message ()));
|
||||
}
|
||||
cleanup_node_id_handshake_socket (endpoint_a);
|
||||
node_l->logger.try_log (boost::str (boost::format ("Error sending node_id_handshake to %1%: %2%") % endpoint_a % ec.message ()));
|
||||
}
|
||||
cleanup_node_id_handshake_socket (endpoint_a);
|
||||
return;
|
||||
}
|
||||
// Insert new node ID connection
|
||||
auto socket_l = channel_a->socket.lock ();
|
||||
if (!socket_l)
|
||||
{
|
||||
return;
|
||||
}
|
||||
channel_a->set_last_packet_sent (std::chrono::steady_clock::now ());
|
||||
auto response_server = std::make_shared<nano::bootstrap_server> (socket_l, node_l);
|
||||
node_l->network.tcp_channels.insert (channel_a, socket_l, response_server);
|
||||
// Listen for possible responses
|
||||
response_server->socket->type_set (nano::socket::type_t::realtime_response_server);
|
||||
response_server->remote_node_id = channel_a->get_node_id ();
|
||||
response_server->start ();
|
||||
|
||||
if (!node_l->flags.disable_initial_telemetry_requests)
|
||||
{
|
||||
node_l->telemetry->get_metrics_single_peer_async (channel_a, [] (nano::telemetry_data_response const &) {
|
||||
// Intentionally empty, starts the telemetry request cycle to more quickly disconnect from invalid peers
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue