diff --git a/nano/lib/async.hpp b/nano/lib/async.hpp index 818d6bcaf..c7a28a856 100644 --- a/nano/lib/async.hpp +++ b/nano/lib/async.hpp @@ -4,6 +4,7 @@ #include +#include #include namespace asio = boost::asio; @@ -21,6 +22,12 @@ inline asio::awaitable sleep_for (auto duration) debug_assert (!ec || ec == asio::error::operation_aborted); } +inline asio::awaitable cancelled () +{ + auto state = co_await asio::this_coro::cancellation_state; + co_return state.cancelled () != asio::cancellation_type::none; +} + /** * A cancellation signal that can be emitted from any thread. * It follows the same semantics as asio::cancellation_signal. @@ -40,7 +47,6 @@ public: { // Can only move if the strands are the same debug_assert (strand == other.strand); - if (this != &other) { signal = std::move (other.signal); @@ -70,6 +76,106 @@ private: bool slotted{ false }; // For debugging purposes }; +class condition +{ +public: + explicit condition (nano::async::strand & strand) : + strand{ strand }, + state{ std::make_shared (strand) } + { + } + + condition (condition &&) = default; + + condition & operator= (condition && other) + { + // Can only move if the strands are the same + debug_assert (strand == other.strand); + if (this != &other) + { + state = std::move (other.state); + } + return *this; + } + + void notify () + { + // Avoid unnecessary dispatch if already scheduled + release_assert (state); + if (state->scheduled.exchange (true) == false) + { + asio::dispatch (strand, [state_s = state] () { + state_s->scheduled = false; + state_s->timer.cancel (); + }); + } + } + + // Spuriously wakes up + asio::awaitable wait () + { + debug_assert (strand.running_in_this_thread ()); + co_await wait_for (std::chrono::seconds{ 1 }); + } + + asio::awaitable wait_for (auto duration) + { + debug_assert (strand.running_in_this_thread ()); + release_assert (state); + state->timer.expires_after (duration); + boost::system::error_code ec; // Swallow error from cancellation + co_await state->timer.async_wait (asio::redirect_error (asio::use_awaitable, ec)); + debug_assert (!ec || ec == asio::error::operation_aborted); + } + + void cancel () + { + release_assert (state); + asio::dispatch (strand, [state_s = state] () { + state_s->scheduled = false; + state_s->timer.cancel (); + }); + } + + bool valid () const + { + return state != nullptr; + } + + nano::async::strand & strand; + +private: + struct shared_state + { + asio::steady_timer timer; + std::atomic scheduled{ false }; + + explicit shared_state (nano::async::strand & strand) : + timer{ strand } {}; + }; + std::shared_ptr state; +}; + +// Concept for awaitables +template +concept async_task = std::same_as>; + +// Concept for callables that return an awaitable +template +concept async_callable = requires (T t) { + { + t () + } -> std::same_as>; +}; + +// Concept for tasks that take a condition and return an awaitable +template +concept async_callable_with_condition = requires (T t, condition & c) { + { + t (c) + } -> std::same_as>; +}; + /** * Wrapper with convenience functions and safety checks for asynchronous tasks. * Aims to provide interface similar to std::thread. @@ -86,13 +192,28 @@ public: { } - task (nano::async::strand & strand, auto && func) : + template + requires async_task || async_callable + task (nano::async::strand & strand, Func && func) : strand{ strand }, cancellation{ strand } { future = asio::co_spawn ( strand, - std::forward (func), + std::forward (func), + asio::bind_cancellation_slot (cancellation.slot (), asio::use_future)); + } + + template + task (nano::async::strand & strand, Func && func) : + strand{ strand }, + cancellation{ strand }, + condition{ std::make_unique (strand) } + { + auto awaitable_func = func (*condition); + future = asio::co_spawn ( + strand, + func (*condition), asio::bind_cancellation_slot (cancellation.slot (), asio::use_future)); } @@ -107,11 +228,11 @@ public: { // Can only move if the strands are the same debug_assert (strand == other.strand); - if (this != &other) { future = std::move (other.future); cancellation = std::move (other.cancellation); + condition = std::move (other.condition); } return *this; } @@ -139,6 +260,18 @@ public: { debug_assert (joinable ()); cancellation.emit (); + if (condition) + { + condition->cancel (); + } + } + + void notify () + { + if (condition) + { + condition->notify (); + } } nano::async::strand & strand; @@ -146,5 +279,6 @@ public: private: std::future future; nano::async::cancellation cancellation; + std::unique_ptr condition; }; } \ No newline at end of file diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 31facd4af..5527c3eed 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -30,6 +30,11 @@ enum class type ipc, tcp, tcp_server, + tcp_channel, + tcp_channel_queued, + tcp_channel_sent, + tcp_channel_drop, + tcp_channel_ec, tcp_channels, tcp_channels_rejected, tcp_channels_purge, diff --git a/nano/node/node.cpp b/nano/node/node.cpp index de9a8b1a1..a540fd547 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -705,7 +705,7 @@ void nano::node::stop () epoch_upgrader.stop (); local_block_broadcaster.stop (); message_processor.stop (); - network.stop (); // Stop network last to avoid killing in-use sockets + network.stop (); monitor.stop (); bootstrap_workers.stop (); diff --git a/nano/node/transport/channel.cpp b/nano/node/transport/channel.cpp index 7a0a9ce40..12bd75387 100644 --- a/nano/node/transport/channel.cpp +++ b/nano/node/transport/channel.cpp @@ -14,33 +14,12 @@ nano::transport::channel::channel (nano::node & node_a) : set_network_version (node_a.network_params.network.protocol_version); } -void nano::transport::channel::send (nano::message & message_a, std::function const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type) +bool nano::transport::channel::send (nano::message const & message, std::function const & callback, nano::transport::buffer_drop_policy drop_policy, nano::transport::traffic_type traffic_type) { - auto buffer = message_a.to_shared_const_buffer (); - - bool is_droppable_by_limiter = (drop_policy_a == nano::transport::buffer_drop_policy::limiter); - bool should_pass = node.outbound_limiter.should_pass (buffer.size (), traffic_type); - bool pass = !is_droppable_by_limiter || should_pass; - - node.stats.inc (pass ? nano::stat::type::message : nano::stat::type::drop, to_stat_detail (message_a.type ()), nano::stat::dir::out, /* aggregate all */ true); - node.logger.trace (nano::log::type::channel_sent, to_log_detail (message_a.type ()), - nano::log::arg{ "message", message_a }, - nano::log::arg{ "channel", *this }, - nano::log::arg{ "dropped", !pass }); - - if (pass) - { - send_buffer (buffer, callback_a, drop_policy_a, traffic_type); - } - else - { - if (callback_a) - { - node.io_ctx.post ([callback_a] () { - callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); - }); - } - } + auto buffer = message.to_shared_const_buffer (); + bool sent = send_buffer (buffer, callback, drop_policy, traffic_type); + node.stats.inc (sent ? nano::stat::type::message : nano::stat::type::drop, to_stat_detail (message.type ()), nano::stat::dir::out, /* aggregate all */ true); + return sent; } void nano::transport::channel::set_peering_endpoint (nano::endpoint endpoint) diff --git a/nano/node/transport/channel.hpp b/nano/node/transport/channel.hpp index 8f860f58c..622c01f6a 100644 --- a/nano/node/transport/channel.hpp +++ b/nano/node/transport/channel.hpp @@ -22,18 +22,24 @@ enum class transport_type : uint8_t class channel { +public: + using callback_t = std::function; + public: explicit channel (nano::node &); virtual ~channel () = default; - void send (nano::message & message_a, - std::function const & callback_a = nullptr, - nano::transport::buffer_drop_policy policy_a = nano::transport::buffer_drop_policy::limiter, + /// @returns true if the message was sent (or queued to be sent), false if it was immediately dropped + bool send (nano::message const &, + callback_t const & callback = nullptr, + nano::transport::buffer_drop_policy policy = nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type = nano::transport::traffic_type::generic); - // TODO: investigate clang-tidy warning about default parameters on virtual/override functions - virtual void send_buffer (nano::shared_const_buffer const &, - std::function const & = nullptr, + /// Implements the actual send operation + /// @returns true if the message was sent (or queued to be sent), false if it was immediately dropped + // TODO: Make this private, do not allow external direct calls + virtual bool send_buffer (nano::shared_const_buffer const &, + callback_t const & callback = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type = nano::transport::traffic_type::generic) = 0; diff --git a/nano/node/transport/fake.cpp b/nano/node/transport/fake.cpp index e99bc69f2..5124b7906 100644 --- a/nano/node/transport/fake.cpp +++ b/nano/node/transport/fake.cpp @@ -14,7 +14,7 @@ nano::transport::fake::channel::channel (nano::node & node) : /** * The send function behaves like a null device, it throws the data away and returns success. */ -void nano::transport::fake::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type) +bool nano::transport::fake::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type) { // auto bytes = buffer_a.to_bytes (); auto size = buffer_a.size (); @@ -24,6 +24,7 @@ void nano::transport::fake::channel::send_buffer (nano::shared_const_buffer cons callback_a (boost::system::errc::make_error_code (boost::system::errc::success), size); }); } + return true; } std::string nano::transport::fake::channel::to_string () const diff --git a/nano/node/transport/fake.hpp b/nano/node/transport/fake.hpp index d9ce585cb..1d9d0e1b5 100644 --- a/nano/node/transport/fake.hpp +++ b/nano/node/transport/fake.hpp @@ -19,7 +19,7 @@ namespace transport std::string to_string () const override; - void send_buffer ( + bool send_buffer ( nano::shared_const_buffer const &, std::function const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, diff --git a/nano/node/transport/inproc.cpp b/nano/node/transport/inproc.cpp index 1c9f4c462..b55c957f3 100644 --- a/nano/node/transport/inproc.cpp +++ b/nano/node/transport/inproc.cpp @@ -18,7 +18,7 @@ nano::transport::inproc::channel::channel (nano::node & node, nano::node & desti * Send the buffer to the peer and call the callback function when done. The call never fails. * Note that the inbound message visitor will be called before the callback because it is called directly whereas the callback is spawned in the background. */ -void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type) +bool nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type) { std::size_t offset{ 0 }; auto const buffer_read_fn = [&offset, buffer_v = buffer_a.to_bytes ()] (std::shared_ptr> const & data_a, std::size_t size_a, std::function callback_a) { @@ -54,6 +54,8 @@ void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer co callback_l (boost::system::errc::make_error_code (boost::system::errc::success), buffer_size); }); } + + return true; } std::string nano::transport::inproc::channel::to_string () const diff --git a/nano/node/transport/inproc.hpp b/nano/node/transport/inproc.hpp index d93fbed2d..fcd5006f4 100644 --- a/nano/node/transport/inproc.hpp +++ b/nano/node/transport/inproc.hpp @@ -18,7 +18,7 @@ namespace transport explicit channel (nano::node & node, nano::node & destination); // TODO: investigate clang-tidy warning about default parameters on virtual/override functions - void send_buffer (nano::shared_const_buffer const &, std::function const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type = nano::transport::traffic_type::generic) override; + bool send_buffer (nano::shared_const_buffer const &, std::function const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type = nano::transport::traffic_type::generic) override; std::string to_string () const override; diff --git a/nano/node/transport/tcp_channel.cpp b/nano/node/transport/tcp_channel.cpp index e596ec995..3c678fe70 100644 --- a/nano/node/transport/tcp_channel.cpp +++ b/nano/node/transport/tcp_channel.cpp @@ -1,89 +1,262 @@ +#include #include #include #include #include #include +#include /* * tcp_channel */ -nano::transport::tcp_channel::tcp_channel (nano::node & node_a, std::weak_ptr socket_a) : +nano::transport::tcp_channel::tcp_channel (nano::node & node_a, std::shared_ptr socket_a) : channel (node_a), - socket (std::move (socket_a)) + socket{ socket_a }, + strand{ node_a.io_ctx.get_executor () }, + sending_task{ strand } { + stacktrace = nano::generate_stacktrace (); + remote_endpoint = socket_a->remote_endpoint (); + local_endpoint = socket_a->local_endpoint (); + start (); } nano::transport::tcp_channel::~tcp_channel () { + close (); + debug_assert (!sending_task.joinable ()); +} + +void nano::transport::tcp_channel::close () +{ + stop (); + if (auto socket_l = socket.lock ()) { socket_l->close (); } + + closed = true; } -void nano::transport::tcp_channel::update_endpoints () +void nano::transport::tcp_channel::start () { - nano::lock_guard lock{ mutex }; + sending_task = nano::async::task (strand, [this] (nano::async::condition & condition) -> asio::awaitable { + try + { + co_await run_sending (condition); + } + catch (boost::system::system_error const & ex) + { + // Operation aborted is expected when cancelling the acceptor + debug_assert (ex.code () == asio::error::operation_aborted); + } + debug_assert (strand.running_in_this_thread ()); + }); +} - debug_assert (remote_endpoint == nano::endpoint{}); // Not initialized endpoint value - debug_assert (local_endpoint == nano::endpoint{}); // Not initialized endpoint value - - if (auto socket_l = socket.lock ()) +void nano::transport::tcp_channel::stop () +{ + if (sending_task.joinable ()) { - remote_endpoint = socket_l->remote_endpoint (); - local_endpoint = socket_l->local_endpoint (); + // Node context must be running to gracefully stop async tasks + debug_assert (!node.io_ctx.stopped ()); + // Ensure that we are not trying to await the task while running on the same thread / io_context + debug_assert (!node.io_ctx.get_executor ().running_in_this_thread ()); + sending_task.cancel (); + sending_task.join (); } } -void nano::transport::tcp_channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::transport::buffer_drop_policy policy_a, nano::transport::traffic_type traffic_type) +bool nano::transport::tcp_channel::max (nano::transport::traffic_type traffic_type) { - if (auto socket_l = socket.lock ()) + nano::lock_guard guard{ mutex }; + return queue.max (traffic_type); +} + +bool nano::transport::tcp_channel::send_buffer (nano::shared_const_buffer const & buffer, std::function const & callback, nano::transport::buffer_drop_policy policy, nano::transport::traffic_type traffic_type) +{ + nano::unique_lock lock{ mutex }; + if (!queue.max (traffic_type) || (policy == buffer_drop_policy::no_socket_drop && !queue.full (traffic_type))) { - if (!socket_l->max (traffic_type) || (policy_a == nano::transport::buffer_drop_policy::no_socket_drop && !socket_l->full (traffic_type))) + queue.push (traffic_type, { buffer, callback }); + lock.unlock (); + node.stats.inc (nano::stat::type::tcp_channel_queued, to_stat_detail (traffic_type), nano::stat::dir::out); + sending_task.notify (); + return true; + } + else + { + node.stats.inc (nano::stat::type::tcp_channel_drop, to_stat_detail (traffic_type), nano::stat::dir::out); + } + return false; + + // if (!socket->max (traffic_type) || (policy_a == nano::transport::buffer_drop_policy::no_socket_drop && !socket->full (traffic_type))) + // { + // socket->async_write ( + // buffer_a, [this_s = shared_from_this (), endpoint_a = socket->remote_endpoint (), node = std::weak_ptr{ node.shared () }, callback_a] (boost::system::error_code const & ec, std::size_t size_a) { + // if (auto node_l = node.lock ()) + // { + // if (!ec) + // { + // this_s->set_last_packet_sent (std::chrono::steady_clock::now ()); + // } + // if (ec == boost::system::errc::host_unreachable) + // { + // node_l->stats.inc (nano::stat::type::error, nano::stat::detail::unreachable_host, nano::stat::dir::out); + // } + // if (callback_a) + // { + // callback_a (ec, size_a); + // } + // } + // }, + // traffic_type); + // } + // else + // { + // if (policy_a == nano::transport::buffer_drop_policy::no_socket_drop) + // { + // node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out); + // } + // else + // { + // node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out); + // } + // if (callback_a) + // { + // callback_a (boost::system::errc::make_error_code (boost::system::errc::no_buffer_space), 0); + // } + // } +} + +asio::awaitable nano::transport::tcp_channel::run_sending (nano::async::condition & condition) +{ + while (!co_await nano::async::cancelled ()) + { + debug_assert (strand.running_in_this_thread ()); + + auto next_batch = [this] () { + const size_t max_batch = 8; // TODO: Make this configurable + nano::lock_guard lock{ mutex }; + return queue.next_batch (max_batch); + }; + + if (auto batch = next_batch (); !batch.empty ()) { - socket_l->async_write ( - buffer_a, [this_s = shared_from_this (), endpoint_a = socket_l->remote_endpoint (), node = std::weak_ptr{ node.shared () }, callback_a] (boost::system::error_code const & ec, std::size_t size_a) { - if (auto node_l = node.lock ()) - { - if (!ec) - { - this_s->set_last_packet_sent (std::chrono::steady_clock::now ()); - } - if (ec == boost::system::errc::host_unreachable) - { - node_l->stats.inc (nano::stat::type::error, nano::stat::detail::unreachable_host, nano::stat::dir::out); - } - if (callback_a) - { - callback_a (ec, size_a); - } - } - }, - traffic_type); + for (auto const & [type, item] : batch) + { + co_await send_one (type, item); + } } else { - if (policy_a == nano::transport::buffer_drop_policy::no_socket_drop) - { - node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out); - } - else - { - node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out); - } - if (callback_a) - { - callback_a (boost::system::errc::make_error_code (boost::system::errc::no_buffer_space), 0); - } + co_await condition.wait (); } } - else if (callback_a) +} + +asio::awaitable nano::transport::tcp_channel::send_one (traffic_type type, tcp_channel_queue::entry_t const & item) +{ + debug_assert (strand.running_in_this_thread ()); + + auto const & [buffer, callback] = item; + + co_await wait_socket (type); + co_await wait_bandwidth (type, buffer.size ()); + + // TODO: Use shared_ptr to store the socket to avoid this + auto socket_l = socket.lock (); + if (!socket_l) { - node.io_ctx.post ([callback_a] () { - callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); - }); + if (callback) + { + callback (boost::asio::error::operation_aborted, 0); + } + co_return; } + + node.stats.inc (nano::stat::type::tcp_channel_sent, to_stat_detail (type), nano::stat::dir::out); + + socket_l->async_write ( + buffer, + [this_w = weak_from_this (), callback] (boost::system::error_code const & ec, std::size_t size) { + if (auto this_l = this_w.lock ()) + { + this_l->node.stats.inc (nano::stat::type::tcp_channel_ec, nano::to_stat_detail (ec), nano::stat::dir::out); + if (!ec) + { + this_l->set_last_packet_sent (std::chrono::steady_clock::now ()); + } + } + if (callback) + { + callback (ec, size); + } + }); +} + +asio::awaitable nano::transport::tcp_channel::wait_bandwidth (nano::transport::traffic_type type, size_t size) +{ + debug_assert (strand.running_in_this_thread ()); + + auto allocate_bandwidth = [this] (auto type, auto size) -> asio::awaitable { + // TODO: Consider implementing a subsribe/notification mechanism for bandwidth allocation + while (!node.outbound_limiter.should_pass (size, type)) + { + co_await nano::async::sleep_for (100ms); + } + co_return size; + }; + + // This is somewhat inefficient + // The performance impact *should* be mitigated by the fact that we allocate it in larger chunks, so this happens relatively infrequently + const size_t bandwidth_chunk = 128 * 1024; // TODO: Make this configurable + while (allocated_bandwidth < size) + { + allocated_bandwidth += co_await allocate_bandwidth (type, bandwidth_chunk); + } + allocated_bandwidth -= size; +} + +asio::awaitable nano::transport::tcp_channel::wait_socket (nano::transport::traffic_type type) +{ + debug_assert (strand.running_in_this_thread ()); + + auto should_wait = [this, type] () { + if (auto socket_l = socket.lock ()) + { + return socket_l->full (type); + } + return false; // Abort if the socket is dead + }; + + while (should_wait ()) + { + co_await nano::async::sleep_for (100ms); + } +} + +bool nano::transport::tcp_channel::alive () const +{ + if (auto socket_l = socket.lock ()) + { + return socket_l->alive (); + } + return false; +} + +nano::endpoint nano::transport::tcp_channel::get_remote_endpoint () const +{ + nano::lock_guard lock{ mutex }; + return remote_endpoint; +} + +nano::endpoint nano::transport::tcp_channel::get_local_endpoint () const +{ + nano::lock_guard lock{ mutex }; + return local_endpoint; } std::string nano::transport::tcp_channel::to_string () const @@ -94,6 +267,133 @@ std::string nano::transport::tcp_channel::to_string () const void nano::transport::tcp_channel::operator() (nano::object_stream & obs) const { nano::transport::channel::operator() (obs); // Write common data - obs.write ("socket", socket); } + +/* + * tcp_channel_queue + */ + +nano::transport::tcp_channel_queue::tcp_channel_queue () +{ + for (auto type : all_traffic_types ()) + { + queues.at (type) = { type, {} }; + } +} + +bool nano::transport::tcp_channel_queue::empty () const +{ + return std::all_of (queues.begin (), queues.end (), [] (auto const & queue) { + return queue.second.empty (); + }); +} + +size_t nano::transport::tcp_channel_queue::size () const +{ + return std::accumulate (queues.begin (), queues.end (), size_t{ 0 }, [] (size_t acc, auto const & queue) { + return acc + queue.second.size (); + }); +} + +size_t nano::transport::tcp_channel_queue::size (traffic_type type) const +{ + return queues.at (type).second.size (); +} + +bool nano::transport::tcp_channel_queue::max (traffic_type type) const +{ + return size (type) >= max_size; +} + +bool nano::transport::tcp_channel_queue::full (traffic_type type) const +{ + return size (type) >= max_size * 2; +} + +void nano::transport::tcp_channel_queue::push (traffic_type type, entry_t entry) +{ + debug_assert (!full (type)); // Should be checked before calling this function + queues.at (type).second.push_back (entry); +} + +auto nano::transport::tcp_channel_queue::next () -> value_t +{ + debug_assert (!empty ()); // Should be checked before calling next + + auto should_seek = [&, this] () { + if (current == queues.end ()) + { + return true; + } + auto & queue = current->second; + if (queue.empty ()) + { + return true; + } + // Allow up to `priority` requests to be processed before moving to the next queue + if (counter >= priority (current->first)) + { + return true; + } + return false; + }; + + if (should_seek ()) + { + seek_next (); + } + + release_assert (current != queues.end ()); + + auto & source = current->first; + auto & queue = current->second; + + ++counter; + + release_assert (!queue.empty ()); + auto entry = queue.front (); + queue.pop_front (); + return { source, entry }; +} + +auto nano::transport::tcp_channel_queue::next_batch (size_t max_count) -> batch_t +{ + // TODO: Naive implementation, could be optimized + std::deque result; + while (!empty () && result.size () < max_count) + { + result.emplace_back (next ()); + } + return result; +} + +size_t nano::transport::tcp_channel_queue::priority (traffic_type type) const +{ + switch (type) + { + case traffic_type::generic: + return 1; + case traffic_type::bootstrap: + return 1; + } + debug_assert (false); + return 1; +} + +void nano::transport::tcp_channel_queue::seek_next () +{ + counter = 0; + do + { + if (current != queues.end ()) + { + ++current; + } + if (current == queues.end ()) + { + current = queues.begin (); + } + release_assert (current != queues.end ()); + } while (current->second.empty ()); +} diff --git a/nano/node/transport/tcp_channel.hpp b/nano/node/transport/tcp_channel.hpp index 3936e3e9a..082f73741 100644 --- a/nano/node/transport/tcp_channel.hpp +++ b/nano/node/transport/tcp_channel.hpp @@ -1,72 +1,82 @@ #pragma once +#include +#include #include +#include #include namespace nano::transport { -class tcp_server; -class tcp_channels; -class tcp_channel; +class tcp_channel_queue final +{ +public: + explicit tcp_channel_queue (); -class tcp_channel : public nano::transport::channel, public std::enable_shared_from_this + using callback_t = std::function; + using entry_t = std::pair; + using value_t = std::pair; + using batch_t = std::deque; + + bool empty () const; + size_t size () const; + size_t size (traffic_type) const; + void push (traffic_type, entry_t); + value_t next (); + batch_t next_batch (size_t max_count); + + bool max (traffic_type) const; + bool full (traffic_type) const; + + constexpr static size_t max_size = 128; + +private: + void seek_next (); + size_t priority (traffic_type) const; + + using queue_t = std::pair>; + nano::enum_array queues{}; + nano::enum_array::iterator current{ queues.end () }; + size_t counter{ 0 }; +}; + +class tcp_channel final : public nano::transport::channel, public std::enable_shared_from_this { friend class nano::transport::tcp_channels; public: - tcp_channel (nano::node &, std::weak_ptr); + tcp_channel (nano::node &, std::shared_ptr); ~tcp_channel () override; - void update_endpoints (); + void close () override; - // TODO: investigate clang-tidy warning about default parameters on virtual/override functions// - void send_buffer (nano::shared_const_buffer const &, std::function const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type = nano::transport::traffic_type::generic) override; + bool send_buffer (nano::shared_const_buffer const &, + nano::transport::channel::callback_t const & callback = nullptr, + nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, + nano::transport::traffic_type = nano::transport::traffic_type::generic) + override; - std::string to_string () const override; + bool max (nano::transport::traffic_type traffic_type) override; + bool alive () const override; - nano::endpoint get_remote_endpoint () const override - { - nano::lock_guard lock{ mutex }; - return remote_endpoint; - } - - nano::endpoint get_local_endpoint () const override - { - nano::lock_guard lock{ mutex }; - return local_endpoint; - } + nano::endpoint get_remote_endpoint () const override; + nano::endpoint get_local_endpoint () const override; nano::transport::transport_type get_type () const override { return nano::transport::transport_type::tcp; } - bool max (nano::transport::traffic_type traffic_type) override - { - bool result = true; - if (auto socket_l = socket.lock ()) - { - result = socket_l->max (traffic_type); - } - return result; - } + std::string to_string () const override; - bool alive () const override - { - if (auto socket_l = socket.lock ()) - { - return socket_l->alive (); - } - return false; - } +private: + void start (); + void stop (); - void close () override - { - if (auto socket_l = socket.lock ()) - { - socket_l->close (); - } - } + asio::awaitable run_sending (nano::async::condition &); + asio::awaitable send_one (traffic_type, tcp_channel_queue::entry_t const &); + asio::awaitable wait_bandwidth (traffic_type, size_t size); + asio::awaitable wait_socket (traffic_type); public: std::weak_ptr socket; @@ -75,7 +85,19 @@ private: nano::endpoint remote_endpoint; nano::endpoint local_endpoint; + nano::async::strand strand; + nano::async::task sending_task; + + mutable nano::mutex mutex; + tcp_channel_queue queue; + + std::atomic allocated_bandwidth{ 0 }; + + // Debugging + std::atomic closed{ false }; + std::string stacktrace; + public: // Logging void operator() (nano::object_stream &) const override; }; -} \ No newline at end of file +} diff --git a/nano/node/transport/tcp_channels.cpp b/nano/node/transport/tcp_channels.cpp index 0abb052ea..f92b22ae2 100644 --- a/nano/node/transport/tcp_channels.cpp +++ b/nano/node/transport/tcp_channels.cpp @@ -112,7 +112,6 @@ std::shared_ptr nano::transport::tcp_channels::cre node_id.to_node_id ()); auto channel = std::make_shared (node, socket); - channel->update_endpoints (); channel->set_node_id (node_id); attempts.get ().erase (endpoint); diff --git a/nano/node/transport/tcp_channels.hpp b/nano/node/transport/tcp_channels.hpp index 815625b17..352b85911 100644 --- a/nano/node/transport/tcp_channels.hpp +++ b/nano/node/transport/tcp_channels.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include diff --git a/nano/node/transport/traffic_type.cpp b/nano/node/transport/traffic_type.cpp index bfb12a657..9d170c713 100644 --- a/nano/node/transport/traffic_type.cpp +++ b/nano/node/transport/traffic_type.cpp @@ -1,6 +1,19 @@ #include +#include #include +#include + +std::string_view nano::transport::to_string (nano::transport::traffic_type type) +{ + return nano::enum_util::name (type); +} + +std::vector nano::transport::all_traffic_types () +{ + return nano::enum_util::values (); +} + nano::stat::detail nano::transport::to_stat_detail (nano::transport::traffic_type type) { return nano::enum_util::cast (type); diff --git a/nano/node/transport/traffic_type.hpp b/nano/node/transport/traffic_type.hpp index 1f0914cb0..89128b119 100644 --- a/nano/node/transport/traffic_type.hpp +++ b/nano/node/transport/traffic_type.hpp @@ -2,6 +2,9 @@ #include +#include +#include + namespace nano::transport { /** @@ -13,5 +16,7 @@ enum class traffic_type bootstrap, // Ascending bootstrap (asc_pull_ack, asc_pull_req) traffic }; +std::string_view to_string (traffic_type); +std::vector all_traffic_types (); nano::stat::detail to_stat_detail (traffic_type); } \ No newline at end of file