diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 22bd35694..e3170a993 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -733,7 +733,7 @@ TEST (message_buffer_manager, stats) TEST (tcp_listener, tcp_node_id_handshake) { nano::system system (1); - auto socket (std::make_shared (system.nodes[0])); + auto socket (std::make_shared (*system.nodes[0])); auto bootstrap_endpoint (system.nodes[0]->bootstrap.endpoint ()); auto cookie (system.nodes[0]->network.syn_cookies.assign (nano::transport::map_tcp_to_endpoint (bootstrap_endpoint))); nano::node_id_handshake node_id_handshake (cookie, boost::none); @@ -766,7 +766,7 @@ TEST (tcp_listener, tcp_listener_timeout_empty) { nano::system system (1); auto node0 (system.nodes[0]); - auto socket (std::make_shared (node0)); + auto socket (std::make_shared (*node0)); std::atomic connected (false); socket->async_connect (node0->bootstrap.endpoint (), [&connected](boost::system::error_code const & ec) { ASSERT_FALSE (ec); @@ -789,7 +789,7 @@ TEST (tcp_listener, tcp_listener_timeout_node_id_handshake) { nano::system system (1); auto node0 (system.nodes[0]); - auto socket (std::make_shared (node0)); + auto socket (std::make_shared (*node0)); auto cookie (node0->network.syn_cookies.assign (nano::transport::map_tcp_to_endpoint (node0->bootstrap.endpoint ()))); nano::node_id_handshake node_id_handshake (cookie, boost::none); auto input (node_id_handshake.to_shared_const_buffer (false)); diff --git a/nano/core_test/socket.cpp b/nano/core_test/socket.cpp index 24709ae2b..545dd5a19 100644 --- a/nano/core_test/socket.cpp +++ b/nano/core_test/socket.cpp @@ -22,7 +22,7 @@ TEST (socket, drop_policy) // The total number of drops should thus be 1 (the socket allows doubling the queue size for no_socket_drop) size_t max_write_queue_size = 0; { - auto client_dummy (std::make_shared (node, boost::none, nano::socket::concurrency::multi_writer)); + auto client_dummy (std::make_shared (*node, boost::none, nano::socket::concurrency::multi_writer)); max_write_queue_size = client_dummy->get_max_write_queue_size (); } @@ -30,7 +30,7 @@ TEST (socket, drop_policy) auto server_port (nano::get_available_port ()); boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v4::any (), server_port); - auto server_socket (std::make_shared (node, endpoint, 1, nano::socket::concurrency::multi_writer)); + auto server_socket (std::make_shared (*node, endpoint, 1, nano::socket::concurrency::multi_writer)); boost::system::error_code ec; server_socket->start (ec); ASSERT_FALSE (ec); @@ -41,7 +41,7 @@ TEST (socket, drop_policy) return true; }); - auto client (std::make_shared (node, boost::none, nano::socket::concurrency::multi_writer)); + auto client (std::make_shared (*node, boost::none, nano::socket::concurrency::multi_writer)); nano::util::counted_completion write_completion (total_message_count); client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), server_port), @@ -123,7 +123,7 @@ TEST (socket, concurrent_writes) boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v4::any (), 25000); - auto server_socket (std::make_shared (node, endpoint, max_connections, nano::socket::concurrency::multi_writer)); + auto server_socket (std::make_shared (*node, endpoint, max_connections, nano::socket::concurrency::multi_writer)); boost::system::error_code ec; server_socket->start (ec); ASSERT_FALSE (ec); @@ -148,7 +148,7 @@ TEST (socket, concurrent_writes) std::vector> clients; for (unsigned i = 0; i < client_count; i++) { - auto client (std::make_shared (node, boost::none, nano::socket::concurrency::multi_writer)); + auto client (std::make_shared (*node, boost::none, nano::socket::concurrency::multi_writer)); clients.push_back (client); client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), 25000), [&connection_count_completion](boost::system::error_code const & ec_a) { diff --git a/nano/node/bootstrap/bootstrap_connections.cpp b/nano/node/bootstrap/bootstrap_connections.cpp index 45a5e3722..1195dd812 100644 --- a/nano/node/bootstrap/bootstrap_connections.cpp +++ b/nano/node/bootstrap/bootstrap_connections.cpp @@ -143,7 +143,7 @@ std::shared_ptr nano::bootstrap_connections::find_connec void nano::bootstrap_connections::connect_client (nano::tcp_endpoint const & endpoint_a, bool push_front) { ++connections_count; - auto socket (std::make_shared (node.shared ())); + auto socket (std::make_shared (node)); auto this_l (shared_from_this ()); socket->async_connect (endpoint_a, [this_l, socket, endpoint_a, push_front](boost::system::error_code const & ec) { diff --git a/nano/node/bootstrap/bootstrap_server.cpp b/nano/node/bootstrap/bootstrap_server.cpp index 4badab5de..efa12bda3 100644 --- a/nano/node/bootstrap/bootstrap_server.cpp +++ b/nano/node/bootstrap/bootstrap_server.cpp @@ -17,7 +17,7 @@ void nano::bootstrap_listener::start () { nano::lock_guard lock (mutex); on = true; - listening_socket = std::make_shared (node.shared (), boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::any (), port), node.config.tcp_incoming_connections_max); + listening_socket = std::make_shared (node, boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::any (), port), node.config.tcp_incoming_connections_max); boost::system::error_code ec; listening_socket->start (ec); if (ec) diff --git a/nano/node/socket.cpp b/nano/node/socket.cpp index 14ff5c9b8..be56d85ad 100644 --- a/nano/node/socket.cpp +++ b/nano/node/socket.cpp @@ -8,18 +8,18 @@ #include -nano::socket::socket (std::shared_ptr node_a, boost::optional io_timeout_a, nano::socket::concurrency concurrency_a) : -strand (node_a->io_ctx.get_executor ()), -tcp_socket (node_a->io_ctx), -node (node_a), -writer_concurrency (concurrency_a), -next_deadline (std::numeric_limits::max ()), -last_completion_time (0), -io_timeout (io_timeout_a) +nano::socket::socket (nano::node & node_a, boost::optional io_timeout_a, nano::socket::concurrency concurrency_a) : +strand{ node_a.io_ctx.get_executor () }, +tcp_socket{ node_a.io_ctx }, +node{ node_a }, +writer_concurrency{ concurrency_a }, +next_deadline{ std::numeric_limits::max () }, +last_completion_time{ 0 }, +io_timeout{ io_timeout_a } { if (!io_timeout) { - io_timeout = node_a->config.tcp_io_timeout; + io_timeout = node_a.config.tcp_io_timeout; } } @@ -54,12 +54,9 @@ void nano::socket::async_read (std::shared_ptr> buffer_a, s boost::asio::async_read (this_l->tcp_socket, boost::asio::buffer (buffer_a->data (), size_a), boost::asio::bind_executor (this_l->strand, [this_l, buffer_a, callback_a](boost::system::error_code const & ec, size_t size_a) { - if (auto node = this_l->node.lock ()) - { - node->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::in, size_a); - this_l->stop_timer (); - callback_a (ec, size_a); - } + this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::in, size_a); + this_l->stop_timer (); + callback_a (ec, size_a); })); })); } @@ -88,15 +85,15 @@ void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std: { this_l->send_queue.emplace_back (nano::socket::queue_item{ buffer_a, callback_a }); } - else if (auto node_l = this_l->node.lock ()) + else { if (drop_policy_a == nano::buffer_drop_policy::no_socket_drop) { - node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out); + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out); } else { - node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out); + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out); } if (callback_a) @@ -124,26 +121,20 @@ void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std: nano::async_write (tcp_socket, buffer_a, boost::asio::bind_executor (strand, [this_l, callback_a](boost::system::error_code const & ec, size_t size_a) { - if (auto node = this_l->node.lock ()) + this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a); + this_l->stop_timer (); + if (callback_a) { - node->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a); - this_l->stop_timer (); - if (callback_a) - { - callback_a (ec, size_a); - } + callback_a (ec, size_a); } })); } } else if (callback_a) { - if (auto node = this_l->node.lock ()) - { - node->background ([callback_a]() { - callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); - }); - } + node.background ([callback_a]() { + callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); + }); } } @@ -159,34 +150,31 @@ void nano::socket::write_queued_messages () [msg, this_w](boost::system::error_code ec, std::size_t size_a) { if (auto this_l = this_w.lock ()) { - if (auto node = this_l->node.lock ()) + this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a); + + this_l->stop_timer (); + + if (!this_l->closed) { - node->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a); - - this_l->stop_timer (); - - if (!this_l->closed) - { - if (msg.callback) - { - msg.callback (ec, size_a); - } - - this_l->send_queue.pop_front (); - if (!ec && !this_l->send_queue.empty ()) - { - this_l->write_queued_messages (); - } - else if (this_l->send_queue.empty ()) - { - // Idle TCP realtime client socket after writes - this_l->start_timer (node->network_params.node.idle_timeout); - } - } - else if (msg.callback) + if (msg.callback) { msg.callback (ec, size_a); } + + this_l->send_queue.pop_front (); + if (!ec && !this_l->send_queue.empty ()) + { + this_l->write_queued_messages (); + } + else if (this_l->send_queue.empty ()) + { + // Idle TCP realtime client socket after writes + this_l->start_timer (this_l->node.network_params.node.idle_timeout); + } + } + else if (msg.callback) + { + msg.callback (ec, size_a); } } })); @@ -195,10 +183,7 @@ void nano::socket::write_queued_messages () void nano::socket::start_timer () { - if (auto node_l = node.lock ()) - { - start_timer (io_timeout.get ()); - } + start_timer (io_timeout.get ()); } void nano::socket::start_timer (std::chrono::seconds deadline_a) @@ -214,37 +199,31 @@ void nano::socket::stop_timer () void nano::socket::checkup () { std::weak_ptr this_w (shared_from_this ()); - if (auto node_l = node.lock ()) - { - node_l->alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (node_l->network_params.network.is_dev_network () ? 1 : 2), [this_w, node_l]() { - if (auto this_l = this_w.lock ()) + node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (node.network_params.network.is_dev_network () ? 1 : 2), [this_w]() { + if (auto this_l = this_w.lock ()) + { + uint64_t now (nano::seconds_since_epoch ()); + if (this_l->next_deadline != std::numeric_limits::max () && now - this_l->last_completion_time > this_l->next_deadline) { - uint64_t now (nano::seconds_since_epoch ()); - if (this_l->next_deadline != std::numeric_limits::max () && now - this_l->last_completion_time > this_l->next_deadline) + if (this_l->node.config.logging.network_timeout_logging ()) { - if (auto node_l = this_l->node.lock ()) + // The remote end may have closed the connection before this side timing out, in which case the remote address is no longer available. + boost::system::error_code ec_remote_l; + boost::asio::ip::tcp::endpoint remote_endpoint_l = this_l->tcp_socket.remote_endpoint (ec_remote_l); + if (!ec_remote_l) { - if (node_l->config.logging.network_timeout_logging ()) - { - // The remote end may have closed the connection before this side timing out, in which case the remote address is no longer available. - boost::system::error_code ec_remote_l; - boost::asio::ip::tcp::endpoint remote_endpoint_l = this_l->tcp_socket.remote_endpoint (ec_remote_l); - if (!ec_remote_l) - { - node_l->logger.try_log (boost::str (boost::format ("Disconnecting from %1% due to timeout") % remote_endpoint_l)); - } - } - this_l->timed_out = true; - this_l->close (); + this_l->node.logger.try_log (boost::str (boost::format ("Disconnecting from %1% due to timeout") % remote_endpoint_l)); } } - else if (!this_l->closed) - { - this_l->checkup (); - } + this_l->timed_out = true; + this_l->close (); } - }); - } + else if (!this_l->closed) + { + this_l->checkup (); + } + } + }); } bool nano::socket::has_timed_out () const @@ -275,12 +254,9 @@ void nano::socket::flush_send_queue_callbacks () auto & item = send_queue.front (); if (item.callback) { - if (auto node_l = node.lock ()) - { - node_l->background ([callback = std::move (item.callback)]() { - callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); - }); - } + node.background ([callback = std::move (item.callback)]() { + callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); + }); } send_queue.pop_front (); } @@ -300,11 +276,8 @@ void nano::socket::close_internal () flush_send_queue_callbacks (); if (ec) { - if (auto node_l = node.lock ()) - { - node_l->logger.try_log ("Failed to close socket gracefully: ", ec.message ()); - node_l->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::error_socket_close); - } + node.logger.try_log ("Failed to close socket gracefully: ", ec.message ()); + node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::error_socket_close); } } } @@ -324,8 +297,8 @@ size_t nano::socket::get_max_write_queue_size () const return queue_size_max; } -nano::server_socket::server_socket (std::shared_ptr node_a, boost::asio::ip::tcp::endpoint local_a, size_t max_connections_a, nano::socket::concurrency concurrency_a) : -socket (node_a, std::chrono::seconds::max (), concurrency_a), acceptor (node_a->io_ctx), local (local_a), max_inbound_connections (max_connections_a), concurrency_new_connections (concurrency_a) +nano::server_socket::server_socket (nano::node & node_a, boost::asio::ip::tcp::endpoint local_a, size_t max_connections_a, nano::socket::concurrency concurrency_a) : +socket (node_a, std::chrono::seconds::max (), concurrency_a), acceptor (node_a.io_ctx), local (local_a), max_inbound_connections (max_connections_a), concurrency_new_connections (concurrency_a) { } @@ -363,52 +336,49 @@ void nano::server_socket::on_connection (std::function (shared_from_this ())); boost::asio::post (strand, boost::asio::bind_executor (strand, [this_l, callback_a]() { - if (auto node_l = this_l->node.lock ()) + if (this_l->acceptor.is_open ()) { - if (this_l->acceptor.is_open ()) + if (this_l->connections.size () < this_l->max_inbound_connections) { // Prepare new connection - auto new_connection (std::make_shared (node_l->shared (), boost::none, this_l->concurrency_new_connections)); + auto new_connection (std::make_shared (this_l->node, boost::none, this_l->concurrency_new_connections)); this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote, boost::asio::bind_executor (this_l->strand, [this_l, new_connection, callback_a](boost::system::error_code const & ec_a) { this_l->evict_dead_connections (); - if (auto node_l = this_l->node.lock ()) + if (this_l->connections.size () < this_l->max_inbound_connections) { - if (this_l->connections.size () < this_l->max_inbound_connections) + if (!ec_a) { - if (!ec_a) - { - // Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start - // an IO operation immediately, which will start a timer. - new_connection->checkup (); - new_connection->start_timer (node_l->network_params.network.is_dev_network () ? std::chrono::seconds (2) : node_l->network_params.node.idle_timeout); - node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in); - this_l->connections.push_back (new_connection); - } - else - { - node_l->logger.try_log ("Unable to accept connection: ", ec_a.message ()); - } - - // If the callback returns true, keep accepting new connections - if (callback_a (new_connection, ec_a)) - { - this_l->on_connection (callback_a); - } - else - { - node_l->logger.try_log ("Stopping to accept connections"); - } + // Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start + // an IO operation immediately, which will start a timer. + new_connection->checkup (); + new_connection->start_timer (this_l->node.network_params.network.is_dev_network () ? std::chrono::seconds (2) : this_l->node.network_params.node.idle_timeout); + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in); + this_l->connections.push_back (new_connection); } else { - node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in); - boost::asio::post (this_l->strand, boost::asio::bind_executor (this_l->strand, [this_l, callback_a]() { - this_l->on_connection (callback_a); - })); + this_l->node.logger.try_log ("Unable to accept connection: ", ec_a.message ()); } - }; + + // If the callback returns true, keep accepting new connections + if (callback_a (new_connection, ec_a)) + { + this_l->on_connection (callback_a); + } + else + { + this_l->node.logger.try_log ("Stopping to accept connections"); + } + } + else + { + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in); + boost::asio::post (this_l->strand, boost::asio::bind_executor (this_l->strand, [this_l, callback_a]() { + this_l->on_connection (callback_a); + })); + } })); } } diff --git a/nano/node/socket.hpp b/nano/node/socket.hpp index 838d162c9..f94bc81bc 100644 --- a/nano/node/socket.hpp +++ b/nano/node/socket.hpp @@ -51,7 +51,7 @@ public: * @param io_timeout If tcp async operation is not completed within the timeout, the socket is closed. If not set, the tcp_io_timeout config option is used. * @param concurrency write concurrency */ - explicit socket (std::shared_ptr node, boost::optional io_timeout = boost::none, concurrency = concurrency::single_writer); + explicit socket (nano::node & node, boost::optional io_timeout = boost::none, concurrency = concurrency::single_writer); virtual ~socket (); void async_connect (boost::asio::ip::tcp::endpoint const &, std::function); void async_read (std::shared_ptr>, size_t, std::function); @@ -80,7 +80,7 @@ protected: boost::asio::strand strand; boost::asio::ip::tcp::socket tcp_socket; - std::weak_ptr node; + nano::node & node; /** The other end of the connection */ boost::asio::ip::tcp::endpoint remote; @@ -116,7 +116,7 @@ public: * @param max_connections_a Maximum number of concurrent connections * @param concurrency_a Write concurrency for new connections */ - explicit server_socket (std::shared_ptr node_a, boost::asio::ip::tcp::endpoint local_a, size_t max_connections_a, concurrency concurrency_a = concurrency::single_writer); + explicit server_socket (nano::node & node_a, boost::asio::ip::tcp::endpoint local_a, size_t max_connections_a, concurrency concurrency_a = concurrency::single_writer); /**Start accepting new connections */ void start (boost::system::error_code &); /** Stop accepting new connections */ diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 8313fa2d1..2356b3bc0 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -531,7 +531,7 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a node.network.tcp_channels.udp_fallback (endpoint_a, callback_a); return; } - auto socket (std::make_shared (node.shared_from_this (), boost::none, nano::socket::concurrency::multi_writer)); + auto socket (std::make_shared (node, boost::none, nano::socket::concurrency::multi_writer)); std::weak_ptr socket_w (socket); auto channel (std::make_shared (node, socket_w)); std::weak_ptr node_w (node.shared ());