Converting nano::socket::note from weak_ptr<node> to node to a plain reference. This runs cleanly in asan and tsan so it doesn't appear that a socket ever exceeds the lifetime of its owning node. (#2936)

This commit is contained in:
clemahieu 2020-09-18 21:07:40 +02:00 committed by GitHub
commit 7b5bfcceac
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 114 additions and 144 deletions

View file

@ -733,7 +733,7 @@ TEST (message_buffer_manager, stats)
TEST (tcp_listener, tcp_node_id_handshake)
{
nano::system system (1);
auto socket (std::make_shared<nano::socket> (system.nodes[0]));
auto socket (std::make_shared<nano::socket> (*system.nodes[0]));
auto bootstrap_endpoint (system.nodes[0]->bootstrap.endpoint ());
auto cookie (system.nodes[0]->network.syn_cookies.assign (nano::transport::map_tcp_to_endpoint (bootstrap_endpoint)));
nano::node_id_handshake node_id_handshake (cookie, boost::none);
@ -766,7 +766,7 @@ TEST (tcp_listener, tcp_listener_timeout_empty)
{
nano::system system (1);
auto node0 (system.nodes[0]);
auto socket (std::make_shared<nano::socket> (node0));
auto socket (std::make_shared<nano::socket> (*node0));
std::atomic<bool> connected (false);
socket->async_connect (node0->bootstrap.endpoint (), [&connected](boost::system::error_code const & ec) {
ASSERT_FALSE (ec);
@ -789,7 +789,7 @@ TEST (tcp_listener, tcp_listener_timeout_node_id_handshake)
{
nano::system system (1);
auto node0 (system.nodes[0]);
auto socket (std::make_shared<nano::socket> (node0));
auto socket (std::make_shared<nano::socket> (*node0));
auto cookie (node0->network.syn_cookies.assign (nano::transport::map_tcp_to_endpoint (node0->bootstrap.endpoint ())));
nano::node_id_handshake node_id_handshake (cookie, boost::none);
auto input (node_id_handshake.to_shared_const_buffer (false));

View file

@ -22,7 +22,7 @@ TEST (socket, drop_policy)
// The total number of drops should thus be 1 (the socket allows doubling the queue size for no_socket_drop)
size_t max_write_queue_size = 0;
{
auto client_dummy (std::make_shared<nano::socket> (node, boost::none, nano::socket::concurrency::multi_writer));
auto client_dummy (std::make_shared<nano::socket> (*node, boost::none, nano::socket::concurrency::multi_writer));
max_write_queue_size = client_dummy->get_max_write_queue_size ();
}
@ -30,7 +30,7 @@ TEST (socket, drop_policy)
auto server_port (nano::get_available_port ());
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v4::any (), server_port);
auto server_socket (std::make_shared<nano::server_socket> (node, endpoint, 1, nano::socket::concurrency::multi_writer));
auto server_socket (std::make_shared<nano::server_socket> (*node, endpoint, 1, nano::socket::concurrency::multi_writer));
boost::system::error_code ec;
server_socket->start (ec);
ASSERT_FALSE (ec);
@ -41,7 +41,7 @@ TEST (socket, drop_policy)
return true;
});
auto client (std::make_shared<nano::socket> (node, boost::none, nano::socket::concurrency::multi_writer));
auto client (std::make_shared<nano::socket> (*node, boost::none, nano::socket::concurrency::multi_writer));
nano::util::counted_completion write_completion (total_message_count);
client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), server_port),
@ -123,7 +123,7 @@ TEST (socket, concurrent_writes)
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v4::any (), 25000);
auto server_socket (std::make_shared<nano::server_socket> (node, endpoint, max_connections, nano::socket::concurrency::multi_writer));
auto server_socket (std::make_shared<nano::server_socket> (*node, endpoint, max_connections, nano::socket::concurrency::multi_writer));
boost::system::error_code ec;
server_socket->start (ec);
ASSERT_FALSE (ec);
@ -148,7 +148,7 @@ TEST (socket, concurrent_writes)
std::vector<std::shared_ptr<nano::socket>> clients;
for (unsigned i = 0; i < client_count; i++)
{
auto client (std::make_shared<nano::socket> (node, boost::none, nano::socket::concurrency::multi_writer));
auto client (std::make_shared<nano::socket> (*node, boost::none, nano::socket::concurrency::multi_writer));
clients.push_back (client);
client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), 25000),
[&connection_count_completion](boost::system::error_code const & ec_a) {

View file

@ -143,7 +143,7 @@ std::shared_ptr<nano::bootstrap_client> nano::bootstrap_connections::find_connec
void nano::bootstrap_connections::connect_client (nano::tcp_endpoint const & endpoint_a, bool push_front)
{
++connections_count;
auto socket (std::make_shared<nano::socket> (node.shared ()));
auto socket (std::make_shared<nano::socket> (node));
auto this_l (shared_from_this ());
socket->async_connect (endpoint_a,
[this_l, socket, endpoint_a, push_front](boost::system::error_code const & ec) {

View file

@ -17,7 +17,7 @@ void nano::bootstrap_listener::start ()
{
nano::lock_guard<std::mutex> lock (mutex);
on = true;
listening_socket = std::make_shared<nano::server_socket> (node.shared (), boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::any (), port), node.config.tcp_incoming_connections_max);
listening_socket = std::make_shared<nano::server_socket> (node, boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::any (), port), node.config.tcp_incoming_connections_max);
boost::system::error_code ec;
listening_socket->start (ec);
if (ec)

View file

@ -8,18 +8,18 @@
#include <limits>
nano::socket::socket (std::shared_ptr<nano::node> node_a, boost::optional<std::chrono::seconds> io_timeout_a, nano::socket::concurrency concurrency_a) :
strand (node_a->io_ctx.get_executor ()),
tcp_socket (node_a->io_ctx),
node (node_a),
writer_concurrency (concurrency_a),
next_deadline (std::numeric_limits<uint64_t>::max ()),
last_completion_time (0),
io_timeout (io_timeout_a)
nano::socket::socket (nano::node & node_a, boost::optional<std::chrono::seconds> io_timeout_a, nano::socket::concurrency concurrency_a) :
strand{ node_a.io_ctx.get_executor () },
tcp_socket{ node_a.io_ctx },
node{ node_a },
writer_concurrency{ concurrency_a },
next_deadline{ std::numeric_limits<uint64_t>::max () },
last_completion_time{ 0 },
io_timeout{ io_timeout_a }
{
if (!io_timeout)
{
io_timeout = node_a->config.tcp_io_timeout;
io_timeout = node_a.config.tcp_io_timeout;
}
}
@ -54,12 +54,9 @@ void nano::socket::async_read (std::shared_ptr<std::vector<uint8_t>> buffer_a, s
boost::asio::async_read (this_l->tcp_socket, boost::asio::buffer (buffer_a->data (), size_a),
boost::asio::bind_executor (this_l->strand,
[this_l, buffer_a, callback_a](boost::system::error_code const & ec, size_t size_a) {
if (auto node = this_l->node.lock ())
{
node->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::in, size_a);
this_l->stop_timer ();
callback_a (ec, size_a);
}
this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::in, size_a);
this_l->stop_timer ();
callback_a (ec, size_a);
}));
}));
}
@ -88,15 +85,15 @@ void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std:
{
this_l->send_queue.emplace_back (nano::socket::queue_item{ buffer_a, callback_a });
}
else if (auto node_l = this_l->node.lock ())
else
{
if (drop_policy_a == nano::buffer_drop_policy::no_socket_drop)
{
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out);
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out);
}
else
{
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out);
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out);
}
if (callback_a)
@ -124,26 +121,20 @@ void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std:
nano::async_write (tcp_socket, buffer_a,
boost::asio::bind_executor (strand,
[this_l, callback_a](boost::system::error_code const & ec, size_t size_a) {
if (auto node = this_l->node.lock ())
this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a);
this_l->stop_timer ();
if (callback_a)
{
node->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a);
this_l->stop_timer ();
if (callback_a)
{
callback_a (ec, size_a);
}
callback_a (ec, size_a);
}
}));
}
}
else if (callback_a)
{
if (auto node = this_l->node.lock ())
{
node->background ([callback_a]() {
callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
node.background ([callback_a]() {
callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
}
@ -159,34 +150,31 @@ void nano::socket::write_queued_messages ()
[msg, this_w](boost::system::error_code ec, std::size_t size_a) {
if (auto this_l = this_w.lock ())
{
if (auto node = this_l->node.lock ())
this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a);
this_l->stop_timer ();
if (!this_l->closed)
{
node->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a);
this_l->stop_timer ();
if (!this_l->closed)
{
if (msg.callback)
{
msg.callback (ec, size_a);
}
this_l->send_queue.pop_front ();
if (!ec && !this_l->send_queue.empty ())
{
this_l->write_queued_messages ();
}
else if (this_l->send_queue.empty ())
{
// Idle TCP realtime client socket after writes
this_l->start_timer (node->network_params.node.idle_timeout);
}
}
else if (msg.callback)
if (msg.callback)
{
msg.callback (ec, size_a);
}
this_l->send_queue.pop_front ();
if (!ec && !this_l->send_queue.empty ())
{
this_l->write_queued_messages ();
}
else if (this_l->send_queue.empty ())
{
// Idle TCP realtime client socket after writes
this_l->start_timer (this_l->node.network_params.node.idle_timeout);
}
}
else if (msg.callback)
{
msg.callback (ec, size_a);
}
}
}));
@ -195,10 +183,7 @@ void nano::socket::write_queued_messages ()
void nano::socket::start_timer ()
{
if (auto node_l = node.lock ())
{
start_timer (io_timeout.get ());
}
start_timer (io_timeout.get ());
}
void nano::socket::start_timer (std::chrono::seconds deadline_a)
@ -214,37 +199,31 @@ void nano::socket::stop_timer ()
void nano::socket::checkup ()
{
std::weak_ptr<nano::socket> this_w (shared_from_this ());
if (auto node_l = node.lock ())
{
node_l->alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (node_l->network_params.network.is_dev_network () ? 1 : 2), [this_w, node_l]() {
if (auto this_l = this_w.lock ())
node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (node.network_params.network.is_dev_network () ? 1 : 2), [this_w]() {
if (auto this_l = this_w.lock ())
{
uint64_t now (nano::seconds_since_epoch ());
if (this_l->next_deadline != std::numeric_limits<uint64_t>::max () && now - this_l->last_completion_time > this_l->next_deadline)
{
uint64_t now (nano::seconds_since_epoch ());
if (this_l->next_deadline != std::numeric_limits<uint64_t>::max () && now - this_l->last_completion_time > this_l->next_deadline)
if (this_l->node.config.logging.network_timeout_logging ())
{
if (auto node_l = this_l->node.lock ())
// The remote end may have closed the connection before this side timing out, in which case the remote address is no longer available.
boost::system::error_code ec_remote_l;
boost::asio::ip::tcp::endpoint remote_endpoint_l = this_l->tcp_socket.remote_endpoint (ec_remote_l);
if (!ec_remote_l)
{
if (node_l->config.logging.network_timeout_logging ())
{
// The remote end may have closed the connection before this side timing out, in which case the remote address is no longer available.
boost::system::error_code ec_remote_l;
boost::asio::ip::tcp::endpoint remote_endpoint_l = this_l->tcp_socket.remote_endpoint (ec_remote_l);
if (!ec_remote_l)
{
node_l->logger.try_log (boost::str (boost::format ("Disconnecting from %1% due to timeout") % remote_endpoint_l));
}
}
this_l->timed_out = true;
this_l->close ();
this_l->node.logger.try_log (boost::str (boost::format ("Disconnecting from %1% due to timeout") % remote_endpoint_l));
}
}
else if (!this_l->closed)
{
this_l->checkup ();
}
this_l->timed_out = true;
this_l->close ();
}
});
}
else if (!this_l->closed)
{
this_l->checkup ();
}
}
});
}
bool nano::socket::has_timed_out () const
@ -275,12 +254,9 @@ void nano::socket::flush_send_queue_callbacks ()
auto & item = send_queue.front ();
if (item.callback)
{
if (auto node_l = node.lock ())
{
node_l->background ([callback = std::move (item.callback)]() {
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
node.background ([callback = std::move (item.callback)]() {
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
send_queue.pop_front ();
}
@ -300,11 +276,8 @@ void nano::socket::close_internal ()
flush_send_queue_callbacks ();
if (ec)
{
if (auto node_l = node.lock ())
{
node_l->logger.try_log ("Failed to close socket gracefully: ", ec.message ());
node_l->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::error_socket_close);
}
node.logger.try_log ("Failed to close socket gracefully: ", ec.message ());
node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::error_socket_close);
}
}
}
@ -324,8 +297,8 @@ size_t nano::socket::get_max_write_queue_size () const
return queue_size_max;
}
nano::server_socket::server_socket (std::shared_ptr<nano::node> node_a, boost::asio::ip::tcp::endpoint local_a, size_t max_connections_a, nano::socket::concurrency concurrency_a) :
socket (node_a, std::chrono::seconds::max (), concurrency_a), acceptor (node_a->io_ctx), local (local_a), max_inbound_connections (max_connections_a), concurrency_new_connections (concurrency_a)
nano::server_socket::server_socket (nano::node & node_a, boost::asio::ip::tcp::endpoint local_a, size_t max_connections_a, nano::socket::concurrency concurrency_a) :
socket (node_a, std::chrono::seconds::max (), concurrency_a), acceptor (node_a.io_ctx), local (local_a), max_inbound_connections (max_connections_a), concurrency_new_connections (concurrency_a)
{
}
@ -363,52 +336,49 @@ void nano::server_socket::on_connection (std::function<bool(std::shared_ptr<nano
auto this_l (std::static_pointer_cast<nano::server_socket> (shared_from_this ()));
boost::asio::post (strand, boost::asio::bind_executor (strand, [this_l, callback_a]() {
if (auto node_l = this_l->node.lock ())
if (this_l->acceptor.is_open ())
{
if (this_l->acceptor.is_open ())
if (this_l->connections.size () < this_l->max_inbound_connections)
{
// Prepare new connection
auto new_connection (std::make_shared<nano::socket> (node_l->shared (), boost::none, this_l->concurrency_new_connections));
auto new_connection (std::make_shared<nano::socket> (this_l->node, boost::none, this_l->concurrency_new_connections));
this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote,
boost::asio::bind_executor (this_l->strand,
[this_l, new_connection, callback_a](boost::system::error_code const & ec_a) {
this_l->evict_dead_connections ();
if (auto node_l = this_l->node.lock ())
if (this_l->connections.size () < this_l->max_inbound_connections)
{
if (this_l->connections.size () < this_l->max_inbound_connections)
if (!ec_a)
{
if (!ec_a)
{
// Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start
// an IO operation immediately, which will start a timer.
new_connection->checkup ();
new_connection->start_timer (node_l->network_params.network.is_dev_network () ? std::chrono::seconds (2) : node_l->network_params.node.idle_timeout);
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
this_l->connections.push_back (new_connection);
}
else
{
node_l->logger.try_log ("Unable to accept connection: ", ec_a.message ());
}
// If the callback returns true, keep accepting new connections
if (callback_a (new_connection, ec_a))
{
this_l->on_connection (callback_a);
}
else
{
node_l->logger.try_log ("Stopping to accept connections");
}
// Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start
// an IO operation immediately, which will start a timer.
new_connection->checkup ();
new_connection->start_timer (this_l->node.network_params.network.is_dev_network () ? std::chrono::seconds (2) : this_l->node.network_params.node.idle_timeout);
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
this_l->connections.push_back (new_connection);
}
else
{
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in);
boost::asio::post (this_l->strand, boost::asio::bind_executor (this_l->strand, [this_l, callback_a]() {
this_l->on_connection (callback_a);
}));
this_l->node.logger.try_log ("Unable to accept connection: ", ec_a.message ());
}
};
// If the callback returns true, keep accepting new connections
if (callback_a (new_connection, ec_a))
{
this_l->on_connection (callback_a);
}
else
{
this_l->node.logger.try_log ("Stopping to accept connections");
}
}
else
{
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in);
boost::asio::post (this_l->strand, boost::asio::bind_executor (this_l->strand, [this_l, callback_a]() {
this_l->on_connection (callback_a);
}));
}
}));
}
}

View file

@ -51,7 +51,7 @@ public:
* @param io_timeout If tcp async operation is not completed within the timeout, the socket is closed. If not set, the tcp_io_timeout config option is used.
* @param concurrency write concurrency
*/
explicit socket (std::shared_ptr<nano::node> node, boost::optional<std::chrono::seconds> io_timeout = boost::none, concurrency = concurrency::single_writer);
explicit socket (nano::node & node, boost::optional<std::chrono::seconds> io_timeout = boost::none, concurrency = concurrency::single_writer);
virtual ~socket ();
void async_connect (boost::asio::ip::tcp::endpoint const &, std::function<void(boost::system::error_code const &)>);
void async_read (std::shared_ptr<std::vector<uint8_t>>, size_t, std::function<void(boost::system::error_code const &, size_t)>);
@ -80,7 +80,7 @@ protected:
boost::asio::strand<boost::asio::io_context::executor_type> strand;
boost::asio::ip::tcp::socket tcp_socket;
std::weak_ptr<nano::node> node;
nano::node & node;
/** The other end of the connection */
boost::asio::ip::tcp::endpoint remote;
@ -116,7 +116,7 @@ public:
* @param max_connections_a Maximum number of concurrent connections
* @param concurrency_a Write concurrency for new connections
*/
explicit server_socket (std::shared_ptr<nano::node> node_a, boost::asio::ip::tcp::endpoint local_a, size_t max_connections_a, concurrency concurrency_a = concurrency::single_writer);
explicit server_socket (nano::node & node_a, boost::asio::ip::tcp::endpoint local_a, size_t max_connections_a, concurrency concurrency_a = concurrency::single_writer);
/**Start accepting new connections */
void start (boost::system::error_code &);
/** Stop accepting new connections */

View file

@ -531,7 +531,7 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a
node.network.tcp_channels.udp_fallback (endpoint_a, callback_a);
return;
}
auto socket (std::make_shared<nano::socket> (node.shared_from_this (), boost::none, nano::socket::concurrency::multi_writer));
auto socket (std::make_shared<nano::socket> (node, boost::none, nano::socket::concurrency::multi_writer));
std::weak_ptr<nano::socket> socket_w (socket);
auto channel (std::make_shared<nano::transport::channel_tcp> (node, socket_w));
std::weak_ptr<nano::node> node_w (node.shared ());