Channel traffic rework

This commit is contained in:
Piotr Wójcik 2024-05-04 21:17:08 +02:00
commit 98fe46dd0c
15 changed files with 603 additions and 136 deletions

View file

@ -4,6 +4,7 @@
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <functional>
#include <future> #include <future>
namespace asio = boost::asio; namespace asio = boost::asio;
@ -21,6 +22,12 @@ inline asio::awaitable<void> sleep_for (auto duration)
debug_assert (!ec || ec == asio::error::operation_aborted); debug_assert (!ec || ec == asio::error::operation_aborted);
} }
inline asio::awaitable<bool> cancelled ()
{
auto state = co_await asio::this_coro::cancellation_state;
co_return state.cancelled () != asio::cancellation_type::none;
}
/** /**
* A cancellation signal that can be emitted from any thread. * A cancellation signal that can be emitted from any thread.
* It follows the same semantics as asio::cancellation_signal. * It follows the same semantics as asio::cancellation_signal.
@ -40,7 +47,6 @@ public:
{ {
// Can only move if the strands are the same // Can only move if the strands are the same
debug_assert (strand == other.strand); debug_assert (strand == other.strand);
if (this != &other) if (this != &other)
{ {
signal = std::move (other.signal); signal = std::move (other.signal);
@ -70,6 +76,106 @@ private:
bool slotted{ false }; // For debugging purposes bool slotted{ false }; // For debugging purposes
}; };
class condition
{
public:
explicit condition (nano::async::strand & strand) :
strand{ strand },
state{ std::make_shared<shared_state> (strand) }
{
}
condition (condition &&) = default;
condition & operator= (condition && other)
{
// Can only move if the strands are the same
debug_assert (strand == other.strand);
if (this != &other)
{
state = std::move (other.state);
}
return *this;
}
void notify ()
{
// Avoid unnecessary dispatch if already scheduled
release_assert (state);
if (state->scheduled.exchange (true) == false)
{
asio::dispatch (strand, [state_s = state] () {
state_s->scheduled = false;
state_s->timer.cancel ();
});
}
}
// Spuriously wakes up
asio::awaitable<void> wait ()
{
debug_assert (strand.running_in_this_thread ());
co_await wait_for (std::chrono::seconds{ 1 });
}
asio::awaitable<void> wait_for (auto duration)
{
debug_assert (strand.running_in_this_thread ());
release_assert (state);
state->timer.expires_after (duration);
boost::system::error_code ec; // Swallow error from cancellation
co_await state->timer.async_wait (asio::redirect_error (asio::use_awaitable, ec));
debug_assert (!ec || ec == asio::error::operation_aborted);
}
void cancel ()
{
release_assert (state);
asio::dispatch (strand, [state_s = state] () {
state_s->scheduled = false;
state_s->timer.cancel ();
});
}
bool valid () const
{
return state != nullptr;
}
nano::async::strand & strand;
private:
struct shared_state
{
asio::steady_timer timer;
std::atomic<bool> scheduled{ false };
explicit shared_state (nano::async::strand & strand) :
timer{ strand } {};
};
std::shared_ptr<shared_state> state;
};
// Concept for awaitables
template <typename T>
concept async_task = std::same_as<T, asio::awaitable<void>>;
// Concept for callables that return an awaitable
template <typename T>
concept async_callable = requires (T t) {
{
t ()
} -> std::same_as<asio::awaitable<void>>;
};
// Concept for tasks that take a condition and return an awaitable
template <typename T>
concept async_callable_with_condition = requires (T t, condition & c) {
{
t (c)
} -> std::same_as<asio::awaitable<void>>;
};
/** /**
* Wrapper with convenience functions and safety checks for asynchronous tasks. * Wrapper with convenience functions and safety checks for asynchronous tasks.
* Aims to provide interface similar to std::thread. * Aims to provide interface similar to std::thread.
@ -86,13 +192,28 @@ public:
{ {
} }
task (nano::async::strand & strand, auto && func) : template <typename Func>
requires async_task<Func> || async_callable<Func>
task (nano::async::strand & strand, Func && func) :
strand{ strand }, strand{ strand },
cancellation{ strand } cancellation{ strand }
{ {
future = asio::co_spawn ( future = asio::co_spawn (
strand, strand,
std::forward<decltype (func)> (func), std::forward<Func> (func),
asio::bind_cancellation_slot (cancellation.slot (), asio::use_future));
}
template <async_callable_with_condition Func>
task (nano::async::strand & strand, Func && func) :
strand{ strand },
cancellation{ strand },
condition{ std::make_unique<nano::async::condition> (strand) }
{
auto awaitable_func = func (*condition);
future = asio::co_spawn (
strand,
func (*condition),
asio::bind_cancellation_slot (cancellation.slot (), asio::use_future)); asio::bind_cancellation_slot (cancellation.slot (), asio::use_future));
} }
@ -107,11 +228,11 @@ public:
{ {
// Can only move if the strands are the same // Can only move if the strands are the same
debug_assert (strand == other.strand); debug_assert (strand == other.strand);
if (this != &other) if (this != &other)
{ {
future = std::move (other.future); future = std::move (other.future);
cancellation = std::move (other.cancellation); cancellation = std::move (other.cancellation);
condition = std::move (other.condition);
} }
return *this; return *this;
} }
@ -139,6 +260,18 @@ public:
{ {
debug_assert (joinable ()); debug_assert (joinable ());
cancellation.emit (); cancellation.emit ();
if (condition)
{
condition->cancel ();
}
}
void notify ()
{
if (condition)
{
condition->notify ();
}
} }
nano::async::strand & strand; nano::async::strand & strand;
@ -146,5 +279,6 @@ public:
private: private:
std::future<value_type> future; std::future<value_type> future;
nano::async::cancellation cancellation; nano::async::cancellation cancellation;
std::unique_ptr<nano::async::condition> condition;
}; };
} }

View file

@ -30,6 +30,11 @@ enum class type
ipc, ipc,
tcp, tcp,
tcp_server, tcp_server,
tcp_channel,
tcp_channel_queued,
tcp_channel_sent,
tcp_channel_drop,
tcp_channel_ec,
tcp_channels, tcp_channels,
tcp_channels_rejected, tcp_channels_rejected,
tcp_channels_purge, tcp_channels_purge,

View file

@ -705,7 +705,7 @@ void nano::node::stop ()
epoch_upgrader.stop (); epoch_upgrader.stop ();
local_block_broadcaster.stop (); local_block_broadcaster.stop ();
message_processor.stop (); message_processor.stop ();
network.stop (); // Stop network last to avoid killing in-use sockets network.stop ();
monitor.stop (); monitor.stop ();
bootstrap_workers.stop (); bootstrap_workers.stop ();

View file

@ -14,33 +14,12 @@ nano::transport::channel::channel (nano::node & node_a) :
set_network_version (node_a.network_params.network.protocol_version); set_network_version (node_a.network_params.network.protocol_version);
} }
void nano::transport::channel::send (nano::message & message_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type) bool nano::transport::channel::send (nano::message const & message, std::function<void (boost::system::error_code const &, std::size_t)> const & callback, nano::transport::buffer_drop_policy drop_policy, nano::transport::traffic_type traffic_type)
{ {
auto buffer = message_a.to_shared_const_buffer (); auto buffer = message.to_shared_const_buffer ();
bool sent = send_buffer (buffer, callback, drop_policy, traffic_type);
bool is_droppable_by_limiter = (drop_policy_a == nano::transport::buffer_drop_policy::limiter); node.stats.inc (sent ? nano::stat::type::message : nano::stat::type::drop, to_stat_detail (message.type ()), nano::stat::dir::out, /* aggregate all */ true);
bool should_pass = node.outbound_limiter.should_pass (buffer.size (), traffic_type); return sent;
bool pass = !is_droppable_by_limiter || should_pass;
node.stats.inc (pass ? nano::stat::type::message : nano::stat::type::drop, to_stat_detail (message_a.type ()), nano::stat::dir::out, /* aggregate all */ true);
node.logger.trace (nano::log::type::channel_sent, to_log_detail (message_a.type ()),
nano::log::arg{ "message", message_a },
nano::log::arg{ "channel", *this },
nano::log::arg{ "dropped", !pass });
if (pass)
{
send_buffer (buffer, callback_a, drop_policy_a, traffic_type);
}
else
{
if (callback_a)
{
node.io_ctx.post ([callback_a] () {
callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
}
} }
void nano::transport::channel::set_peering_endpoint (nano::endpoint endpoint) void nano::transport::channel::set_peering_endpoint (nano::endpoint endpoint)

View file

@ -22,18 +22,24 @@ enum class transport_type : uint8_t
class channel class channel
{ {
public:
using callback_t = std::function<void (boost::system::error_code const &, std::size_t)>;
public: public:
explicit channel (nano::node &); explicit channel (nano::node &);
virtual ~channel () = default; virtual ~channel () = default;
void send (nano::message & message_a, /// @returns true if the message was sent (or queued to be sent), false if it was immediately dropped
std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a = nullptr, bool send (nano::message const &,
nano::transport::buffer_drop_policy policy_a = nano::transport::buffer_drop_policy::limiter, callback_t const & callback = nullptr,
nano::transport::buffer_drop_policy policy = nano::transport::buffer_drop_policy::limiter,
nano::transport::traffic_type = nano::transport::traffic_type::generic); nano::transport::traffic_type = nano::transport::traffic_type::generic);
// TODO: investigate clang-tidy warning about default parameters on virtual/override functions /// Implements the actual send operation
virtual void send_buffer (nano::shared_const_buffer const &, /// @returns true if the message was sent (or queued to be sent), false if it was immediately dropped
std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, // TODO: Make this private, do not allow external direct calls
virtual bool send_buffer (nano::shared_const_buffer const &,
callback_t const & callback = nullptr,
nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter,
nano::transport::traffic_type = nano::transport::traffic_type::generic) nano::transport::traffic_type = nano::transport::traffic_type::generic)
= 0; = 0;

View file

@ -14,7 +14,7 @@ nano::transport::fake::channel::channel (nano::node & node) :
/** /**
* The send function behaves like a null device, it throws the data away and returns success. * The send function behaves like a null device, it throws the data away and returns success.
*/ */
void nano::transport::fake::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type) bool nano::transport::fake::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type)
{ {
// auto bytes = buffer_a.to_bytes (); // auto bytes = buffer_a.to_bytes ();
auto size = buffer_a.size (); auto size = buffer_a.size ();
@ -24,6 +24,7 @@ void nano::transport::fake::channel::send_buffer (nano::shared_const_buffer cons
callback_a (boost::system::errc::make_error_code (boost::system::errc::success), size); callback_a (boost::system::errc::make_error_code (boost::system::errc::success), size);
}); });
} }
return true;
} }
std::string nano::transport::fake::channel::to_string () const std::string nano::transport::fake::channel::to_string () const

View file

@ -19,7 +19,7 @@ namespace transport
std::string to_string () const override; std::string to_string () const override;
void send_buffer ( bool send_buffer (
nano::shared_const_buffer const &, nano::shared_const_buffer const &,
std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr,
nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter,

View file

@ -18,7 +18,7 @@ nano::transport::inproc::channel::channel (nano::node & node, nano::node & desti
* Send the buffer to the peer and call the callback function when done. The call never fails. * Send the buffer to the peer and call the callback function when done. The call never fails.
* Note that the inbound message visitor will be called before the callback because it is called directly whereas the callback is spawned in the background. * Note that the inbound message visitor will be called before the callback because it is called directly whereas the callback is spawned in the background.
*/ */
void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type) bool nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type)
{ {
std::size_t offset{ 0 }; std::size_t offset{ 0 };
auto const buffer_read_fn = [&offset, buffer_v = buffer_a.to_bytes ()] (std::shared_ptr<std::vector<uint8_t>> const & data_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a) { auto const buffer_read_fn = [&offset, buffer_v = buffer_a.to_bytes ()] (std::shared_ptr<std::vector<uint8_t>> const & data_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a) {
@ -54,6 +54,8 @@ void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer co
callback_l (boost::system::errc::make_error_code (boost::system::errc::success), buffer_size); callback_l (boost::system::errc::make_error_code (boost::system::errc::success), buffer_size);
}); });
} }
return true;
} }
std::string nano::transport::inproc::channel::to_string () const std::string nano::transport::inproc::channel::to_string () const

View file

@ -18,7 +18,7 @@ namespace transport
explicit channel (nano::node & node, nano::node & destination); explicit channel (nano::node & node, nano::node & destination);
// TODO: investigate clang-tidy warning about default parameters on virtual/override functions // TODO: investigate clang-tidy warning about default parameters on virtual/override functions
void send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type = nano::transport::traffic_type::generic) override; bool send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type = nano::transport::traffic_type::generic) override;
std::string to_string () const override; std::string to_string () const override;

View file

@ -1,89 +1,262 @@
#include <nano/lib/stacktrace.hpp>
#include <nano/lib/stats.hpp> #include <nano/lib/stats.hpp>
#include <nano/lib/utility.hpp> #include <nano/lib/utility.hpp>
#include <nano/node/node.hpp> #include <nano/node/node.hpp>
#include <nano/node/transport/message_deserializer.hpp> #include <nano/node/transport/message_deserializer.hpp>
#include <nano/node/transport/tcp_channel.hpp> #include <nano/node/transport/tcp_channel.hpp>
#include <nano/node/transport/transport.hpp>
/* /*
* tcp_channel * tcp_channel
*/ */
nano::transport::tcp_channel::tcp_channel (nano::node & node_a, std::weak_ptr<nano::transport::tcp_socket> socket_a) : nano::transport::tcp_channel::tcp_channel (nano::node & node_a, std::shared_ptr<nano::transport::tcp_socket> socket_a) :
channel (node_a), channel (node_a),
socket (std::move (socket_a)) socket{ socket_a },
strand{ node_a.io_ctx.get_executor () },
sending_task{ strand }
{ {
stacktrace = nano::generate_stacktrace ();
remote_endpoint = socket_a->remote_endpoint ();
local_endpoint = socket_a->local_endpoint ();
start ();
} }
nano::transport::tcp_channel::~tcp_channel () nano::transport::tcp_channel::~tcp_channel ()
{ {
close ();
debug_assert (!sending_task.joinable ());
}
void nano::transport::tcp_channel::close ()
{
stop ();
if (auto socket_l = socket.lock ()) if (auto socket_l = socket.lock ())
{ {
socket_l->close (); socket_l->close ();
} }
closed = true;
} }
void nano::transport::tcp_channel::update_endpoints () void nano::transport::tcp_channel::start ()
{ {
nano::lock_guard<nano::mutex> lock{ mutex }; sending_task = nano::async::task (strand, [this] (nano::async::condition & condition) -> asio::awaitable<void> {
try
{
co_await run_sending (condition);
}
catch (boost::system::system_error const & ex)
{
// Operation aborted is expected when cancelling the acceptor
debug_assert (ex.code () == asio::error::operation_aborted);
}
debug_assert (strand.running_in_this_thread ());
});
}
debug_assert (remote_endpoint == nano::endpoint{}); // Not initialized endpoint value void nano::transport::tcp_channel::stop ()
debug_assert (local_endpoint == nano::endpoint{}); // Not initialized endpoint value {
if (sending_task.joinable ())
if (auto socket_l = socket.lock ())
{ {
remote_endpoint = socket_l->remote_endpoint (); // Node context must be running to gracefully stop async tasks
local_endpoint = socket_l->local_endpoint (); debug_assert (!node.io_ctx.stopped ());
// Ensure that we are not trying to await the task while running on the same thread / io_context
debug_assert (!node.io_ctx.get_executor ().running_in_this_thread ());
sending_task.cancel ();
sending_task.join ();
} }
} }
void nano::transport::tcp_channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy policy_a, nano::transport::traffic_type traffic_type) bool nano::transport::tcp_channel::max (nano::transport::traffic_type traffic_type)
{ {
if (auto socket_l = socket.lock ()) nano::lock_guard<nano::mutex> guard{ mutex };
return queue.max (traffic_type);
}
bool nano::transport::tcp_channel::send_buffer (nano::shared_const_buffer const & buffer, std::function<void (boost::system::error_code const &, std::size_t)> const & callback, nano::transport::buffer_drop_policy policy, nano::transport::traffic_type traffic_type)
{
nano::unique_lock<nano::mutex> lock{ mutex };
if (!queue.max (traffic_type) || (policy == buffer_drop_policy::no_socket_drop && !queue.full (traffic_type)))
{ {
if (!socket_l->max (traffic_type) || (policy_a == nano::transport::buffer_drop_policy::no_socket_drop && !socket_l->full (traffic_type))) queue.push (traffic_type, { buffer, callback });
lock.unlock ();
node.stats.inc (nano::stat::type::tcp_channel_queued, to_stat_detail (traffic_type), nano::stat::dir::out);
sending_task.notify ();
return true;
}
else
{
node.stats.inc (nano::stat::type::tcp_channel_drop, to_stat_detail (traffic_type), nano::stat::dir::out);
}
return false;
// if (!socket->max (traffic_type) || (policy_a == nano::transport::buffer_drop_policy::no_socket_drop && !socket->full (traffic_type)))
// {
// socket->async_write (
// buffer_a, [this_s = shared_from_this (), endpoint_a = socket->remote_endpoint (), node = std::weak_ptr<nano::node>{ node.shared () }, callback_a] (boost::system::error_code const & ec, std::size_t size_a) {
// if (auto node_l = node.lock ())
// {
// if (!ec)
// {
// this_s->set_last_packet_sent (std::chrono::steady_clock::now ());
// }
// if (ec == boost::system::errc::host_unreachable)
// {
// node_l->stats.inc (nano::stat::type::error, nano::stat::detail::unreachable_host, nano::stat::dir::out);
// }
// if (callback_a)
// {
// callback_a (ec, size_a);
// }
// }
// },
// traffic_type);
// }
// else
// {
// if (policy_a == nano::transport::buffer_drop_policy::no_socket_drop)
// {
// node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out);
// }
// else
// {
// node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out);
// }
// if (callback_a)
// {
// callback_a (boost::system::errc::make_error_code (boost::system::errc::no_buffer_space), 0);
// }
// }
}
asio::awaitable<void> nano::transport::tcp_channel::run_sending (nano::async::condition & condition)
{
while (!co_await nano::async::cancelled ())
{
debug_assert (strand.running_in_this_thread ());
auto next_batch = [this] () {
const size_t max_batch = 8; // TODO: Make this configurable
nano::lock_guard<nano::mutex> lock{ mutex };
return queue.next_batch (max_batch);
};
if (auto batch = next_batch (); !batch.empty ())
{ {
socket_l->async_write ( for (auto const & [type, item] : batch)
buffer_a, [this_s = shared_from_this (), endpoint_a = socket_l->remote_endpoint (), node = std::weak_ptr<nano::node>{ node.shared () }, callback_a] (boost::system::error_code const & ec, std::size_t size_a) { {
if (auto node_l = node.lock ()) co_await send_one (type, item);
{ }
if (!ec)
{
this_s->set_last_packet_sent (std::chrono::steady_clock::now ());
}
if (ec == boost::system::errc::host_unreachable)
{
node_l->stats.inc (nano::stat::type::error, nano::stat::detail::unreachable_host, nano::stat::dir::out);
}
if (callback_a)
{
callback_a (ec, size_a);
}
}
},
traffic_type);
} }
else else
{ {
if (policy_a == nano::transport::buffer_drop_policy::no_socket_drop) co_await condition.wait ();
{
node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out);
}
else
{
node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out);
}
if (callback_a)
{
callback_a (boost::system::errc::make_error_code (boost::system::errc::no_buffer_space), 0);
}
} }
} }
else if (callback_a) }
asio::awaitable<void> nano::transport::tcp_channel::send_one (traffic_type type, tcp_channel_queue::entry_t const & item)
{
debug_assert (strand.running_in_this_thread ());
auto const & [buffer, callback] = item;
co_await wait_socket (type);
co_await wait_bandwidth (type, buffer.size ());
// TODO: Use shared_ptr to store the socket to avoid this
auto socket_l = socket.lock ();
if (!socket_l)
{ {
node.io_ctx.post ([callback_a] () { if (callback)
callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); {
}); callback (boost::asio::error::operation_aborted, 0);
}
co_return;
} }
node.stats.inc (nano::stat::type::tcp_channel_sent, to_stat_detail (type), nano::stat::dir::out);
socket_l->async_write (
buffer,
[this_w = weak_from_this (), callback] (boost::system::error_code const & ec, std::size_t size) {
if (auto this_l = this_w.lock ())
{
this_l->node.stats.inc (nano::stat::type::tcp_channel_ec, nano::to_stat_detail (ec), nano::stat::dir::out);
if (!ec)
{
this_l->set_last_packet_sent (std::chrono::steady_clock::now ());
}
}
if (callback)
{
callback (ec, size);
}
});
}
asio::awaitable<void> nano::transport::tcp_channel::wait_bandwidth (nano::transport::traffic_type type, size_t size)
{
debug_assert (strand.running_in_this_thread ());
auto allocate_bandwidth = [this] (auto type, auto size) -> asio::awaitable<size_t> {
// TODO: Consider implementing a subsribe/notification mechanism for bandwidth allocation
while (!node.outbound_limiter.should_pass (size, type))
{
co_await nano::async::sleep_for (100ms);
}
co_return size;
};
// This is somewhat inefficient
// The performance impact *should* be mitigated by the fact that we allocate it in larger chunks, so this happens relatively infrequently
const size_t bandwidth_chunk = 128 * 1024; // TODO: Make this configurable
while (allocated_bandwidth < size)
{
allocated_bandwidth += co_await allocate_bandwidth (type, bandwidth_chunk);
}
allocated_bandwidth -= size;
}
asio::awaitable<void> nano::transport::tcp_channel::wait_socket (nano::transport::traffic_type type)
{
debug_assert (strand.running_in_this_thread ());
auto should_wait = [this, type] () {
if (auto socket_l = socket.lock ())
{
return socket_l->full (type);
}
return false; // Abort if the socket is dead
};
while (should_wait ())
{
co_await nano::async::sleep_for (100ms);
}
}
bool nano::transport::tcp_channel::alive () const
{
if (auto socket_l = socket.lock ())
{
return socket_l->alive ();
}
return false;
}
nano::endpoint nano::transport::tcp_channel::get_remote_endpoint () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
return remote_endpoint;
}
nano::endpoint nano::transport::tcp_channel::get_local_endpoint () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
return local_endpoint;
} }
std::string nano::transport::tcp_channel::to_string () const std::string nano::transport::tcp_channel::to_string () const
@ -94,6 +267,133 @@ std::string nano::transport::tcp_channel::to_string () const
void nano::transport::tcp_channel::operator() (nano::object_stream & obs) const void nano::transport::tcp_channel::operator() (nano::object_stream & obs) const
{ {
nano::transport::channel::operator() (obs); // Write common data nano::transport::channel::operator() (obs); // Write common data
obs.write ("socket", socket); obs.write ("socket", socket);
} }
/*
* tcp_channel_queue
*/
nano::transport::tcp_channel_queue::tcp_channel_queue ()
{
for (auto type : all_traffic_types ())
{
queues.at (type) = { type, {} };
}
}
bool nano::transport::tcp_channel_queue::empty () const
{
return std::all_of (queues.begin (), queues.end (), [] (auto const & queue) {
return queue.second.empty ();
});
}
size_t nano::transport::tcp_channel_queue::size () const
{
return std::accumulate (queues.begin (), queues.end (), size_t{ 0 }, [] (size_t acc, auto const & queue) {
return acc + queue.second.size ();
});
}
size_t nano::transport::tcp_channel_queue::size (traffic_type type) const
{
return queues.at (type).second.size ();
}
bool nano::transport::tcp_channel_queue::max (traffic_type type) const
{
return size (type) >= max_size;
}
bool nano::transport::tcp_channel_queue::full (traffic_type type) const
{
return size (type) >= max_size * 2;
}
void nano::transport::tcp_channel_queue::push (traffic_type type, entry_t entry)
{
debug_assert (!full (type)); // Should be checked before calling this function
queues.at (type).second.push_back (entry);
}
auto nano::transport::tcp_channel_queue::next () -> value_t
{
debug_assert (!empty ()); // Should be checked before calling next
auto should_seek = [&, this] () {
if (current == queues.end ())
{
return true;
}
auto & queue = current->second;
if (queue.empty ())
{
return true;
}
// Allow up to `priority` requests to be processed before moving to the next queue
if (counter >= priority (current->first))
{
return true;
}
return false;
};
if (should_seek ())
{
seek_next ();
}
release_assert (current != queues.end ());
auto & source = current->first;
auto & queue = current->second;
++counter;
release_assert (!queue.empty ());
auto entry = queue.front ();
queue.pop_front ();
return { source, entry };
}
auto nano::transport::tcp_channel_queue::next_batch (size_t max_count) -> batch_t
{
// TODO: Naive implementation, could be optimized
std::deque<value_t> result;
while (!empty () && result.size () < max_count)
{
result.emplace_back (next ());
}
return result;
}
size_t nano::transport::tcp_channel_queue::priority (traffic_type type) const
{
switch (type)
{
case traffic_type::generic:
return 1;
case traffic_type::bootstrap:
return 1;
}
debug_assert (false);
return 1;
}
void nano::transport::tcp_channel_queue::seek_next ()
{
counter = 0;
do
{
if (current != queues.end ())
{
++current;
}
if (current == queues.end ())
{
current = queues.begin ();
}
release_assert (current != queues.end ());
} while (current->second.empty ());
}

View file

@ -1,72 +1,82 @@
#pragma once #pragma once
#include <nano/lib/async.hpp>
#include <nano/lib/enum_util.hpp>
#include <nano/node/transport/channel.hpp> #include <nano/node/transport/channel.hpp>
#include <nano/node/transport/fwd.hpp>
#include <nano/node/transport/transport.hpp> #include <nano/node/transport/transport.hpp>
namespace nano::transport namespace nano::transport
{ {
class tcp_server; class tcp_channel_queue final
class tcp_channels; {
class tcp_channel; public:
explicit tcp_channel_queue ();
class tcp_channel : public nano::transport::channel, public std::enable_shared_from_this<tcp_channel> using callback_t = std::function<void (boost::system::error_code const &, std::size_t)>;
using entry_t = std::pair<nano::shared_const_buffer, callback_t>;
using value_t = std::pair<traffic_type, entry_t>;
using batch_t = std::deque<value_t>;
bool empty () const;
size_t size () const;
size_t size (traffic_type) const;
void push (traffic_type, entry_t);
value_t next ();
batch_t next_batch (size_t max_count);
bool max (traffic_type) const;
bool full (traffic_type) const;
constexpr static size_t max_size = 128;
private:
void seek_next ();
size_t priority (traffic_type) const;
using queue_t = std::pair<traffic_type, std::deque<entry_t>>;
nano::enum_array<traffic_type, queue_t> queues{};
nano::enum_array<traffic_type, queue_t>::iterator current{ queues.end () };
size_t counter{ 0 };
};
class tcp_channel final : public nano::transport::channel, public std::enable_shared_from_this<tcp_channel>
{ {
friend class nano::transport::tcp_channels; friend class nano::transport::tcp_channels;
public: public:
tcp_channel (nano::node &, std::weak_ptr<nano::transport::tcp_socket>); tcp_channel (nano::node &, std::shared_ptr<nano::transport::tcp_socket>);
~tcp_channel () override; ~tcp_channel () override;
void update_endpoints (); void close () override;
// TODO: investigate clang-tidy warning about default parameters on virtual/override functions// bool send_buffer (nano::shared_const_buffer const &,
void send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type = nano::transport::traffic_type::generic) override; nano::transport::channel::callback_t const & callback = nullptr,
nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter,
nano::transport::traffic_type = nano::transport::traffic_type::generic)
override;
std::string to_string () const override; bool max (nano::transport::traffic_type traffic_type) override;
bool alive () const override;
nano::endpoint get_remote_endpoint () const override nano::endpoint get_remote_endpoint () const override;
{ nano::endpoint get_local_endpoint () const override;
nano::lock_guard<nano::mutex> lock{ mutex };
return remote_endpoint;
}
nano::endpoint get_local_endpoint () const override
{
nano::lock_guard<nano::mutex> lock{ mutex };
return local_endpoint;
}
nano::transport::transport_type get_type () const override nano::transport::transport_type get_type () const override
{ {
return nano::transport::transport_type::tcp; return nano::transport::transport_type::tcp;
} }
bool max (nano::transport::traffic_type traffic_type) override std::string to_string () const override;
{
bool result = true;
if (auto socket_l = socket.lock ())
{
result = socket_l->max (traffic_type);
}
return result;
}
bool alive () const override private:
{ void start ();
if (auto socket_l = socket.lock ()) void stop ();
{
return socket_l->alive ();
}
return false;
}
void close () override asio::awaitable<void> run_sending (nano::async::condition &);
{ asio::awaitable<void> send_one (traffic_type, tcp_channel_queue::entry_t const &);
if (auto socket_l = socket.lock ()) asio::awaitable<void> wait_bandwidth (traffic_type, size_t size);
{ asio::awaitable<void> wait_socket (traffic_type);
socket_l->close ();
}
}
public: public:
std::weak_ptr<nano::transport::tcp_socket> socket; std::weak_ptr<nano::transport::tcp_socket> socket;
@ -75,7 +85,19 @@ private:
nano::endpoint remote_endpoint; nano::endpoint remote_endpoint;
nano::endpoint local_endpoint; nano::endpoint local_endpoint;
nano::async::strand strand;
nano::async::task sending_task;
mutable nano::mutex mutex;
tcp_channel_queue queue;
std::atomic<size_t> allocated_bandwidth{ 0 };
// Debugging
std::atomic<bool> closed{ false };
std::string stacktrace;
public: // Logging public: // Logging
void operator() (nano::object_stream &) const override; void operator() (nano::object_stream &) const override;
}; };
} }

View file

@ -112,7 +112,6 @@ std::shared_ptr<nano::transport::tcp_channel> nano::transport::tcp_channels::cre
node_id.to_node_id ()); node_id.to_node_id ());
auto channel = std::make_shared<nano::transport::tcp_channel> (node, socket); auto channel = std::make_shared<nano::transport::tcp_channel> (node, socket);
channel->update_endpoints ();
channel->set_node_id (node_id); channel->set_node_id (node_id);
attempts.get<endpoint_tag> ().erase (endpoint); attempts.get<endpoint_tag> ().erase (endpoint);

View file

@ -4,6 +4,7 @@
#include <nano/lib/random.hpp> #include <nano/lib/random.hpp>
#include <nano/node/endpoint.hpp> #include <nano/node/endpoint.hpp>
#include <nano/node/transport/channel.hpp> #include <nano/node/transport/channel.hpp>
#include <nano/node/transport/fwd.hpp>
#include <nano/node/transport/tcp_channel.hpp> #include <nano/node/transport/tcp_channel.hpp>
#include <nano/node/transport/transport.hpp> #include <nano/node/transport/transport.hpp>

View file

@ -1,6 +1,19 @@
#include <nano/lib/enum_util.hpp> #include <nano/lib/enum_util.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/transport/traffic_type.hpp> #include <nano/node/transport/traffic_type.hpp>
#include <magic_enum.hpp>
std::string_view nano::transport::to_string (nano::transport::traffic_type type)
{
return nano::enum_util::name (type);
}
std::vector<nano::transport::traffic_type> nano::transport::all_traffic_types ()
{
return nano::enum_util::values<nano::transport::traffic_type> ();
}
nano::stat::detail nano::transport::to_stat_detail (nano::transport::traffic_type type) nano::stat::detail nano::transport::to_stat_detail (nano::transport::traffic_type type)
{ {
return nano::enum_util::cast<nano::stat::detail> (type); return nano::enum_util::cast<nano::stat::detail> (type);

View file

@ -2,6 +2,9 @@
#include <nano/lib/stats.hpp> #include <nano/lib/stats.hpp>
#include <string_view>
#include <vector>
namespace nano::transport namespace nano::transport
{ {
/** /**
@ -13,5 +16,7 @@ enum class traffic_type
bootstrap, // Ascending bootstrap (asc_pull_ack, asc_pull_req) traffic bootstrap, // Ascending bootstrap (asc_pull_ack, asc_pull_req) traffic
}; };
std::string_view to_string (traffic_type);
std::vector<traffic_type> all_traffic_types ();
nano::stat::detail to_stat_detail (traffic_type); nano::stat::detail to_stat_detail (traffic_type);
} }