[ASAN] memory leak between tcp channels and sockets (#2378)

* [ASAN] memory leak between tcp channels and sockets

* Serg review comment

* Update memory stats string
This commit is contained in:
Wesley Shillingford 2019-11-05 08:36:08 +00:00 committed by GitHub
commit b870a38d20
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 354 additions and 250 deletions

View file

@ -146,6 +146,12 @@ TEST (node, send_single_many_peers)
{
ASSERT_NO_ERROR (system.poll ());
}
system.stop ();
for (auto node : system.nodes)
{
ASSERT_TRUE (node->stopped);
ASSERT_TRUE (node->network.tcp_channels.node_id_handhake_sockets_empty ());
}
}
TEST (node, send_out_of_order)

View file

@ -28,10 +28,11 @@ constexpr double nano::bootstrap_limits::lazy_batch_pull_count_resize_ratio;
constexpr std::chrono::hours nano::bootstrap_excluded_peers::exclude_time_hours;
constexpr std::chrono::hours nano::bootstrap_excluded_peers::exclude_remove_hours;
nano::bootstrap_client::bootstrap_client (std::shared_ptr<nano::node> node_a, std::shared_ptr<nano::bootstrap_attempt> attempt_a, std::shared_ptr<nano::transport::channel_tcp> channel_a) :
nano::bootstrap_client::bootstrap_client (std::shared_ptr<nano::node> node_a, std::shared_ptr<nano::bootstrap_attempt> attempt_a, std::shared_ptr<nano::transport::channel_tcp> channel_a, std::shared_ptr<nano::socket> socket_a) :
node (node_a),
attempt (attempt_a),
channel (channel_a),
socket (socket_a),
receive_buffer (std::make_shared<std::vector<uint8_t>> ()),
start_time (std::chrono::steady_clock::now ()),
block_count (0),
@ -356,26 +357,29 @@ void nano::bootstrap_attempt::populate_connections ()
{
if (auto client = c.lock ())
{
new_clients.push_back (client);
endpoints.insert (client->channel->socket->remote_endpoint ());
double elapsed_sec = client->elapsed_seconds ();
auto blocks_per_sec = client->block_rate ();
rate_sum += blocks_per_sec;
if (client->elapsed_seconds () > nano::bootstrap_limits::bootstrap_connection_warmup_time_sec && client->block_count > 0)
if (auto socket_l = client->channel->socket.lock ())
{
sorted_connections.push (client);
}
// Force-stop the slowest peers, since they can take the whole bootstrap hostage by dribbling out blocks on the last remaining pull.
// This is ~1.5kilobits/sec.
if (elapsed_sec > nano::bootstrap_limits::bootstrap_minimum_termination_time_sec && blocks_per_sec < nano::bootstrap_limits::bootstrap_minimum_blocks_per_sec)
{
if (node->config.logging.bulk_pull_logging ())
new_clients.push_back (client);
endpoints.insert (socket_l->remote_endpoint ());
double elapsed_sec = client->elapsed_seconds ();
auto blocks_per_sec = client->block_rate ();
rate_sum += blocks_per_sec;
if (client->elapsed_seconds () > nano::bootstrap_limits::bootstrap_connection_warmup_time_sec && client->block_count > 0)
{
node->logger.try_log (boost::str (boost::format ("Stopping slow peer %1% (elapsed sec %2%s > %3%s and %4% blocks per second < %5%)") % client->channel->to_string () % elapsed_sec % nano::bootstrap_limits::bootstrap_minimum_termination_time_sec % blocks_per_sec % nano::bootstrap_limits::bootstrap_minimum_blocks_per_sec));
sorted_connections.push (client);
}
// Force-stop the slowest peers, since they can take the whole bootstrap hostage by dribbling out blocks on the last remaining pull.
// This is ~1.5kilobits/sec.
if (elapsed_sec > nano::bootstrap_limits::bootstrap_minimum_termination_time_sec && blocks_per_sec < nano::bootstrap_limits::bootstrap_minimum_blocks_per_sec)
{
if (node->config.logging.bulk_pull_logging ())
{
node->logger.try_log (boost::str (boost::format ("Stopping slow peer %1% (elapsed sec %2%s > %3%s and %4% blocks per second < %5%)") % client->channel->to_string () % elapsed_sec % nano::bootstrap_limits::bootstrap_minimum_termination_time_sec % blocks_per_sec % nano::bootstrap_limits::bootstrap_minimum_blocks_per_sec));
}
client->stop (true);
new_clients.pop_back ();
client->stop (true);
new_clients.pop_back ();
}
}
}
}
@ -469,7 +473,7 @@ void nano::bootstrap_attempt::connect_client (nano::tcp_endpoint const & endpoin
{
this_l->node->logger.try_log (boost::str (boost::format ("Connection established to %1%") % endpoint_a));
}
auto client (std::make_shared<nano::bootstrap_client> (this_l->node, this_l, std::make_shared<nano::transport::channel_tcp> (*this_l->node, socket)));
auto client (std::make_shared<nano::bootstrap_client> (this_l->node, this_l, std::make_shared<nano::transport::channel_tcp> (*this_l->node, socket), socket));
this_l->pool_connection (client);
}
else
@ -500,9 +504,12 @@ void nano::bootstrap_attempt::pool_connection (std::shared_ptr<nano::bootstrap_c
if (!stopped && !client_a->pending_stop && !node->bootstrap_initiator.excluded_peers.check (client_a->channel->get_tcp_endpoint ()))
{
// Idle bootstrap client socket
client_a->channel->socket->start_timer (node->network_params.node.idle_timeout);
// Push into idle deque
idle.push_back (client_a);
if (auto socket_l = client_a->channel->socket.lock ())
{
socket_l->start_timer (node->network_params.node.idle_timeout);
// Push into idle deque
idle.push_back (client_a);
}
}
condition.notify_all ();
}
@ -516,7 +523,7 @@ void nano::bootstrap_attempt::stop ()
{
if (auto client = i.lock ())
{
client->channel->socket->close ();
client->socket->close ();
}
}
if (auto i = frontiers.lock ())
@ -622,7 +629,10 @@ void nano::bootstrap_attempt::attempt_restart_check (nano::unique_lock<std::mute
{
if (auto client = i.lock ())
{
client->channel->socket->close ();
if (auto socket_l = client->channel->socket.lock ())
{
socket_l->close ();
}
}
}
idle.clear ();

View file

@ -158,7 +158,7 @@ public:
class bootstrap_client final : public std::enable_shared_from_this<bootstrap_client>
{
public:
bootstrap_client (std::shared_ptr<nano::node>, std::shared_ptr<nano::bootstrap_attempt>, std::shared_ptr<nano::transport::channel_tcp>);
bootstrap_client (std::shared_ptr<nano::node>, std::shared_ptr<nano::bootstrap_attempt>, std::shared_ptr<nano::transport::channel_tcp>, std::shared_ptr<nano::socket>);
~bootstrap_client ();
std::shared_ptr<nano::bootstrap_client> shared ();
void stop (bool force);
@ -167,6 +167,7 @@ public:
std::shared_ptr<nano::node> node;
std::shared_ptr<nano::bootstrap_attempt> attempt;
std::shared_ptr<nano::transport::channel_tcp> channel;
std::shared_ptr<nano::socket> socket;
std::shared_ptr<std::vector<uint8_t>> receive_buffer;
std::chrono::steady_clock::time_point start_time;
std::atomic<uint64_t> block_count;

View file

@ -121,80 +121,87 @@ void nano::bulk_pull_client::throttled_receive_block ()
void nano::bulk_pull_client::receive_block ()
{
auto this_l (shared_from_this ());
connection->channel->socket->async_read (connection->receive_buffer, 1, [this_l](boost::system::error_code const & ec, size_t size_a) {
if (!ec)
{
this_l->received_type ();
}
else
{
if (this_l->connection->node->config.logging.bulk_pull_logging ())
if (auto socket_l = connection->channel->socket.lock ())
{
socket_l->async_read (connection->receive_buffer, 1, [this_l](boost::system::error_code const & ec, size_t size_a) {
if (!ec)
{
this_l->connection->node->logger.try_log (boost::str (boost::format ("Error receiving block type: %1%") % ec.message ()));
this_l->received_type ();
}
this_l->connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_receive_block_failure, nano::stat::dir::in);
this_l->network_error = true;
}
});
else
{
if (this_l->connection->node->config.logging.bulk_pull_logging ())
{
this_l->connection->node->logger.try_log (boost::str (boost::format ("Error receiving block type: %1%") % ec.message ()));
}
this_l->connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_receive_block_failure, nano::stat::dir::in);
this_l->network_error = true;
}
});
}
}
void nano::bulk_pull_client::received_type ()
{
auto this_l (shared_from_this ());
nano::block_type type (static_cast<nano::block_type> (connection->receive_buffer->data ()[0]));
switch (type)
if (auto socket_l = connection->channel->socket.lock ())
{
case nano::block_type::send:
switch (type)
{
connection->channel->socket->async_read (connection->receive_buffer, nano::send_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::receive:
{
connection->channel->socket->async_read (connection->receive_buffer, nano::receive_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::open:
{
connection->channel->socket->async_read (connection->receive_buffer, nano::open_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::change:
{
connection->channel->socket->async_read (connection->receive_buffer, nano::change_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::state:
{
connection->channel->socket->async_read (connection->receive_buffer, nano::state_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::not_a_block:
{
// Avoid re-using slow peers, or peers that sent the wrong blocks.
if (!connection->pending_stop && (expected == pull.end || (pull.count != 0 && pull.count == pull_blocks)))
case nano::block_type::send:
{
connection->attempt->pool_connection (connection);
socket_l->async_read (connection->receive_buffer, nano::send_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
break;
}
default:
{
if (connection->node->config.logging.network_packet_logging ())
case nano::block_type::receive:
{
connection->node->logger.try_log (boost::str (boost::format ("Unknown type received as block type: %1%") % static_cast<int> (type)));
socket_l->async_read (connection->receive_buffer, nano::receive_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::open:
{
socket_l->async_read (connection->receive_buffer, nano::open_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::change:
{
socket_l->async_read (connection->receive_buffer, nano::change_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::state:
{
socket_l->async_read (connection->receive_buffer, nano::state_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::not_a_block:
{
// Avoid re-using slow peers, or peers that sent the wrong blocks.
if (!connection->pending_stop && (expected == pull.end || (pull.count != 0 && pull.count == pull_blocks)))
{
connection->attempt->pool_connection (connection);
}
break;
}
default:
{
if (connection->node->config.logging.network_packet_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Unknown type received as block type: %1%") % static_cast<int> (type)));
}
break;
}
break;
}
}
}
@ -329,68 +336,71 @@ void nano::bulk_pull_account_client::receive_pending ()
{
auto this_l (shared_from_this ());
size_t size_l (sizeof (nano::uint256_union) + sizeof (nano::uint128_union));
connection->channel->socket->async_read (connection->receive_buffer, size_l, [this_l, size_l](boost::system::error_code const & ec, size_t size_a) {
// An issue with asio is that sometimes, instead of reporting a bad file descriptor during disconnect,
// we simply get a size of 0.
if (size_a == size_l)
{
if (!ec)
if (auto socket_l = connection->channel->socket.lock ())
{
socket_l->async_read (connection->receive_buffer, size_l, [this_l, size_l](boost::system::error_code const & ec, size_t size_a) {
// An issue with asio is that sometimes, instead of reporting a bad file descriptor during disconnect,
// we simply get a size of 0.
if (size_a == size_l)
{
nano::block_hash pending;
nano::bufferstream frontier_stream (this_l->connection->receive_buffer->data (), sizeof (nano::uint256_union));
auto error1 (nano::try_read (frontier_stream, pending));
(void)error1;
assert (!error1);
nano::amount balance;
nano::bufferstream balance_stream (this_l->connection->receive_buffer->data () + sizeof (nano::uint256_union), sizeof (nano::uint128_union));
auto error2 (nano::try_read (balance_stream, balance));
(void)error2;
assert (!error2);
if (this_l->pull_blocks == 0 || !pending.is_zero ())
if (!ec)
{
if (this_l->pull_blocks == 0 || balance.number () >= this_l->connection->node->config.receive_minimum.number ())
nano::block_hash pending;
nano::bufferstream frontier_stream (this_l->connection->receive_buffer->data (), sizeof (nano::uint256_union));
auto error1 (nano::try_read (frontier_stream, pending));
(void)error1;
assert (!error1);
nano::amount balance;
nano::bufferstream balance_stream (this_l->connection->receive_buffer->data () + sizeof (nano::uint256_union), sizeof (nano::uint128_union));
auto error2 (nano::try_read (balance_stream, balance));
(void)error2;
assert (!error2);
if (this_l->pull_blocks == 0 || !pending.is_zero ())
{
this_l->pull_blocks++;
if (this_l->pull_blocks == 0 || balance.number () >= this_l->connection->node->config.receive_minimum.number ())
{
if (!pending.is_zero ())
this_l->pull_blocks++;
{
auto transaction (this_l->connection->node->store.tx_begin_read ());
if (!this_l->connection->node->store.block_exists (transaction, pending))
if (!pending.is_zero ())
{
this_l->connection->attempt->lazy_start (pending);
auto transaction (this_l->connection->node->store.tx_begin_read ());
if (!this_l->connection->node->store.block_exists (transaction, pending))
{
this_l->connection->attempt->lazy_start (pending);
}
}
}
this_l->receive_pending ();
}
else
{
this_l->connection->attempt->requeue_pending (this_l->account);
}
this_l->receive_pending ();
}
else
{
this_l->connection->attempt->requeue_pending (this_l->account);
this_l->connection->attempt->pool_connection (this_l->connection);
}
}
else
{
this_l->connection->attempt->pool_connection (this_l->connection);
this_l->connection->attempt->requeue_pending (this_l->account);
if (this_l->connection->node->config.logging.network_logging ())
{
this_l->connection->node->logger.try_log (boost::str (boost::format ("Error while receiving bulk pull account frontier %1%") % ec.message ()));
}
}
}
else
{
this_l->connection->attempt->requeue_pending (this_l->account);
if (this_l->connection->node->config.logging.network_logging ())
if (this_l->connection->node->config.logging.network_message_logging ())
{
this_l->connection->node->logger.try_log (boost::str (boost::format ("Error while receiving bulk pull account frontier %1%") % ec.message ()));
this_l->connection->node->logger.try_log (boost::str (boost::format ("Invalid size: expected %1%, got %2%") % size_l % size_a));
}
}
}
else
{
this_l->connection->attempt->requeue_pending (this_l->account);
if (this_l->connection->node->config.logging.network_message_logging ())
{
this_l->connection->node->logger.try_log (boost::str (boost::format ("Invalid size: expected %1%, got %2%") % size_l % size_a));
}
}
});
});
}
}
/**

View file

@ -51,21 +51,24 @@ nano::frontier_req_client::~frontier_req_client ()
void nano::frontier_req_client::receive_frontier ()
{
auto this_l (shared_from_this ());
connection->channel->socket->async_read (connection->receive_buffer, nano::frontier_req_client::size_frontier, [this_l](boost::system::error_code const & ec, size_t size_a) {
// An issue with asio is that sometimes, instead of reporting a bad file descriptor during disconnect,
// we simply get a size of 0.
if (size_a == nano::frontier_req_client::size_frontier)
{
this_l->received_frontier (ec, size_a);
}
else
{
if (this_l->connection->node->config.logging.network_message_logging ())
if (auto socket_l = connection->channel->socket.lock ())
{
socket_l->async_read (connection->receive_buffer, nano::frontier_req_client::size_frontier, [this_l](boost::system::error_code const & ec, size_t size_a) {
// An issue with asio is that sometimes, instead of reporting a bad file descriptor during disconnect,
// we simply get a size of 0.
if (size_a == nano::frontier_req_client::size_frontier)
{
this_l->connection->node->logger.try_log (boost::str (boost::format ("Invalid size: expected %1%, got %2%") % nano::frontier_req_client::size_frontier % size_a));
this_l->received_frontier (ec, size_a);
}
}
});
else
{
if (this_l->connection->node->config.logging.network_message_logging ())
{
this_l->connection->node->logger.try_log (boost::str (boost::format ("Invalid size: expected %1%, got %2%") % nano::frontier_req_client::size_frontier % size_a));
}
}
});
}
}
void nano::frontier_req_client::unsynced (nano::block_hash const & head, nano::block_hash const & end)

View file

@ -577,20 +577,23 @@ public:
nano::node_id_handshake response_message (cookie, response);
auto shared_const_buffer = response_message.to_shared_const_buffer ();
// clang-format off
connection->socket->async_write (shared_const_buffer, [connection = connection ](boost::system::error_code const & ec, size_t size_a) {
if (ec)
connection->socket->async_write (shared_const_buffer, [connection = std::weak_ptr<nano::bootstrap_server> (connection) ](boost::system::error_code const & ec, size_t size_a) {
if (auto connection_l = connection.lock ())
{
if (connection->node->config.logging.network_node_id_handshake_logging ())
if (ec)
{
connection->node->logger.try_log (boost::str (boost::format ("Error sending node_id_handshake to %1%: %2%") % connection->remote_endpoint % ec.message ()));
if (connection_l->node->config.logging.network_node_id_handshake_logging ())
{
connection_l->node->logger.try_log (boost::str (boost::format ("Error sending node_id_handshake to %1%: %2%") % connection_l->remote_endpoint % ec.message ()));
}
// Stop invalid handshake
connection_l->stop ();
}
else
{
connection_l->node->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::out);
connection_l->finish_request ();
}
// Stop invalid handshake
connection->stop ();
}
else
{
connection->node->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::out);
connection->finish_request ();
}
});
// clang-format on

View file

@ -223,9 +223,8 @@ void nano::socket::close ()
// This must be called from a strand or the destructor
void nano::socket::close_internal ()
{
if (!closed)
if (!closed.exchange (true))
{
closed = true;
io_timeout = boost::none;
boost::system::error_code ec;

View file

@ -2,7 +2,7 @@
#include <nano/node/node.hpp>
#include <nano/node/transport/tcp.hpp>
nano::transport::channel_tcp::channel_tcp (nano::node & node_a, std::shared_ptr<nano::socket> socket_a) :
nano::transport::channel_tcp::channel_tcp (nano::node & node_a, std::weak_ptr<nano::socket> socket_a) :
channel (node_a),
socket (socket_a)
{
@ -12,22 +12,29 @@ nano::transport::channel_tcp::~channel_tcp ()
{
nano::lock_guard<std::mutex> lk (channel_mutex);
// Close socket. Exception: socket is used by bootstrap_server
if (socket && !server)
if (auto socket_l = socket.lock ())
{
socket->close ();
}
// Remove response server
if (response_server != nullptr)
{
response_server->stop ();
response_server = nullptr;
if (!server)
{
socket_l->close ();
}
// Remove response server
if (auto response_server_l = response_server.lock ())
{
response_server_l->stop ();
}
}
}
size_t nano::transport::channel_tcp::hash_code () const
{
std::hash<::nano::tcp_endpoint> hash;
return hash (socket->remote_endpoint ());
if (auto socket_l = socket.lock ())
{
return hash (socket_l->remote_endpoint ());
}
return 0;
}
bool nano::transport::channel_tcp::operator== (nano::transport::channel const & other_a) const
@ -43,7 +50,10 @@ bool nano::transport::channel_tcp::operator== (nano::transport::channel const &
void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const & buffer_a, nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a)
{
socket->async_write (buffer_a, tcp_callback (detail_a, socket->remote_endpoint (), callback_a));
if (auto socket_l = socket.lock ())
{
socket_l->async_write (buffer_a, tcp_callback (detail_a, socket_l->remote_endpoint (), callback_a));
}
}
std::function<void(boost::system::error_code const &, size_t)> nano::transport::channel_tcp::callback (nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a) const
@ -77,7 +87,11 @@ std::function<void(boost::system::error_code const &, size_t)> nano::transport::
std::string nano::transport::channel_tcp::to_string () const
{
return boost::str (boost::format ("%1%") % socket->remote_endpoint ());
if (auto socket_l = socket.lock ())
{
return boost::str (boost::format ("%1%") % socket_l->remote_endpoint ());
}
return "";
}
nano::transport::tcp_channels::tcp_channels (nano::node & node_a) :
@ -85,7 +99,7 @@ node (node_a)
{
}
bool nano::transport::tcp_channels::insert (std::shared_ptr<nano::transport::channel_tcp> channel_a)
bool nano::transport::tcp_channels::insert (std::shared_ptr<nano::transport::channel_tcp> channel_a, std::shared_ptr<nano::socket> socket_a, std::shared_ptr<nano::bootstrap_server> bootstrap_server_a)
{
auto endpoint (channel_a->get_tcp_endpoint ());
assert (endpoint.address ().is_v6 ());
@ -102,7 +116,7 @@ bool nano::transport::tcp_channels::insert (std::shared_ptr<nano::transport::cha
{
channels.get<node_id_tag> ().erase (node_id);
}
channels.get<endpoint_tag> ().insert ({ channel_a });
channels.get<endpoint_tag> ().insert ({ channel_a, socket_a, bootstrap_server_a });
error = false;
lock.unlock ();
node.network.channel_observer (channel_a);
@ -271,7 +285,7 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa
// Don't insert temporary channels for response_server
if (type_a == nano::bootstrap_server_type::realtime)
{
insert (temporary_channel);
insert (temporary_channel, socket_a, nullptr);
}
node.network.process_message (message_a, temporary_channel);
}
@ -314,18 +328,18 @@ void nano::transport::tcp_channels::stop ()
// Close all TCP sockets
for (auto i (channels.begin ()), j (channels.end ()); i != j; ++i)
{
if (i->channel->socket != nullptr)
if (i->socket)
{
i->channel->socket->close ();
i->socket->close ();
}
// Remove response server
if (i->channel->response_server != nullptr)
if (i->response_server)
{
i->channel->response_server->stop ();
i->channel->response_server = nullptr;
i->response_server->stop ();
}
}
channels.clear ();
node_id_handshake_sockets.clear ();
}
bool nano::transport::tcp_channels::max_ip_connections (nano::tcp_endpoint const & endpoint_a)
@ -356,15 +370,18 @@ std::unique_ptr<nano::seq_con_info_component> nano::transport::tcp_channels::col
{
size_t channels_count = 0;
size_t attemps_count = 0;
size_t node_id_handshake_sockets_count = 0;
{
nano::lock_guard<std::mutex> guard (mutex);
channels_count = channels.size ();
attemps_count = attempts.size ();
node_id_handshake_sockets_count = node_id_handshake_sockets.size ();
}
auto composite = std::make_unique<seq_con_info_composite> (name);
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "channels", channels_count, sizeof (decltype (channels)::value_type) }));
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "attempts", attemps_count, sizeof (decltype (attempts)::value_type) }));
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "node_id_handshake_sockets", node_id_handshake_sockets_count, sizeof (decltype (node_id_handshake_sockets)::value_type) }));
return composite;
}
@ -377,6 +394,12 @@ void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point
// Remove keepalive attempt tracking for attempts older than cutoff
auto attempts_cutoff (attempts.get<1> ().lower_bound (cutoff_a));
attempts.get<1> ().erase (attempts.get<1> ().begin (), attempts_cutoff);
// Cleanup any sockets which may still be existing from failed node id handshakes
node_id_handshake_sockets.erase (std::remove_if (node_id_handshake_sockets.begin (), node_id_handshake_sockets.end (), [this](auto socket) {
return channels.get<endpoint_tag> ().find (socket->remote_endpoint ()) == channels.get<endpoint_tag> ().end ();
}),
node_id_handshake_sockets.end ());
}
void nano::transport::tcp_channels::ongoing_keepalive ()
@ -455,6 +478,22 @@ void nano::transport::tcp_channels::update (nano::tcp_endpoint const & endpoint_
}
}
bool nano::transport::tcp_channels::node_id_handhake_sockets_empty () const
{
nano::lock_guard<std::mutex> guard (mutex);
return node_id_handshake_sockets.empty ();
}
void nano::transport::tcp_channels::remove_node_id_handshake_socket (std::shared_ptr<nano::socket> socket_a)
{
std::weak_ptr<nano::node> node_w (node.shared ());
if (auto node_l = node_w.lock ())
{
nano::lock_guard<std::mutex> guard (mutex);
node_id_handshake_sockets.erase (std::remove (node_id_handshake_sockets.begin (), node_id_handshake_sockets.end (), socket_a), node_id_handshake_sockets.end ());
}
}
void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a, std::function<void(std::shared_ptr<nano::transport::channel>)> const & callback_a)
{
if (node.flags.disable_tcp_realtime)
@ -463,10 +502,11 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a
return;
}
auto socket (std::make_shared<nano::socket> (node.shared_from_this (), boost::none, nano::socket::concurrency::multi_writer));
auto channel (std::make_shared<nano::transport::channel_tcp> (node, socket));
std::weak_ptr<nano::socket> socket_w (socket);
auto channel (std::make_shared<nano::transport::channel_tcp> (node, socket_w));
std::weak_ptr<nano::node> node_w (node.shared ());
channel->socket->async_connect (nano::transport::map_endpoint_to_tcp (endpoint_a),
[node_w, channel, endpoint_a, callback_a](boost::system::error_code const & ec) {
socket->async_connect (nano::transport::map_endpoint_to_tcp (endpoint_a),
[node_w, channel, socket, endpoint_a, callback_a](boost::system::error_code const & ec) {
if (auto node_l = node_w.lock ())
{
if (!ec && channel)
@ -481,6 +521,7 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a
}
std::shared_ptr<std::vector<uint8_t>> receive_buffer (std::make_shared<std::vector<uint8_t>> ());
receive_buffer->resize (256);
node_l->network.tcp_channels.node_id_handshake_sockets.push_back (socket);
channel->send_buffer (bytes, nano::stat::detail::node_id_handshake, [node_w, channel, endpoint_a, receive_buffer, callback_a](boost::system::error_code const & ec, size_t size_a) {
if (auto node_l = node_w.lock ())
{
@ -490,6 +531,10 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a
}
else
{
if (auto socket_l = channel->socket.lock ())
{
node_l->network.tcp_channels.remove_node_id_handshake_socket (socket_l);
}
if (node_l->config.logging.network_node_id_handshake_logging ())
{
node_l->logger.try_log (boost::str (boost::format ("Error sending node_id_handshake to %1%: %2%") % endpoint_a % ec.message ()));
@ -510,94 +555,115 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a
void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<nano::transport::channel_tcp> channel_a, nano::endpoint const & endpoint_a, std::shared_ptr<std::vector<uint8_t>> receive_buffer_a, std::function<void(std::shared_ptr<nano::transport::channel>)> const & callback_a)
{
std::weak_ptr<nano::node> node_w (node.shared ());
channel_a->socket->async_read (receive_buffer_a, 8 + sizeof (nano::account) + sizeof (nano::account) + sizeof (nano::signature), [node_w, channel_a, endpoint_a, receive_buffer_a, callback_a](boost::system::error_code const & ec, size_t size_a) {
if (auto node_l = node_w.lock ())
{
if (!ec && channel_a)
if (auto socket_l = channel_a->socket.lock ())
{
// clang-format off
auto cleanup_and_udp_fallback = [socket_w = channel_a->socket, node_w](nano::endpoint const & endpoint_a, std::function<void(std::shared_ptr<nano::transport::channel>)> const & callback_a) {
if (auto node_l = node_w.lock ())
{
node_l->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in);
auto error (false);
nano::bufferstream stream (receive_buffer_a->data (), size_a);
nano::message_header header (error, stream);
if (!error && header.type == nano::message_type::node_id_handshake && header.version_using >= node_l->network_params.protocol.protocol_version_min)
node_l->network.tcp_channels.udp_fallback (endpoint_a, callback_a);
if (auto socket_l = socket_w.lock ())
{
nano::node_id_handshake message (error, stream, header);
if (!error && message.response && message.query)
node_l->network.tcp_channels.remove_node_id_handshake_socket (socket_l);
}
}
};
// clang-format on
socket_l->async_read (receive_buffer_a, 8 + sizeof (nano::account) + sizeof (nano::account) + sizeof (nano::signature), [node_w, channel_a, endpoint_a, receive_buffer_a, callback_a, cleanup_and_udp_fallback](boost::system::error_code const & ec, size_t size_a) {
if (auto node_l = node_w.lock ())
{
if (!ec && channel_a)
{
node_l->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in);
auto error (false);
nano::bufferstream stream (receive_buffer_a->data (), size_a);
nano::message_header header (error, stream);
if (!error && header.type == nano::message_type::node_id_handshake && header.version_using >= node_l->network_params.protocol.protocol_version_min)
{
channel_a->set_network_version (header.version_using);
auto node_id (message.response->first);
bool process (!node_l->network.syn_cookies.validate (endpoint_a, node_id, message.response->second) && node_id != node_l->node_id.pub);
if (process)
nano::node_id_handshake message (error, stream, header);
if (!error && message.response && message.query)
{
/* If node ID is known, don't establish new connection
Exception: temporary channels from bootstrap_server */
auto existing_channel (node_l->network.tcp_channels.find_node_id (node_id));
if (existing_channel)
channel_a->set_network_version (header.version_using);
auto node_id (message.response->first);
bool process (!node_l->network.syn_cookies.validate (endpoint_a, node_id, message.response->second) && node_id != node_l->node_id.pub);
if (process)
{
process = existing_channel->server;
/* If node ID is known, don't establish new connection
Exception: temporary channels from bootstrap_server */
auto existing_channel (node_l->network.tcp_channels.find_node_id (node_id));
if (existing_channel)
{
process = existing_channel->server;
}
}
if (process)
{
channel_a->set_node_id (node_id);
channel_a->set_last_packet_received (std::chrono::steady_clock::now ());
boost::optional<std::pair<nano::account, nano::signature>> response (std::make_pair (node_l->node_id.pub, nano::sign_message (node_l->node_id.prv, node_l->node_id.pub, *message.query)));
nano::node_id_handshake response_message (boost::none, response);
auto bytes = response_message.to_shared_const_buffer ();
if (node_l->config.logging.network_node_id_handshake_logging ())
{
node_l->logger.try_log (boost::str (boost::format ("Node ID handshake response sent with node ID %1% to %2%: query %3%") % node_l->node_id.pub.to_node_id () % endpoint_a % (*message.query).to_string ()));
}
channel_a->send_buffer (bytes, nano::stat::detail::node_id_handshake, [node_w, channel_a, endpoint_a, callback_a, cleanup_and_udp_fallback](boost::system::error_code const & ec, size_t size_a) {
if (auto node_l = node_w.lock ())
{
if (!ec && channel_a)
{
// Insert new node ID connection
if (auto socket_l = channel_a->socket.lock ())
{
channel_a->set_last_packet_sent (std::chrono::steady_clock::now ());
auto response_server = std::make_shared<nano::bootstrap_server> (socket_l, node_l);
node_l->network.tcp_channels.insert (channel_a, socket_l, response_server);
if (callback_a)
{
callback_a (channel_a);
}
// Listen for possible responses
response_server->type = nano::bootstrap_server_type::realtime_response_server;
response_server->remote_node_id = channel_a->get_node_id ();
response_server->receive ();
node_l->network.tcp_channels.remove_node_id_handshake_socket (socket_l);
}
}
else
{
if (node_l->config.logging.network_node_id_handshake_logging ())
{
node_l->logger.try_log (boost::str (boost::format ("Error sending node_id_handshake to %1%: %2%") % endpoint_a % ec.message ()));
}
cleanup_and_udp_fallback (endpoint_a, callback_a);
}
}
});
}
}
if (process)
else
{
channel_a->set_node_id (node_id);
channel_a->set_last_packet_received (std::chrono::steady_clock::now ());
boost::optional<std::pair<nano::account, nano::signature>> response (std::make_pair (node_l->node_id.pub, nano::sign_message (node_l->node_id.prv, node_l->node_id.pub, *message.query)));
nano::node_id_handshake response_message (boost::none, response);
auto bytes = response_message.to_shared_const_buffer ();
if (node_l->config.logging.network_node_id_handshake_logging ())
{
node_l->logger.try_log (boost::str (boost::format ("Node ID handshake response sent with node ID %1% to %2%: query %3%") % node_l->node_id.pub.to_node_id () % endpoint_a % (*message.query).to_string ()));
}
channel_a->send_buffer (bytes, nano::stat::detail::node_id_handshake, [node_w, channel_a, endpoint_a, callback_a](boost::system::error_code const & ec, size_t size_a) {
if (auto node_l = node_w.lock ())
{
if (!ec && channel_a)
{
// Insert new node ID connection
channel_a->set_last_packet_sent (std::chrono::steady_clock::now ());
node_l->network.tcp_channels.insert (channel_a);
if (callback_a)
{
callback_a (channel_a);
}
// Listen for possible responses
channel_a->response_server = std::make_shared<nano::bootstrap_server> (channel_a->socket, node_l);
channel_a->response_server->type = nano::bootstrap_server_type::realtime_response_server;
channel_a->response_server->remote_node_id = channel_a->get_node_id ();
channel_a->response_server->receive ();
}
else
{
if (node_l->config.logging.network_node_id_handshake_logging ())
{
node_l->logger.try_log (boost::str (boost::format ("Error sending node_id_handshake to %1%: %2%") % endpoint_a % ec.message ()));
}
node_l->network.tcp_channels.udp_fallback (endpoint_a, callback_a);
}
}
});
cleanup_and_udp_fallback (endpoint_a, callback_a);
}
}
else
{
node_l->network.tcp_channels.udp_fallback (endpoint_a, callback_a);
cleanup_and_udp_fallback (endpoint_a, callback_a);
}
}
else
{
node_l->network.tcp_channels.udp_fallback (endpoint_a, callback_a);
if (node_l->config.logging.network_node_id_handshake_logging ())
{
node_l->logger.try_log (boost::str (boost::format ("Error reading node_id_handshake from %1%: %2%") % endpoint_a % ec.message ()));
}
cleanup_and_udp_fallback (endpoint_a, callback_a);
}
}
else
{
if (node_l->config.logging.network_node_id_handshake_logging ())
{
node_l->logger.try_log (boost::str (boost::format ("Error reading node_id_handshake from %1%: %2%") % endpoint_a % ec.message ()));
}
node_l->network.tcp_channels.udp_fallback (endpoint_a, callback_a);
}
}
});
});
}
}
void nano::transport::tcp_channels::udp_fallback (nano::endpoint const & endpoint_a, std::function<void(std::shared_ptr<nano::transport::channel>)> const & callback_a)

View file

@ -23,7 +23,7 @@ namespace transport
friend class nano::transport::tcp_channels;
public:
channel_tcp (nano::node &, std::shared_ptr<nano::socket>);
channel_tcp (nano::node &, std::weak_ptr<nano::socket>);
~channel_tcp ();
size_t hash_code () const override;
bool operator== (nano::transport::channel const &) const override;
@ -33,18 +33,18 @@ namespace transport
std::string to_string () const override;
bool operator== (nano::transport::channel_tcp const & other_a) const
{
return &node == &other_a.node && socket == other_a.socket;
return &node == &other_a.node && socket.lock () == other_a.socket.lock ();
}
std::shared_ptr<nano::socket> socket;
std::shared_ptr<nano::bootstrap_server> response_server;
std::weak_ptr<nano::socket> socket;
std::weak_ptr<nano::bootstrap_server> response_server;
bool server{ false };
nano::endpoint get_endpoint () const override
{
nano::lock_guard<std::mutex> lk (channel_mutex);
if (socket)
if (auto socket_l = socket.lock ())
{
return nano::transport::map_tcp_to_endpoint (socket->remote_endpoint ());
return nano::transport::map_tcp_to_endpoint (socket_l->remote_endpoint ());
}
else
{
@ -55,9 +55,9 @@ namespace transport
nano::tcp_endpoint get_tcp_endpoint () const override
{
nano::lock_guard<std::mutex> lk (channel_mutex);
if (socket)
if (auto socket_l = socket.lock ())
{
return socket->remote_endpoint ();
return socket_l->remote_endpoint ();
}
else
{
@ -76,7 +76,7 @@ namespace transport
public:
tcp_channels (nano::node &);
bool insert (std::shared_ptr<nano::transport::channel_tcp>);
bool insert (std::shared_ptr<nano::transport::channel_tcp>, std::shared_ptr<nano::socket>, std::shared_ptr<nano::bootstrap_server>);
void erase (nano::tcp_endpoint const &);
size_t size () const;
std::shared_ptr<nano::transport::channel_tcp> find_channel (nano::tcp_endpoint const &) const;
@ -104,6 +104,8 @@ namespace transport
void start_tcp (nano::endpoint const &, std::function<void(std::shared_ptr<nano::transport::channel>)> const & = nullptr);
void start_tcp_receive_node_id (std::shared_ptr<nano::transport::channel_tcp>, nano::endpoint const &, std::shared_ptr<std::vector<uint8_t>>, std::function<void(std::shared_ptr<nano::transport::channel>)> const &);
void udp_fallback (nano::endpoint const &, std::function<void(std::shared_ptr<nano::transport::channel>)> const &);
void remove_node_id_handshake_socket (std::shared_ptr<nano::socket> socket_a);
bool node_id_handhake_sockets_empty () const;
nano::node & node;
private:
@ -129,6 +131,8 @@ namespace transport
{
public:
std::shared_ptr<nano::transport::channel_tcp> channel;
std::shared_ptr<nano::socket> socket;
std::shared_ptr<nano::bootstrap_server> response_server;
nano::tcp_endpoint endpoint () const
{
return channel->get_tcp_endpoint ();
@ -175,6 +179,8 @@ namespace transport
boost::multi_index::hashed_unique<boost::multi_index::member<tcp_endpoint_attempt, nano::tcp_endpoint, &tcp_endpoint_attempt::endpoint>>,
boost::multi_index::ordered_non_unique<boost::multi_index::member<tcp_endpoint_attempt, std::chrono::steady_clock::time_point, &tcp_endpoint_attempt::last_attempt>>>>
attempts;
// This owns the sockets until the node_id_handshake has been completed. Needed to prevent self referencing callbacks, they are periodically removed if any are dangling.
std::vector<std::shared_ptr<nano::socket>> node_id_handshake_sockets;
std::atomic<bool> stopped{ false };
};
} // namespace transport