From 62bdaba67c36b81ac6c88c6d875e9b47300dc09d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20W=C3=B3jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Mon, 3 Apr 2023 15:19:08 +0200 Subject: [PATCH] Socket write queue fixes and improvements (#4202) Fixes a socket data error where partial simultaneous writes can be interleaved corrupting the connection. Adds ability to prioritise non-bootstrap traffic --- nano/core_test/socket.cpp | 28 +-- nano/node/bandwidth_limiter.cpp | 15 ++ nano/node/bandwidth_limiter.hpp | 3 + nano/node/bootstrap/bootstrap_ascending.cpp | 4 +- nano/node/bootstrap/bootstrap_server.cpp | 6 +- nano/node/transport/channel.cpp | 6 +- nano/node/transport/channel.hpp | 18 +- nano/node/transport/fake.cpp | 6 +- nano/node/transport/fake.hpp | 10 +- nano/node/transport/inproc.cpp | 2 +- nano/node/transport/inproc.hpp | 5 +- nano/node/transport/socket.cpp | 224 +++++++++++++++----- nano/node/transport/socket.hpp | 79 ++++--- nano/node/transport/tcp.cpp | 7 +- nano/node/transport/tcp.hpp | 12 +- nano/node/transport/traffic_type.hpp | 14 ++ 16 files changed, 317 insertions(+), 122 deletions(-) create mode 100644 nano/node/transport/traffic_type.hpp diff --git a/nano/core_test/socket.cpp b/nano/core_test/socket.cpp index a34af37b0..bb5aade8c 100644 --- a/nano/core_test/socket.cpp +++ b/nano/core_test/socket.cpp @@ -65,9 +65,9 @@ TEST (socket, max_connections) return node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in); }; - ASSERT_TIMELY (5s, get_tcp_accept_failures () == 1); - ASSERT_TIMELY (5s, get_tcp_accept_successes () == 2); - ASSERT_TIMELY (5s, connection_attempts == 3); + ASSERT_TIMELY_EQ (5s, get_tcp_accept_failures (), 1); + ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), 2); + ASSERT_TIMELY_EQ (5s, connection_attempts, 3); // create space for one socket and fill the connections table again @@ -79,9 +79,9 @@ TEST (socket, max_connections) auto client5 = std::make_shared (*node); client5->async_connect (dst_endpoint, connect_handler); - ASSERT_TIMELY (5s, get_tcp_accept_failures () == 2); - ASSERT_TIMELY (5s, get_tcp_accept_successes () == 3); - ASSERT_TIMELY (5s, connection_attempts == 5); + ASSERT_TIMELY_EQ (5s, get_tcp_accept_failures (), 2); + ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), 3); + ASSERT_TIMELY_EQ (5s, connection_attempts, 5); // close all existing sockets and fill the connections table again // start counting form 1 because 0 is the already closed socket @@ -99,10 +99,10 @@ TEST (socket, max_connections) auto client8 = std::make_shared (*node); client8->async_connect (dst_endpoint, connect_handler); - ASSERT_TIMELY (5s, get_tcp_accept_failures () == 3); - ASSERT_TIMELY (5s, get_tcp_accept_successes () == 5); - ASSERT_TIMELY (5s, connection_attempts == 8); // connections initiated by the client - ASSERT_TIMELY (5s, server_sockets.size () == 5); // connections accepted by the server + ASSERT_TIMELY_EQ (5s, get_tcp_accept_failures (), 3); + ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), 5); + ASSERT_TIMELY_EQ (5s, connection_attempts, 8); // connections initiated by the client + ASSERT_TIMELY_EQ (5s, server_sockets.size (), 5); // connections accepted by the server node->stop (); } @@ -459,11 +459,11 @@ TEST (socket, drop_policy) // We're going to write twice the queue size + 1, and the server isn't reading // The total number of drops should thus be 1 (the socket allows doubling the queue size for no_socket_drop) - func (nano::transport::socket::queue_size_max * 2 + 1, nano::transport::buffer_drop_policy::no_socket_drop); + func (nano::transport::socket::default_max_queue_size * 2 + 1, nano::transport::buffer_drop_policy::no_socket_drop); ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out)); ASSERT_EQ (0, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out)); - func (nano::transport::socket::queue_size_max + 1, nano::transport::buffer_drop_policy::limiter); + func (nano::transport::socket::default_max_queue_size + 1, nano::transport::buffer_drop_policy::limiter); // The stats are accumulated from before ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out)); ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out)); @@ -716,7 +716,7 @@ TEST (socket_timeout, write) // create a client socket and send lots of data to fill the socket queue on the local and remote side // eventually, the all tcp queues should fill up and async_write will not be able to progress // and the timeout should kick in and close the socket, which will cause the async_write to return an error - auto socket = std::make_shared (*node); + auto socket = std::make_shared (*node, 1024 * 64); // socket with a max queue size much larger than OS buffers std::atomic done = false; boost::system::error_code ec; socket->async_connect (endpoint, [&socket, &ec, &done] (boost::system::error_code const & ec_a) { @@ -826,7 +826,7 @@ TEST (socket_timeout, write_overlapped) // create a client socket and send lots of data to fill the socket queue on the local and remote side // eventually, the all tcp queues should fill up and async_write will not be able to progress // and the timeout should kick in and close the socket, which will cause the async_write to return an error - auto socket = std::make_shared (*node); + auto socket = std::make_shared (*node, 1024 * 64); // socket with a max queue size much larger than OS buffers std::atomic done = false; boost::system::error_code ec; socket->async_connect (endpoint, [&socket, &ec, &done] (boost::system::error_code const & ec_a) { diff --git a/nano/node/bandwidth_limiter.cpp b/nano/node/bandwidth_limiter.cpp index 9487f49a5..344cb6a12 100644 --- a/nano/node/bandwidth_limiter.cpp +++ b/nano/node/bandwidth_limiter.cpp @@ -56,4 +56,19 @@ void nano::outbound_bandwidth_limiter::reset (std::size_t limit, double burst_ra { auto & limiter = select_limiter (type); limiter.reset (limit, burst_ratio); +} + +nano::bandwidth_limit_type nano::to_bandwidth_limit_type (const nano::transport::traffic_type & traffic_type) +{ + switch (traffic_type) + { + case nano::transport::traffic_type::generic: + return nano::bandwidth_limit_type::standard; + break; + case nano::transport::traffic_type::bootstrap: + return nano::bandwidth_limit_type::bootstrap; + break; + } + debug_assert (false); + return {}; } \ No newline at end of file diff --git a/nano/node/bandwidth_limiter.hpp b/nano/node/bandwidth_limiter.hpp index 5a7f1f764..5cb13f0eb 100644 --- a/nano/node/bandwidth_limiter.hpp +++ b/nano/node/bandwidth_limiter.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include namespace nano { @@ -15,6 +16,8 @@ enum class bandwidth_limit_type bootstrap }; +nano::bandwidth_limit_type to_bandwidth_limit_type (nano::transport::traffic_type const &); + /** * Class that tracks and manages bandwidth limits for IO operations */ diff --git a/nano/node/bootstrap/bootstrap_ascending.cpp b/nano/node/bootstrap/bootstrap_ascending.cpp index ecc3629dc..954b96ce2 100644 --- a/nano/node/bootstrap/bootstrap_ascending.cpp +++ b/nano/node/bootstrap/bootstrap_ascending.cpp @@ -456,7 +456,7 @@ void nano::bootstrap_ascending::send (std::shared_ptr // TODO: There is no feedback mechanism if bandwidth limiter starts dropping our requests channel->send ( request, nullptr, - nano::transport::buffer_drop_policy::limiter, nano::bandwidth_limit_type::bootstrap); + nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap); } size_t nano::bootstrap_ascending::priority_size () const @@ -562,7 +562,7 @@ std::shared_ptr nano::bootstrap_ascending::available_c auto channels = network.random_set (32, node.network_params.network.bootstrap_protocol_version_min, /* include temporary channels */ true); for (auto & channel : channels) { - if (!channel->max ()) + if (!channel->max (nano::transport::traffic_type::bootstrap)) { return channel; } diff --git a/nano/node/bootstrap/bootstrap_server.cpp b/nano/node/bootstrap/bootstrap_server.cpp index 3735a18b9..7e0e67438 100644 --- a/nano/node/bootstrap/bootstrap_server.cpp +++ b/nano/node/bootstrap/bootstrap_server.cpp @@ -81,7 +81,7 @@ bool nano::bootstrap_server::request (nano::asc_pull_req const & message, std::s // If channel is full our response will be dropped anyway, so filter that early // TODO: Add per channel limits (this ideally should be done on the channel message processing side) - if (channel->max ()) + if (channel->max (nano::transport::traffic_type::bootstrap)) { stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::channel_full, nano::stat::dir::in); return false; @@ -125,7 +125,7 @@ void nano::bootstrap_server::respond (nano::asc_pull_ack & response, std::shared stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::write_error, nano::stat::dir::out); } }, - nano::transport::buffer_drop_policy::limiter, nano::bandwidth_limit_type::bootstrap); + nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap); } /* @@ -138,7 +138,7 @@ void nano::bootstrap_server::process_batch (std::deque & batch) for (auto & [request, channel] : batch) { - if (!channel->max ()) + if (!channel->max (nano::transport::traffic_type::bootstrap)) { auto response = process (transaction, request); respond (response, channel); diff --git a/nano/node/transport/channel.cpp b/nano/node/transport/channel.cpp index 9d7dcf2da..39037b734 100644 --- a/nano/node/transport/channel.cpp +++ b/nano/node/transport/channel.cpp @@ -14,15 +14,15 @@ nano::transport::channel::channel (nano::node & node_a) : set_network_version (node_a.network_params.network.protocol_version); } -void nano::transport::channel::send (nano::message & message_a, std::function const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::bandwidth_limit_type limiter_type) +void nano::transport::channel::send (nano::message & message_a, std::function const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type) { auto buffer (message_a.to_shared_const_buffer ()); auto detail = nano::to_stat_detail (message_a.header.type); auto is_droppable_by_limiter = (drop_policy_a == nano::transport::buffer_drop_policy::limiter); - auto should_pass (node.outbound_limiter.should_pass (buffer.size (), limiter_type)); + auto should_pass (node.outbound_limiter.should_pass (buffer.size (), to_bandwidth_limit_type (traffic_type))); if (!is_droppable_by_limiter || should_pass) { - send_buffer (buffer, callback_a, drop_policy_a); + send_buffer (buffer, callback_a, drop_policy_a, traffic_type); node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out); } else diff --git a/nano/node/transport/channel.hpp b/nano/node/transport/channel.hpp index 4973e6f8f..33c13ae53 100644 --- a/nano/node/transport/channel.hpp +++ b/nano/node/transport/channel.hpp @@ -27,15 +27,25 @@ public: virtual std::size_t hash_code () const = 0; virtual bool operator== (nano::transport::channel const &) const = 0; - void send (nano::message & message_a, std::function const & callback_a = nullptr, nano::transport::buffer_drop_policy policy_a = nano::transport::buffer_drop_policy::limiter, nano::bandwidth_limit_type = nano::bandwidth_limit_type::standard); + + void send (nano::message & message_a, + std::function const & callback_a = nullptr, + nano::transport::buffer_drop_policy policy_a = nano::transport::buffer_drop_policy::limiter, + nano::transport::traffic_type = nano::transport::traffic_type::generic); + // TODO: investigate clang-tidy warning about default parameters on virtual/override functions - // - virtual void send_buffer (nano::shared_const_buffer const &, std::function const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter) = 0; + virtual void send_buffer (nano::shared_const_buffer const &, + std::function const & = nullptr, + nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, + nano::transport::traffic_type = nano::transport::traffic_type::generic) + = 0; + virtual std::string to_string () const = 0; virtual nano::endpoint get_endpoint () const = 0; virtual nano::tcp_endpoint get_tcp_endpoint () const = 0; virtual nano::transport::transport_type get_type () const = 0; - virtual bool max () + + virtual bool max (nano::transport::traffic_type = nano::transport::traffic_type::generic) { return false; } diff --git a/nano/node/transport/fake.cpp b/nano/node/transport/fake.cpp index 2e5cc67f0..02d705ee3 100644 --- a/nano/node/transport/fake.cpp +++ b/nano/node/transport/fake.cpp @@ -13,10 +13,10 @@ nano::transport::fake::channel::channel (nano::node & node) : /** * The send function behaves like a null device, it throws the data away and returns success. -*/ -void nano::transport::fake::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::transport::buffer_drop_policy drop_policy_a) + */ +void nano::transport::fake::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type) { - //auto bytes = buffer_a.to_bytes (); + // auto bytes = buffer_a.to_bytes (); auto size = buffer_a.size (); if (callback_a) { diff --git a/nano/node/transport/fake.hpp b/nano/node/transport/fake.hpp index d03b14a46..809c5b98a 100644 --- a/nano/node/transport/fake.hpp +++ b/nano/node/transport/fake.hpp @@ -20,13 +20,11 @@ namespace transport std::string to_string () const override; std::size_t hash_code () const override; - // clang-format off void send_buffer ( - nano::shared_const_buffer const &, - std::function const & = nullptr, - nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter - ) override; - // clang-format on + nano::shared_const_buffer const &, + std::function const & = nullptr, + nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, + nano::transport::traffic_type = nano::transport::traffic_type::generic) override; bool operator== (nano::transport::channel const &) const override; bool operator== (nano::transport::fake::channel const & other_a) const; diff --git a/nano/node/transport/inproc.cpp b/nano/node/transport/inproc.cpp index 4cb229aea..a015a2fdb 100644 --- a/nano/node/transport/inproc.cpp +++ b/nano/node/transport/inproc.cpp @@ -53,7 +53,7 @@ public: * Send the buffer to the peer and call the callback function when done. The call never fails. * Note that the inbound message visitor will be called before the callback because it is called directly whereas the callback is spawned in the background. */ -void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::transport::buffer_drop_policy drop_policy_a) +void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type) { std::size_t offset{ 0 }; auto const buffer_read_fn = [&offset, buffer_v = buffer_a.to_bytes ()] (std::shared_ptr> const & data_a, std::size_t size_a, std::function callback_a) { diff --git a/nano/node/transport/inproc.hpp b/nano/node/transport/inproc.hpp index 443995ecb..c6012bc1a 100644 --- a/nano/node/transport/inproc.hpp +++ b/nano/node/transport/inproc.hpp @@ -18,9 +18,10 @@ namespace transport explicit channel (nano::node & node, nano::node & destination); std::size_t hash_code () const override; bool operator== (nano::transport::channel const &) const override; + // TODO: investigate clang-tidy warning about default parameters on virtual/override functions - // - void send_buffer (nano::shared_const_buffer const &, std::function const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter) override; + void send_buffer (nano::shared_const_buffer const &, std::function const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type = nano::transport::traffic_type::generic) override; + std::string to_string () const override; bool operator== (nano::transport::inproc::channel const & other_a) const { diff --git a/nano/node/transport/socket.cpp b/nano/node/transport/socket.cpp index 6eef31be3..629fe3b5f 100644 --- a/nano/node/transport/socket.cpp +++ b/nano/node/transport/socket.cpp @@ -33,7 +33,12 @@ bool is_temporary_error (boost::system::error_code const & ec_a) } } -nano::transport::socket::socket (nano::node & node_a, endpoint_type_t endpoint_type_a) : +/* + * socket + */ + +nano::transport::socket::socket (nano::node & node_a, endpoint_type_t endpoint_type_a, std::size_t max_queue_size_a) : + send_queue{ max_queue_size_a }, strand{ node_a.io_ctx.get_executor () }, tcp_socket{ node_a.io_ctx }, node{ node_a }, @@ -42,7 +47,8 @@ nano::transport::socket::socket (nano::node & node_a, endpoint_type_t endpoint_t last_completion_time_or_init{ nano::seconds_since_epoch () }, last_receive_time_or_init{ nano::seconds_since_epoch () }, default_timeout{ node_a.config.tcp_io_timeout }, - silent_connection_tolerance_time{ node_a.network_params.network.silent_connection_tolerance_time } + silent_connection_tolerance_time{ node_a.network_params.network.silent_connection_tolerance_time }, + max_queue_size{ max_queue_size_a } { } @@ -51,12 +57,17 @@ nano::transport::socket::~socket () close_internal (); } +void nano::transport::socket::start () +{ + ongoing_checkup (); +} + void nano::transport::socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::function callback_a) { debug_assert (callback_a); debug_assert (endpoint_type () == endpoint_type_t::client); - checkup (); + start (); auto this_l (shared_from_this ()); set_default_timeout (); @@ -116,7 +127,7 @@ void nano::transport::socket::async_read (std::shared_ptr> } } -void nano::transport::socket::async_write (nano::shared_const_buffer const & buffer_a, std::function callback_a) +void nano::transport::socket::async_write (nano::shared_const_buffer const & buffer_a, std::function callback_a, nano::transport::traffic_type traffic_type) { if (closed) { @@ -126,49 +137,83 @@ void nano::transport::socket::async_write (nano::shared_const_buffer const & buf callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); }); } - return; } - ++queue_size; - - boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback = std::move (callback_a), this_l = shared_from_this ()] () mutable { - if (this_l->closed) + bool queued = send_queue.insert (buffer_a, callback_a, traffic_type); + if (!queued) + { + if (callback_a) { - if (callback) - { + node.background ([callback = std::move (callback_a)] () { callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); - } + }); + } + return; + } - return; + boost::asio::post (strand, boost::asio::bind_executor (strand, [this_s = shared_from_this (), buffer_a, callback_a, traffic_type] () { + if (!this_s->write_in_progress) + { + this_s->write_queued_messages (); + } + })); +} + +// Must be called from strand +void nano::transport::socket::write_queued_messages () +{ + if (closed) + { + return; + } + + auto next = send_queue.pop (); + if (!next) + { + return; + } + + set_default_timeout (); + + write_in_progress = true; + nano::async_write (tcp_socket, next->buffer, + boost::asio::bind_executor (strand, [this_s = shared_from_this (), next /* `next` object keeps buffer in scope */] (boost::system::error_code ec, std::size_t size) { + this_s->write_in_progress = false; + + if (ec) + { + this_s->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in); + this_s->close (); + } + else + { + this_s->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size); + this_s->set_last_completion (); } - this_l->set_default_timeout (); + if (next->callback) + { + next->callback (ec, size); + } - nano::async_write (this_l->tcp_socket, buffer_a, - boost::asio::bind_executor (this_l->strand, - [buffer_a, cbk = std::move (callback), this_l] (boost::system::error_code ec, std::size_t size_a) { - --this_l->queue_size; - - if (ec) - { - this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in); - this_l->close (); - } - else - { - this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a); - this_l->set_last_completion (); - } - - if (cbk) - { - cbk (ec, size_a); - } - })); + if (!ec) + { + this_s->write_queued_messages (); + } })); } +bool nano::transport::socket::max (nano::transport::traffic_type traffic_type) const +{ + return send_queue.size (traffic_type) >= max_queue_size; +} + +bool nano::transport::socket::full (nano::transport::traffic_type traffic_type) const +{ + return send_queue.size (traffic_type) >= 2 * max_queue_size; +} + /** Call set_timeout with default_timeout as parameter */ void nano::transport::socket::set_default_timeout () { @@ -196,7 +241,7 @@ void nano::transport::socket::set_last_receive_time () last_receive_time_or_init = nano::seconds_since_epoch (); } -void nano::transport::socket::checkup () +void nano::transport::socket::ongoing_checkup () { std::weak_ptr this_w (shared_from_this ()); node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (node.network_params.network.is_dev_network () ? 1 : 5), [this_w] () { @@ -238,7 +283,7 @@ void nano::transport::socket::checkup () } else if (!this_l->closed) { - this_l->checkup (); + this_l->ongoing_checkup (); } } }); @@ -289,19 +334,24 @@ void nano::transport::socket::close () // This must be called from a strand or the destructor void nano::transport::socket::close_internal () { - if (!closed.exchange (true)) + if (closed.exchange (true)) { - default_timeout = std::chrono::seconds (0); - boost::system::error_code ec; + return; + } - // Ignore error code for shutdown as it is best-effort - tcp_socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec); - tcp_socket.close (ec); - if (ec) - { - node.logger.try_log ("Failed to close socket gracefully: ", ec.message ()); - node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::error_socket_close); - } + send_queue.clear (); + + default_timeout = std::chrono::seconds (0); + boost::system::error_code ec; + + // Ignore error code for shutdown as it is best-effort + tcp_socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec); + tcp_socket.close (ec); + + if (ec) + { + node.logger.try_log ("Failed to close socket gracefully: ", ec.message ()); + node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::error_socket_close); } } @@ -315,6 +365,82 @@ nano::tcp_endpoint nano::transport::socket::local_endpoint () const return tcp_socket.local_endpoint (); } +/* + * write_queue + */ + +nano::transport::socket::write_queue::write_queue (std::size_t max_size_a) : + max_size{ max_size_a } +{ +} + +bool nano::transport::socket::write_queue::insert (const buffer_t & buffer, callback_t callback, nano::transport::traffic_type traffic_type) +{ + nano::lock_guard guard{ mutex }; + if (queues[traffic_type].size () < 2 * max_size) + { + queues[traffic_type].push (entry{ buffer, callback }); + return true; // Queued + } + return false; // Not queued +} + +std::optional nano::transport::socket::write_queue::pop () +{ + nano::lock_guard guard{ mutex }; + + auto try_pop = [this] (nano::transport::traffic_type type) -> std::optional { + auto & que = queues[type]; + if (!que.empty ()) + { + auto item = que.front (); + que.pop (); + return item; + } + return std::nullopt; + }; + + // TODO: This is a very basic prioritization, implement something more advanced and configurable + if (auto item = try_pop (nano::transport::traffic_type::generic)) + { + return item; + } + if (auto item = try_pop (nano::transport::traffic_type::bootstrap)) + { + return item; + } + + return std::nullopt; +} + +void nano::transport::socket::write_queue::clear () +{ + nano::lock_guard guard{ mutex }; + queues.clear (); +} + +std::size_t nano::transport::socket::write_queue::size (nano::transport::traffic_type traffic_type) const +{ + nano::lock_guard guard{ mutex }; + if (auto it = queues.find (traffic_type); it != queues.end ()) + { + return it->second.size (); + } + return 0; +} + +bool nano::transport::socket::write_queue::empty () const +{ + nano::lock_guard guard{ mutex }; + return std::all_of (queues.begin (), queues.end (), [] (auto const & que) { + return que.second.empty (); + }); +} + +/* + * server_socket + */ + nano::transport::server_socket::server_socket (nano::node & node_a, boost::asio::ip::tcp::endpoint local_a, std::size_t max_connections_a) : socket{ node_a, endpoint_type_t::server }, acceptor{ node_a.io_ctx }, @@ -474,7 +600,7 @@ void nano::transport::server_socket::on_connection (std::functioncheckup (); + new_connection->start (); new_connection->set_timeout (this_l->node.network_params.network.idle_timeout); this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in); this_l->connections_per_address.emplace (new_connection->remote.address (), new_connection); diff --git a/nano/node/transport/socket.hpp b/nano/node/transport/socket.hpp index 7d3fa9d52..d0afec196 100644 --- a/nano/node/transport/socket.hpp +++ b/nano/node/transport/socket.hpp @@ -3,11 +3,16 @@ #include #include #include +#include #include +#include #include #include #include +#include +#include +#include #include namespace boost::asio::ip @@ -43,6 +48,8 @@ class socket : public std::enable_shared_from_this friend class tcp_channels; public: + static std::size_t constexpr default_max_queue_size = 128; + enum class type_t { undefined, @@ -62,12 +69,14 @@ public: * @param node Owning node * @param endpoint_type_a The endpoint's type: either server or client */ - explicit socket (nano::node & node, endpoint_type_t endpoint_type_a); + explicit socket (nano::node & node, endpoint_type_t endpoint_type_a, std::size_t max_queue_size = default_max_queue_size); virtual ~socket (); + void start (); + void async_connect (boost::asio::ip::tcp::endpoint const &, std::function); void async_read (std::shared_ptr> const &, std::size_t, std::function); - void async_write (nano::shared_const_buffer const &, std::function = {}); + void async_write (nano::shared_const_buffer const &, std::function callback = {}, nano::transport::traffic_type = nano::transport::traffic_type::generic); virtual void close (); boost::asio::ip::tcp::endpoint remote_endpoint () const; @@ -79,14 +88,10 @@ public: std::chrono::seconds get_default_timeout_value () const; void set_timeout (std::chrono::seconds); void set_silent_connection_tolerance_time (std::chrono::seconds tolerance_time_a); - bool max () const - { - return queue_size >= queue_size_max; - } - bool full () const - { - return queue_size >= queue_size_max * 2; - } + + bool max (nano::transport::traffic_type = nano::transport::traffic_type::generic) const; + bool full (nano::transport::traffic_type = nano::transport::traffic_type::generic) const; + type_t type () const { return type_m; @@ -116,15 +121,38 @@ public: return !closed && tcp_socket.is_open (); } -protected: - /** Holds the buffer and callback for queued writes */ - class queue_item +private: + class write_queue { public: - nano::shared_const_buffer buffer; - std::function callback; + using buffer_t = nano::shared_const_buffer; + using callback_t = std::function; + + struct entry + { + buffer_t buffer; + callback_t callback; + }; + + public: + explicit write_queue (std::size_t max_size); + + bool insert (buffer_t const &, callback_t, nano::transport::traffic_type); + std::optional pop (); + void clear (); + std::size_t size (nano::transport::traffic_type) const; + bool empty () const; + + std::size_t const max_size; + + private: + mutable nano::mutex mutex; + std::unordered_map> queues; }; + write_queue send_queue; + +protected: boost::asio::strand strand; boost::asio::ip::tcp::socket tcp_socket; nano::node & node; @@ -158,22 +186,19 @@ protected: /** used in real time server sockets, number of seconds of no receive traffic that will cause the socket to timeout */ std::chrono::seconds silent_connection_tolerance_time; - /** Tracks number of blocks queued for delivery to the local socket send buffers. - * Under normal circumstances, this should be zero. - * Note that this is not the number of buffers queued to the peer, it is the number of buffers - * queued up to enter the local TCP send buffer - * socket buffer queue -> TCP send queue -> (network) -> TCP receive queue of peer - */ - std::atomic queue_size{ 0 }; - /** Set by close() - completion handlers must check this. This is more reliable than checking error codes as the OS may have already completed the async operation. */ std::atomic closed{ false }; + + /** Updated only from strand, but stored as atomic so it can be read from outside */ + std::atomic write_in_progress{ false }; + void close_internal (); + void write_queued_messages (); void set_default_timeout (); void set_last_completion (); void set_last_receive_time (); - void checkup (); + void ongoing_checkup (); void read_impl (std::shared_ptr> const & data_a, std::size_t size_a, std::function callback_a); private: @@ -181,7 +206,7 @@ private: endpoint_type_t endpoint_type_m; public: - static std::size_t constexpr queue_size_max = 128; + std::size_t const max_queue_size; }; std::string socket_type_to_string (socket::type_t type); @@ -238,8 +263,8 @@ public: * Constructor * @param node_a Owning node */ - explicit client_socket (nano::node & node_a) : - socket{ node_a, endpoint_type_t::client } + explicit client_socket (nano::node & node_a, std::size_t max_queue_size = default_max_queue_size) : + socket{ node_a, endpoint_type_t::client, max_queue_size } { } }; diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index db828cd5a..c62bf3dbe 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -46,11 +46,11 @@ bool nano::transport::channel_tcp::operator== (nano::transport::channel const & return result; } -void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::transport::buffer_drop_policy policy_a) +void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::transport::buffer_drop_policy policy_a, nano::transport::traffic_type traffic_type) { if (auto socket_l = socket.lock ()) { - if (!socket_l->max () || (policy_a == nano::transport::buffer_drop_policy::no_socket_drop && !socket_l->full ())) + if (!socket_l->max (traffic_type) || (policy_a == nano::transport::buffer_drop_policy::no_socket_drop && !socket_l->full (traffic_type))) { socket_l->async_write ( buffer_a, [endpoint_a = socket_l->remote_endpoint (), node = std::weak_ptr (node.shared ()), callback_a] (boost::system::error_code const & ec, std::size_t size_a) { @@ -69,7 +69,8 @@ void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const callback_a (ec, size_a); } } - }); + }, + traffic_type); } else { diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index 41f44eca5..80196a308 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -37,11 +37,13 @@ namespace transport public: channel_tcp (nano::node &, std::weak_ptr); ~channel_tcp () override; + std::size_t hash_code () const override; bool operator== (nano::transport::channel const &) const override; - // TODO: investigate clang-tidy warning about default parameters on virtual/override functions - // - void send_buffer (nano::shared_const_buffer const &, std::function const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter) override; + + // TODO: investigate clang-tidy warning about default parameters on virtual/override functions// + void send_buffer (nano::shared_const_buffer const &, std::function const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type = nano::transport::traffic_type::generic) override; + std::string to_string () const override; bool operator== (nano::transport::channel_tcp const & other_a) const { @@ -70,12 +72,12 @@ namespace transport return nano::transport::transport_type::tcp; } - virtual bool max () override + virtual bool max (nano::transport::traffic_type traffic_type) override { bool result = true; if (auto socket_l = socket.lock ()) { - result = socket_l->max (); + result = socket_l->max (traffic_type); } return result; } diff --git a/nano/node/transport/traffic_type.hpp b/nano/node/transport/traffic_type.hpp new file mode 100644 index 000000000..a4b89e1c7 --- /dev/null +++ b/nano/node/transport/traffic_type.hpp @@ -0,0 +1,14 @@ +#pragma once + +namespace nano::transport +{ +/** + * Used for message prioritization and bandwidth limits + */ +enum class traffic_type +{ + generic, + /** For bootstrap (asc_pull_ack, asc_pull_req) traffic */ + bootstrap +}; +} \ No newline at end of file