Socket write queue fixes and improvements (#4202)
Fixes a socket data error where partial simultaneous writes can be interleaved corrupting the connection. Adds ability to prioritise non-bootstrap traffic
This commit is contained in:
parent
168367ac78
commit
62bdaba67c
16 changed files with 317 additions and 122 deletions
|
|
@ -65,9 +65,9 @@ TEST (socket, max_connections)
|
||||||
return node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
|
return node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
|
||||||
};
|
};
|
||||||
|
|
||||||
ASSERT_TIMELY (5s, get_tcp_accept_failures () == 1);
|
ASSERT_TIMELY_EQ (5s, get_tcp_accept_failures (), 1);
|
||||||
ASSERT_TIMELY (5s, get_tcp_accept_successes () == 2);
|
ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), 2);
|
||||||
ASSERT_TIMELY (5s, connection_attempts == 3);
|
ASSERT_TIMELY_EQ (5s, connection_attempts, 3);
|
||||||
|
|
||||||
// create space for one socket and fill the connections table again
|
// create space for one socket and fill the connections table again
|
||||||
|
|
||||||
|
|
@ -79,9 +79,9 @@ TEST (socket, max_connections)
|
||||||
auto client5 = std::make_shared<nano::transport::client_socket> (*node);
|
auto client5 = std::make_shared<nano::transport::client_socket> (*node);
|
||||||
client5->async_connect (dst_endpoint, connect_handler);
|
client5->async_connect (dst_endpoint, connect_handler);
|
||||||
|
|
||||||
ASSERT_TIMELY (5s, get_tcp_accept_failures () == 2);
|
ASSERT_TIMELY_EQ (5s, get_tcp_accept_failures (), 2);
|
||||||
ASSERT_TIMELY (5s, get_tcp_accept_successes () == 3);
|
ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), 3);
|
||||||
ASSERT_TIMELY (5s, connection_attempts == 5);
|
ASSERT_TIMELY_EQ (5s, connection_attempts, 5);
|
||||||
|
|
||||||
// close all existing sockets and fill the connections table again
|
// close all existing sockets and fill the connections table again
|
||||||
// start counting form 1 because 0 is the already closed socket
|
// start counting form 1 because 0 is the already closed socket
|
||||||
|
|
@ -99,10 +99,10 @@ TEST (socket, max_connections)
|
||||||
auto client8 = std::make_shared<nano::transport::client_socket> (*node);
|
auto client8 = std::make_shared<nano::transport::client_socket> (*node);
|
||||||
client8->async_connect (dst_endpoint, connect_handler);
|
client8->async_connect (dst_endpoint, connect_handler);
|
||||||
|
|
||||||
ASSERT_TIMELY (5s, get_tcp_accept_failures () == 3);
|
ASSERT_TIMELY_EQ (5s, get_tcp_accept_failures (), 3);
|
||||||
ASSERT_TIMELY (5s, get_tcp_accept_successes () == 5);
|
ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), 5);
|
||||||
ASSERT_TIMELY (5s, connection_attempts == 8); // connections initiated by the client
|
ASSERT_TIMELY_EQ (5s, connection_attempts, 8); // connections initiated by the client
|
||||||
ASSERT_TIMELY (5s, server_sockets.size () == 5); // connections accepted by the server
|
ASSERT_TIMELY_EQ (5s, server_sockets.size (), 5); // connections accepted by the server
|
||||||
|
|
||||||
node->stop ();
|
node->stop ();
|
||||||
}
|
}
|
||||||
|
|
@ -459,11 +459,11 @@ TEST (socket, drop_policy)
|
||||||
|
|
||||||
// We're going to write twice the queue size + 1, and the server isn't reading
|
// We're going to write twice the queue size + 1, and the server isn't reading
|
||||||
// The total number of drops should thus be 1 (the socket allows doubling the queue size for no_socket_drop)
|
// The total number of drops should thus be 1 (the socket allows doubling the queue size for no_socket_drop)
|
||||||
func (nano::transport::socket::queue_size_max * 2 + 1, nano::transport::buffer_drop_policy::no_socket_drop);
|
func (nano::transport::socket::default_max_queue_size * 2 + 1, nano::transport::buffer_drop_policy::no_socket_drop);
|
||||||
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out));
|
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out));
|
||||||
ASSERT_EQ (0, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out));
|
ASSERT_EQ (0, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out));
|
||||||
|
|
||||||
func (nano::transport::socket::queue_size_max + 1, nano::transport::buffer_drop_policy::limiter);
|
func (nano::transport::socket::default_max_queue_size + 1, nano::transport::buffer_drop_policy::limiter);
|
||||||
// The stats are accumulated from before
|
// The stats are accumulated from before
|
||||||
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out));
|
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out));
|
||||||
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out));
|
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out));
|
||||||
|
|
@ -716,7 +716,7 @@ TEST (socket_timeout, write)
|
||||||
// create a client socket and send lots of data to fill the socket queue on the local and remote side
|
// create a client socket and send lots of data to fill the socket queue on the local and remote side
|
||||||
// eventually, the all tcp queues should fill up and async_write will not be able to progress
|
// eventually, the all tcp queues should fill up and async_write will not be able to progress
|
||||||
// and the timeout should kick in and close the socket, which will cause the async_write to return an error
|
// and the timeout should kick in and close the socket, which will cause the async_write to return an error
|
||||||
auto socket = std::make_shared<nano::transport::client_socket> (*node);
|
auto socket = std::make_shared<nano::transport::client_socket> (*node, 1024 * 64); // socket with a max queue size much larger than OS buffers
|
||||||
std::atomic<bool> done = false;
|
std::atomic<bool> done = false;
|
||||||
boost::system::error_code ec;
|
boost::system::error_code ec;
|
||||||
socket->async_connect (endpoint, [&socket, &ec, &done] (boost::system::error_code const & ec_a) {
|
socket->async_connect (endpoint, [&socket, &ec, &done] (boost::system::error_code const & ec_a) {
|
||||||
|
|
@ -826,7 +826,7 @@ TEST (socket_timeout, write_overlapped)
|
||||||
// create a client socket and send lots of data to fill the socket queue on the local and remote side
|
// create a client socket and send lots of data to fill the socket queue on the local and remote side
|
||||||
// eventually, the all tcp queues should fill up and async_write will not be able to progress
|
// eventually, the all tcp queues should fill up and async_write will not be able to progress
|
||||||
// and the timeout should kick in and close the socket, which will cause the async_write to return an error
|
// and the timeout should kick in and close the socket, which will cause the async_write to return an error
|
||||||
auto socket = std::make_shared<nano::transport::client_socket> (*node);
|
auto socket = std::make_shared<nano::transport::client_socket> (*node, 1024 * 64); // socket with a max queue size much larger than OS buffers
|
||||||
std::atomic<bool> done = false;
|
std::atomic<bool> done = false;
|
||||||
boost::system::error_code ec;
|
boost::system::error_code ec;
|
||||||
socket->async_connect (endpoint, [&socket, &ec, &done] (boost::system::error_code const & ec_a) {
|
socket->async_connect (endpoint, [&socket, &ec, &done] (boost::system::error_code const & ec_a) {
|
||||||
|
|
|
||||||
|
|
@ -56,4 +56,19 @@ void nano::outbound_bandwidth_limiter::reset (std::size_t limit, double burst_ra
|
||||||
{
|
{
|
||||||
auto & limiter = select_limiter (type);
|
auto & limiter = select_limiter (type);
|
||||||
limiter.reset (limit, burst_ratio);
|
limiter.reset (limit, burst_ratio);
|
||||||
|
}
|
||||||
|
|
||||||
|
nano::bandwidth_limit_type nano::to_bandwidth_limit_type (const nano::transport::traffic_type & traffic_type)
|
||||||
|
{
|
||||||
|
switch (traffic_type)
|
||||||
|
{
|
||||||
|
case nano::transport::traffic_type::generic:
|
||||||
|
return nano::bandwidth_limit_type::standard;
|
||||||
|
break;
|
||||||
|
case nano::transport::traffic_type::bootstrap:
|
||||||
|
return nano::bandwidth_limit_type::bootstrap;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
debug_assert (false);
|
||||||
|
return {};
|
||||||
}
|
}
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <nano/lib/rate_limiting.hpp>
|
#include <nano/lib/rate_limiting.hpp>
|
||||||
|
#include <nano/node/transport/traffic_type.hpp>
|
||||||
|
|
||||||
namespace nano
|
namespace nano
|
||||||
{
|
{
|
||||||
|
|
@ -15,6 +16,8 @@ enum class bandwidth_limit_type
|
||||||
bootstrap
|
bootstrap
|
||||||
};
|
};
|
||||||
|
|
||||||
|
nano::bandwidth_limit_type to_bandwidth_limit_type (nano::transport::traffic_type const &);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class that tracks and manages bandwidth limits for IO operations
|
* Class that tracks and manages bandwidth limits for IO operations
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -456,7 +456,7 @@ void nano::bootstrap_ascending::send (std::shared_ptr<nano::transport::channel>
|
||||||
// TODO: There is no feedback mechanism if bandwidth limiter starts dropping our requests
|
// TODO: There is no feedback mechanism if bandwidth limiter starts dropping our requests
|
||||||
channel->send (
|
channel->send (
|
||||||
request, nullptr,
|
request, nullptr,
|
||||||
nano::transport::buffer_drop_policy::limiter, nano::bandwidth_limit_type::bootstrap);
|
nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t nano::bootstrap_ascending::priority_size () const
|
size_t nano::bootstrap_ascending::priority_size () const
|
||||||
|
|
@ -562,7 +562,7 @@ std::shared_ptr<nano::transport::channel> nano::bootstrap_ascending::available_c
|
||||||
auto channels = network.random_set (32, node.network_params.network.bootstrap_protocol_version_min, /* include temporary channels */ true);
|
auto channels = network.random_set (32, node.network_params.network.bootstrap_protocol_version_min, /* include temporary channels */ true);
|
||||||
for (auto & channel : channels)
|
for (auto & channel : channels)
|
||||||
{
|
{
|
||||||
if (!channel->max ())
|
if (!channel->max (nano::transport::traffic_type::bootstrap))
|
||||||
{
|
{
|
||||||
return channel;
|
return channel;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -81,7 +81,7 @@ bool nano::bootstrap_server::request (nano::asc_pull_req const & message, std::s
|
||||||
|
|
||||||
// If channel is full our response will be dropped anyway, so filter that early
|
// If channel is full our response will be dropped anyway, so filter that early
|
||||||
// TODO: Add per channel limits (this ideally should be done on the channel message processing side)
|
// TODO: Add per channel limits (this ideally should be done on the channel message processing side)
|
||||||
if (channel->max ())
|
if (channel->max (nano::transport::traffic_type::bootstrap))
|
||||||
{
|
{
|
||||||
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::channel_full, nano::stat::dir::in);
|
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::channel_full, nano::stat::dir::in);
|
||||||
return false;
|
return false;
|
||||||
|
|
@ -125,7 +125,7 @@ void nano::bootstrap_server::respond (nano::asc_pull_ack & response, std::shared
|
||||||
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::write_error, nano::stat::dir::out);
|
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::write_error, nano::stat::dir::out);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
nano::transport::buffer_drop_policy::limiter, nano::bandwidth_limit_type::bootstrap);
|
nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
@ -138,7 +138,7 @@ void nano::bootstrap_server::process_batch (std::deque<request_t> & batch)
|
||||||
|
|
||||||
for (auto & [request, channel] : batch)
|
for (auto & [request, channel] : batch)
|
||||||
{
|
{
|
||||||
if (!channel->max ())
|
if (!channel->max (nano::transport::traffic_type::bootstrap))
|
||||||
{
|
{
|
||||||
auto response = process (transaction, request);
|
auto response = process (transaction, request);
|
||||||
respond (response, channel);
|
respond (response, channel);
|
||||||
|
|
|
||||||
|
|
@ -14,15 +14,15 @@ nano::transport::channel::channel (nano::node & node_a) :
|
||||||
set_network_version (node_a.network_params.network.protocol_version);
|
set_network_version (node_a.network_params.network.protocol_version);
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::transport::channel::send (nano::message & message_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::bandwidth_limit_type limiter_type)
|
void nano::transport::channel::send (nano::message & message_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type)
|
||||||
{
|
{
|
||||||
auto buffer (message_a.to_shared_const_buffer ());
|
auto buffer (message_a.to_shared_const_buffer ());
|
||||||
auto detail = nano::to_stat_detail (message_a.header.type);
|
auto detail = nano::to_stat_detail (message_a.header.type);
|
||||||
auto is_droppable_by_limiter = (drop_policy_a == nano::transport::buffer_drop_policy::limiter);
|
auto is_droppable_by_limiter = (drop_policy_a == nano::transport::buffer_drop_policy::limiter);
|
||||||
auto should_pass (node.outbound_limiter.should_pass (buffer.size (), limiter_type));
|
auto should_pass (node.outbound_limiter.should_pass (buffer.size (), to_bandwidth_limit_type (traffic_type)));
|
||||||
if (!is_droppable_by_limiter || should_pass)
|
if (!is_droppable_by_limiter || should_pass)
|
||||||
{
|
{
|
||||||
send_buffer (buffer, callback_a, drop_policy_a);
|
send_buffer (buffer, callback_a, drop_policy_a, traffic_type);
|
||||||
node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out);
|
node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
|
|
||||||
|
|
@ -27,15 +27,25 @@ public:
|
||||||
|
|
||||||
virtual std::size_t hash_code () const = 0;
|
virtual std::size_t hash_code () const = 0;
|
||||||
virtual bool operator== (nano::transport::channel const &) const = 0;
|
virtual bool operator== (nano::transport::channel const &) const = 0;
|
||||||
void send (nano::message & message_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a = nullptr, nano::transport::buffer_drop_policy policy_a = nano::transport::buffer_drop_policy::limiter, nano::bandwidth_limit_type = nano::bandwidth_limit_type::standard);
|
|
||||||
|
void send (nano::message & message_a,
|
||||||
|
std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a = nullptr,
|
||||||
|
nano::transport::buffer_drop_policy policy_a = nano::transport::buffer_drop_policy::limiter,
|
||||||
|
nano::transport::traffic_type = nano::transport::traffic_type::generic);
|
||||||
|
|
||||||
// TODO: investigate clang-tidy warning about default parameters on virtual/override functions
|
// TODO: investigate clang-tidy warning about default parameters on virtual/override functions
|
||||||
//
|
virtual void send_buffer (nano::shared_const_buffer const &,
|
||||||
virtual void send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter) = 0;
|
std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr,
|
||||||
|
nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter,
|
||||||
|
nano::transport::traffic_type = nano::transport::traffic_type::generic)
|
||||||
|
= 0;
|
||||||
|
|
||||||
virtual std::string to_string () const = 0;
|
virtual std::string to_string () const = 0;
|
||||||
virtual nano::endpoint get_endpoint () const = 0;
|
virtual nano::endpoint get_endpoint () const = 0;
|
||||||
virtual nano::tcp_endpoint get_tcp_endpoint () const = 0;
|
virtual nano::tcp_endpoint get_tcp_endpoint () const = 0;
|
||||||
virtual nano::transport::transport_type get_type () const = 0;
|
virtual nano::transport::transport_type get_type () const = 0;
|
||||||
virtual bool max ()
|
|
||||||
|
virtual bool max (nano::transport::traffic_type = nano::transport::traffic_type::generic)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -13,10 +13,10 @@ nano::transport::fake::channel::channel (nano::node & node) :
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The send function behaves like a null device, it throws the data away and returns success.
|
* The send function behaves like a null device, it throws the data away and returns success.
|
||||||
*/
|
*/
|
||||||
void nano::transport::fake::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a)
|
void nano::transport::fake::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type)
|
||||||
{
|
{
|
||||||
//auto bytes = buffer_a.to_bytes ();
|
// auto bytes = buffer_a.to_bytes ();
|
||||||
auto size = buffer_a.size ();
|
auto size = buffer_a.size ();
|
||||||
if (callback_a)
|
if (callback_a)
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -20,13 +20,11 @@ namespace transport
|
||||||
std::string to_string () const override;
|
std::string to_string () const override;
|
||||||
std::size_t hash_code () const override;
|
std::size_t hash_code () const override;
|
||||||
|
|
||||||
// clang-format off
|
|
||||||
void send_buffer (
|
void send_buffer (
|
||||||
nano::shared_const_buffer const &,
|
nano::shared_const_buffer const &,
|
||||||
std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr,
|
std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr,
|
||||||
nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter
|
nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter,
|
||||||
) override;
|
nano::transport::traffic_type = nano::transport::traffic_type::generic) override;
|
||||||
// clang-format on
|
|
||||||
|
|
||||||
bool operator== (nano::transport::channel const &) const override;
|
bool operator== (nano::transport::channel const &) const override;
|
||||||
bool operator== (nano::transport::fake::channel const & other_a) const;
|
bool operator== (nano::transport::fake::channel const & other_a) const;
|
||||||
|
|
|
||||||
|
|
@ -53,7 +53,7 @@ public:
|
||||||
* Send the buffer to the peer and call the callback function when done. The call never fails.
|
* Send the buffer to the peer and call the callback function when done. The call never fails.
|
||||||
* Note that the inbound message visitor will be called before the callback because it is called directly whereas the callback is spawned in the background.
|
* Note that the inbound message visitor will be called before the callback because it is called directly whereas the callback is spawned in the background.
|
||||||
*/
|
*/
|
||||||
void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a)
|
void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type)
|
||||||
{
|
{
|
||||||
std::size_t offset{ 0 };
|
std::size_t offset{ 0 };
|
||||||
auto const buffer_read_fn = [&offset, buffer_v = buffer_a.to_bytes ()] (std::shared_ptr<std::vector<uint8_t>> const & data_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a) {
|
auto const buffer_read_fn = [&offset, buffer_v = buffer_a.to_bytes ()] (std::shared_ptr<std::vector<uint8_t>> const & data_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a) {
|
||||||
|
|
|
||||||
|
|
@ -18,9 +18,10 @@ namespace transport
|
||||||
explicit channel (nano::node & node, nano::node & destination);
|
explicit channel (nano::node & node, nano::node & destination);
|
||||||
std::size_t hash_code () const override;
|
std::size_t hash_code () const override;
|
||||||
bool operator== (nano::transport::channel const &) const override;
|
bool operator== (nano::transport::channel const &) const override;
|
||||||
|
|
||||||
// TODO: investigate clang-tidy warning about default parameters on virtual/override functions
|
// TODO: investigate clang-tidy warning about default parameters on virtual/override functions
|
||||||
//
|
void send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type = nano::transport::traffic_type::generic) override;
|
||||||
void send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter) override;
|
|
||||||
std::string to_string () const override;
|
std::string to_string () const override;
|
||||||
bool operator== (nano::transport::inproc::channel const & other_a) const
|
bool operator== (nano::transport::inproc::channel const & other_a) const
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,12 @@ bool is_temporary_error (boost::system::error_code const & ec_a)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
nano::transport::socket::socket (nano::node & node_a, endpoint_type_t endpoint_type_a) :
|
/*
|
||||||
|
* socket
|
||||||
|
*/
|
||||||
|
|
||||||
|
nano::transport::socket::socket (nano::node & node_a, endpoint_type_t endpoint_type_a, std::size_t max_queue_size_a) :
|
||||||
|
send_queue{ max_queue_size_a },
|
||||||
strand{ node_a.io_ctx.get_executor () },
|
strand{ node_a.io_ctx.get_executor () },
|
||||||
tcp_socket{ node_a.io_ctx },
|
tcp_socket{ node_a.io_ctx },
|
||||||
node{ node_a },
|
node{ node_a },
|
||||||
|
|
@ -42,7 +47,8 @@ nano::transport::socket::socket (nano::node & node_a, endpoint_type_t endpoint_t
|
||||||
last_completion_time_or_init{ nano::seconds_since_epoch () },
|
last_completion_time_or_init{ nano::seconds_since_epoch () },
|
||||||
last_receive_time_or_init{ nano::seconds_since_epoch () },
|
last_receive_time_or_init{ nano::seconds_since_epoch () },
|
||||||
default_timeout{ node_a.config.tcp_io_timeout },
|
default_timeout{ node_a.config.tcp_io_timeout },
|
||||||
silent_connection_tolerance_time{ node_a.network_params.network.silent_connection_tolerance_time }
|
silent_connection_tolerance_time{ node_a.network_params.network.silent_connection_tolerance_time },
|
||||||
|
max_queue_size{ max_queue_size_a }
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -51,12 +57,17 @@ nano::transport::socket::~socket ()
|
||||||
close_internal ();
|
close_internal ();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void nano::transport::socket::start ()
|
||||||
|
{
|
||||||
|
ongoing_checkup ();
|
||||||
|
}
|
||||||
|
|
||||||
void nano::transport::socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::function<void (boost::system::error_code const &)> callback_a)
|
void nano::transport::socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::function<void (boost::system::error_code const &)> callback_a)
|
||||||
{
|
{
|
||||||
debug_assert (callback_a);
|
debug_assert (callback_a);
|
||||||
debug_assert (endpoint_type () == endpoint_type_t::client);
|
debug_assert (endpoint_type () == endpoint_type_t::client);
|
||||||
|
|
||||||
checkup ();
|
start ();
|
||||||
auto this_l (shared_from_this ());
|
auto this_l (shared_from_this ());
|
||||||
set_default_timeout ();
|
set_default_timeout ();
|
||||||
|
|
||||||
|
|
@ -116,7 +127,7 @@ void nano::transport::socket::async_read (std::shared_ptr<std::vector<uint8_t>>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::transport::socket::async_write (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a)
|
void nano::transport::socket::async_write (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a, nano::transport::traffic_type traffic_type)
|
||||||
{
|
{
|
||||||
if (closed)
|
if (closed)
|
||||||
{
|
{
|
||||||
|
|
@ -126,49 +137,83 @@ void nano::transport::socket::async_write (nano::shared_const_buffer const & buf
|
||||||
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
|
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
++queue_size;
|
bool queued = send_queue.insert (buffer_a, callback_a, traffic_type);
|
||||||
|
if (!queued)
|
||||||
boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback = std::move (callback_a), this_l = shared_from_this ()] () mutable {
|
{
|
||||||
if (this_l->closed)
|
if (callback_a)
|
||||||
{
|
{
|
||||||
if (callback)
|
node.background ([callback = std::move (callback_a)] () {
|
||||||
{
|
|
||||||
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
|
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
|
||||||
}
|
});
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
return;
|
boost::asio::post (strand, boost::asio::bind_executor (strand, [this_s = shared_from_this (), buffer_a, callback_a, traffic_type] () {
|
||||||
|
if (!this_s->write_in_progress)
|
||||||
|
{
|
||||||
|
this_s->write_queued_messages ();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Must be called from strand
|
||||||
|
void nano::transport::socket::write_queued_messages ()
|
||||||
|
{
|
||||||
|
if (closed)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto next = send_queue.pop ();
|
||||||
|
if (!next)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
set_default_timeout ();
|
||||||
|
|
||||||
|
write_in_progress = true;
|
||||||
|
nano::async_write (tcp_socket, next->buffer,
|
||||||
|
boost::asio::bind_executor (strand, [this_s = shared_from_this (), next /* `next` object keeps buffer in scope */] (boost::system::error_code ec, std::size_t size) {
|
||||||
|
this_s->write_in_progress = false;
|
||||||
|
|
||||||
|
if (ec)
|
||||||
|
{
|
||||||
|
this_s->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in);
|
||||||
|
this_s->close ();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
this_s->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size);
|
||||||
|
this_s->set_last_completion ();
|
||||||
}
|
}
|
||||||
|
|
||||||
this_l->set_default_timeout ();
|
if (next->callback)
|
||||||
|
{
|
||||||
|
next->callback (ec, size);
|
||||||
|
}
|
||||||
|
|
||||||
nano::async_write (this_l->tcp_socket, buffer_a,
|
if (!ec)
|
||||||
boost::asio::bind_executor (this_l->strand,
|
{
|
||||||
[buffer_a, cbk = std::move (callback), this_l] (boost::system::error_code ec, std::size_t size_a) {
|
this_s->write_queued_messages ();
|
||||||
--this_l->queue_size;
|
}
|
||||||
|
|
||||||
if (ec)
|
|
||||||
{
|
|
||||||
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in);
|
|
||||||
this_l->close ();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a);
|
|
||||||
this_l->set_last_completion ();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (cbk)
|
|
||||||
{
|
|
||||||
cbk (ec, size_a);
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool nano::transport::socket::max (nano::transport::traffic_type traffic_type) const
|
||||||
|
{
|
||||||
|
return send_queue.size (traffic_type) >= max_queue_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool nano::transport::socket::full (nano::transport::traffic_type traffic_type) const
|
||||||
|
{
|
||||||
|
return send_queue.size (traffic_type) >= 2 * max_queue_size;
|
||||||
|
}
|
||||||
|
|
||||||
/** Call set_timeout with default_timeout as parameter */
|
/** Call set_timeout with default_timeout as parameter */
|
||||||
void nano::transport::socket::set_default_timeout ()
|
void nano::transport::socket::set_default_timeout ()
|
||||||
{
|
{
|
||||||
|
|
@ -196,7 +241,7 @@ void nano::transport::socket::set_last_receive_time ()
|
||||||
last_receive_time_or_init = nano::seconds_since_epoch ();
|
last_receive_time_or_init = nano::seconds_since_epoch ();
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::transport::socket::checkup ()
|
void nano::transport::socket::ongoing_checkup ()
|
||||||
{
|
{
|
||||||
std::weak_ptr<nano::transport::socket> this_w (shared_from_this ());
|
std::weak_ptr<nano::transport::socket> this_w (shared_from_this ());
|
||||||
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (node.network_params.network.is_dev_network () ? 1 : 5), [this_w] () {
|
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (node.network_params.network.is_dev_network () ? 1 : 5), [this_w] () {
|
||||||
|
|
@ -238,7 +283,7 @@ void nano::transport::socket::checkup ()
|
||||||
}
|
}
|
||||||
else if (!this_l->closed)
|
else if (!this_l->closed)
|
||||||
{
|
{
|
||||||
this_l->checkup ();
|
this_l->ongoing_checkup ();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
@ -289,19 +334,24 @@ void nano::transport::socket::close ()
|
||||||
// This must be called from a strand or the destructor
|
// This must be called from a strand or the destructor
|
||||||
void nano::transport::socket::close_internal ()
|
void nano::transport::socket::close_internal ()
|
||||||
{
|
{
|
||||||
if (!closed.exchange (true))
|
if (closed.exchange (true))
|
||||||
{
|
{
|
||||||
default_timeout = std::chrono::seconds (0);
|
return;
|
||||||
boost::system::error_code ec;
|
}
|
||||||
|
|
||||||
// Ignore error code for shutdown as it is best-effort
|
send_queue.clear ();
|
||||||
tcp_socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec);
|
|
||||||
tcp_socket.close (ec);
|
default_timeout = std::chrono::seconds (0);
|
||||||
if (ec)
|
boost::system::error_code ec;
|
||||||
{
|
|
||||||
node.logger.try_log ("Failed to close socket gracefully: ", ec.message ());
|
// Ignore error code for shutdown as it is best-effort
|
||||||
node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::error_socket_close);
|
tcp_socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec);
|
||||||
}
|
tcp_socket.close (ec);
|
||||||
|
|
||||||
|
if (ec)
|
||||||
|
{
|
||||||
|
node.logger.try_log ("Failed to close socket gracefully: ", ec.message ());
|
||||||
|
node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::error_socket_close);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -315,6 +365,82 @@ nano::tcp_endpoint nano::transport::socket::local_endpoint () const
|
||||||
return tcp_socket.local_endpoint ();
|
return tcp_socket.local_endpoint ();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* write_queue
|
||||||
|
*/
|
||||||
|
|
||||||
|
nano::transport::socket::write_queue::write_queue (std::size_t max_size_a) :
|
||||||
|
max_size{ max_size_a }
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
bool nano::transport::socket::write_queue::insert (const buffer_t & buffer, callback_t callback, nano::transport::traffic_type traffic_type)
|
||||||
|
{
|
||||||
|
nano::lock_guard<nano::mutex> guard{ mutex };
|
||||||
|
if (queues[traffic_type].size () < 2 * max_size)
|
||||||
|
{
|
||||||
|
queues[traffic_type].push (entry{ buffer, callback });
|
||||||
|
return true; // Queued
|
||||||
|
}
|
||||||
|
return false; // Not queued
|
||||||
|
}
|
||||||
|
|
||||||
|
std::optional<nano::transport::socket::write_queue::entry> nano::transport::socket::write_queue::pop ()
|
||||||
|
{
|
||||||
|
nano::lock_guard<nano::mutex> guard{ mutex };
|
||||||
|
|
||||||
|
auto try_pop = [this] (nano::transport::traffic_type type) -> std::optional<entry> {
|
||||||
|
auto & que = queues[type];
|
||||||
|
if (!que.empty ())
|
||||||
|
{
|
||||||
|
auto item = que.front ();
|
||||||
|
que.pop ();
|
||||||
|
return item;
|
||||||
|
}
|
||||||
|
return std::nullopt;
|
||||||
|
};
|
||||||
|
|
||||||
|
// TODO: This is a very basic prioritization, implement something more advanced and configurable
|
||||||
|
if (auto item = try_pop (nano::transport::traffic_type::generic))
|
||||||
|
{
|
||||||
|
return item;
|
||||||
|
}
|
||||||
|
if (auto item = try_pop (nano::transport::traffic_type::bootstrap))
|
||||||
|
{
|
||||||
|
return item;
|
||||||
|
}
|
||||||
|
|
||||||
|
return std::nullopt;
|
||||||
|
}
|
||||||
|
|
||||||
|
void nano::transport::socket::write_queue::clear ()
|
||||||
|
{
|
||||||
|
nano::lock_guard<nano::mutex> guard{ mutex };
|
||||||
|
queues.clear ();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::size_t nano::transport::socket::write_queue::size (nano::transport::traffic_type traffic_type) const
|
||||||
|
{
|
||||||
|
nano::lock_guard<nano::mutex> guard{ mutex };
|
||||||
|
if (auto it = queues.find (traffic_type); it != queues.end ())
|
||||||
|
{
|
||||||
|
return it->second.size ();
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool nano::transport::socket::write_queue::empty () const
|
||||||
|
{
|
||||||
|
nano::lock_guard<nano::mutex> guard{ mutex };
|
||||||
|
return std::all_of (queues.begin (), queues.end (), [] (auto const & que) {
|
||||||
|
return que.second.empty ();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* server_socket
|
||||||
|
*/
|
||||||
|
|
||||||
nano::transport::server_socket::server_socket (nano::node & node_a, boost::asio::ip::tcp::endpoint local_a, std::size_t max_connections_a) :
|
nano::transport::server_socket::server_socket (nano::node & node_a, boost::asio::ip::tcp::endpoint local_a, std::size_t max_connections_a) :
|
||||||
socket{ node_a, endpoint_type_t::server },
|
socket{ node_a, endpoint_type_t::server },
|
||||||
acceptor{ node_a.io_ctx },
|
acceptor{ node_a.io_ctx },
|
||||||
|
|
@ -474,7 +600,7 @@ void nano::transport::server_socket::on_connection (std::function<bool (std::sha
|
||||||
{
|
{
|
||||||
// Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start
|
// Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start
|
||||||
// an IO operation immediately, which will start a timer.
|
// an IO operation immediately, which will start a timer.
|
||||||
new_connection->checkup ();
|
new_connection->start ();
|
||||||
new_connection->set_timeout (this_l->node.network_params.network.idle_timeout);
|
new_connection->set_timeout (this_l->node.network_params.network.idle_timeout);
|
||||||
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
|
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
|
||||||
this_l->connections_per_address.emplace (new_connection->remote.address (), new_connection);
|
this_l->connections_per_address.emplace (new_connection->remote.address (), new_connection);
|
||||||
|
|
|
||||||
|
|
@ -3,11 +3,16 @@
|
||||||
#include <nano/boost/asio/ip/tcp.hpp>
|
#include <nano/boost/asio/ip/tcp.hpp>
|
||||||
#include <nano/boost/asio/strand.hpp>
|
#include <nano/boost/asio/strand.hpp>
|
||||||
#include <nano/lib/asio.hpp>
|
#include <nano/lib/asio.hpp>
|
||||||
|
#include <nano/lib/locks.hpp>
|
||||||
#include <nano/lib/timer.hpp>
|
#include <nano/lib/timer.hpp>
|
||||||
|
#include <nano/node/transport/traffic_type.hpp>
|
||||||
|
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
#include <optional>
|
||||||
|
#include <queue>
|
||||||
|
#include <unordered_map>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
namespace boost::asio::ip
|
namespace boost::asio::ip
|
||||||
|
|
@ -43,6 +48,8 @@ class socket : public std::enable_shared_from_this<nano::transport::socket>
|
||||||
friend class tcp_channels;
|
friend class tcp_channels;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
static std::size_t constexpr default_max_queue_size = 128;
|
||||||
|
|
||||||
enum class type_t
|
enum class type_t
|
||||||
{
|
{
|
||||||
undefined,
|
undefined,
|
||||||
|
|
@ -62,12 +69,14 @@ public:
|
||||||
* @param node Owning node
|
* @param node Owning node
|
||||||
* @param endpoint_type_a The endpoint's type: either server or client
|
* @param endpoint_type_a The endpoint's type: either server or client
|
||||||
*/
|
*/
|
||||||
explicit socket (nano::node & node, endpoint_type_t endpoint_type_a);
|
explicit socket (nano::node & node, endpoint_type_t endpoint_type_a, std::size_t max_queue_size = default_max_queue_size);
|
||||||
virtual ~socket ();
|
virtual ~socket ();
|
||||||
|
|
||||||
|
void start ();
|
||||||
|
|
||||||
void async_connect (boost::asio::ip::tcp::endpoint const &, std::function<void (boost::system::error_code const &)>);
|
void async_connect (boost::asio::ip::tcp::endpoint const &, std::function<void (boost::system::error_code const &)>);
|
||||||
void async_read (std::shared_ptr<std::vector<uint8_t>> const &, std::size_t, std::function<void (boost::system::error_code const &, std::size_t)>);
|
void async_read (std::shared_ptr<std::vector<uint8_t>> const &, std::size_t, std::function<void (boost::system::error_code const &, std::size_t)>);
|
||||||
void async_write (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> = {});
|
void async_write (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> callback = {}, nano::transport::traffic_type = nano::transport::traffic_type::generic);
|
||||||
|
|
||||||
virtual void close ();
|
virtual void close ();
|
||||||
boost::asio::ip::tcp::endpoint remote_endpoint () const;
|
boost::asio::ip::tcp::endpoint remote_endpoint () const;
|
||||||
|
|
@ -79,14 +88,10 @@ public:
|
||||||
std::chrono::seconds get_default_timeout_value () const;
|
std::chrono::seconds get_default_timeout_value () const;
|
||||||
void set_timeout (std::chrono::seconds);
|
void set_timeout (std::chrono::seconds);
|
||||||
void set_silent_connection_tolerance_time (std::chrono::seconds tolerance_time_a);
|
void set_silent_connection_tolerance_time (std::chrono::seconds tolerance_time_a);
|
||||||
bool max () const
|
|
||||||
{
|
bool max (nano::transport::traffic_type = nano::transport::traffic_type::generic) const;
|
||||||
return queue_size >= queue_size_max;
|
bool full (nano::transport::traffic_type = nano::transport::traffic_type::generic) const;
|
||||||
}
|
|
||||||
bool full () const
|
|
||||||
{
|
|
||||||
return queue_size >= queue_size_max * 2;
|
|
||||||
}
|
|
||||||
type_t type () const
|
type_t type () const
|
||||||
{
|
{
|
||||||
return type_m;
|
return type_m;
|
||||||
|
|
@ -116,15 +121,38 @@ public:
|
||||||
return !closed && tcp_socket.is_open ();
|
return !closed && tcp_socket.is_open ();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected:
|
private:
|
||||||
/** Holds the buffer and callback for queued writes */
|
class write_queue
|
||||||
class queue_item
|
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
nano::shared_const_buffer buffer;
|
using buffer_t = nano::shared_const_buffer;
|
||||||
std::function<void (boost::system::error_code const &, std::size_t)> callback;
|
using callback_t = std::function<void (boost::system::error_code const &, std::size_t)>;
|
||||||
|
|
||||||
|
struct entry
|
||||||
|
{
|
||||||
|
buffer_t buffer;
|
||||||
|
callback_t callback;
|
||||||
|
};
|
||||||
|
|
||||||
|
public:
|
||||||
|
explicit write_queue (std::size_t max_size);
|
||||||
|
|
||||||
|
bool insert (buffer_t const &, callback_t, nano::transport::traffic_type);
|
||||||
|
std::optional<entry> pop ();
|
||||||
|
void clear ();
|
||||||
|
std::size_t size (nano::transport::traffic_type) const;
|
||||||
|
bool empty () const;
|
||||||
|
|
||||||
|
std::size_t const max_size;
|
||||||
|
|
||||||
|
private:
|
||||||
|
mutable nano::mutex mutex;
|
||||||
|
std::unordered_map<nano::transport::traffic_type, std::queue<entry>> queues;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
write_queue send_queue;
|
||||||
|
|
||||||
|
protected:
|
||||||
boost::asio::strand<boost::asio::io_context::executor_type> strand;
|
boost::asio::strand<boost::asio::io_context::executor_type> strand;
|
||||||
boost::asio::ip::tcp::socket tcp_socket;
|
boost::asio::ip::tcp::socket tcp_socket;
|
||||||
nano::node & node;
|
nano::node & node;
|
||||||
|
|
@ -158,22 +186,19 @@ protected:
|
||||||
/** used in real time server sockets, number of seconds of no receive traffic that will cause the socket to timeout */
|
/** used in real time server sockets, number of seconds of no receive traffic that will cause the socket to timeout */
|
||||||
std::chrono::seconds silent_connection_tolerance_time;
|
std::chrono::seconds silent_connection_tolerance_time;
|
||||||
|
|
||||||
/** Tracks number of blocks queued for delivery to the local socket send buffers.
|
|
||||||
* Under normal circumstances, this should be zero.
|
|
||||||
* Note that this is not the number of buffers queued to the peer, it is the number of buffers
|
|
||||||
* queued up to enter the local TCP send buffer
|
|
||||||
* socket buffer queue -> TCP send queue -> (network) -> TCP receive queue of peer
|
|
||||||
*/
|
|
||||||
std::atomic<std::size_t> queue_size{ 0 };
|
|
||||||
|
|
||||||
/** Set by close() - completion handlers must check this. This is more reliable than checking
|
/** Set by close() - completion handlers must check this. This is more reliable than checking
|
||||||
error codes as the OS may have already completed the async operation. */
|
error codes as the OS may have already completed the async operation. */
|
||||||
std::atomic<bool> closed{ false };
|
std::atomic<bool> closed{ false };
|
||||||
|
|
||||||
|
/** Updated only from strand, but stored as atomic so it can be read from outside */
|
||||||
|
std::atomic<bool> write_in_progress{ false };
|
||||||
|
|
||||||
void close_internal ();
|
void close_internal ();
|
||||||
|
void write_queued_messages ();
|
||||||
void set_default_timeout ();
|
void set_default_timeout ();
|
||||||
void set_last_completion ();
|
void set_last_completion ();
|
||||||
void set_last_receive_time ();
|
void set_last_receive_time ();
|
||||||
void checkup ();
|
void ongoing_checkup ();
|
||||||
void read_impl (std::shared_ptr<std::vector<uint8_t>> const & data_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a);
|
void read_impl (std::shared_ptr<std::vector<uint8_t>> const & data_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
@ -181,7 +206,7 @@ private:
|
||||||
endpoint_type_t endpoint_type_m;
|
endpoint_type_t endpoint_type_m;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
static std::size_t constexpr queue_size_max = 128;
|
std::size_t const max_queue_size;
|
||||||
};
|
};
|
||||||
|
|
||||||
std::string socket_type_to_string (socket::type_t type);
|
std::string socket_type_to_string (socket::type_t type);
|
||||||
|
|
@ -238,8 +263,8 @@ public:
|
||||||
* Constructor
|
* Constructor
|
||||||
* @param node_a Owning node
|
* @param node_a Owning node
|
||||||
*/
|
*/
|
||||||
explicit client_socket (nano::node & node_a) :
|
explicit client_socket (nano::node & node_a, std::size_t max_queue_size = default_max_queue_size) :
|
||||||
socket{ node_a, endpoint_type_t::client }
|
socket{ node_a, endpoint_type_t::client, max_queue_size }
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
|
||||||
|
|
@ -46,11 +46,11 @@ bool nano::transport::channel_tcp::operator== (nano::transport::channel const &
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy policy_a)
|
void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy policy_a, nano::transport::traffic_type traffic_type)
|
||||||
{
|
{
|
||||||
if (auto socket_l = socket.lock ())
|
if (auto socket_l = socket.lock ())
|
||||||
{
|
{
|
||||||
if (!socket_l->max () || (policy_a == nano::transport::buffer_drop_policy::no_socket_drop && !socket_l->full ()))
|
if (!socket_l->max (traffic_type) || (policy_a == nano::transport::buffer_drop_policy::no_socket_drop && !socket_l->full (traffic_type)))
|
||||||
{
|
{
|
||||||
socket_l->async_write (
|
socket_l->async_write (
|
||||||
buffer_a, [endpoint_a = socket_l->remote_endpoint (), node = std::weak_ptr<nano::node> (node.shared ()), callback_a] (boost::system::error_code const & ec, std::size_t size_a) {
|
buffer_a, [endpoint_a = socket_l->remote_endpoint (), node = std::weak_ptr<nano::node> (node.shared ()), callback_a] (boost::system::error_code const & ec, std::size_t size_a) {
|
||||||
|
|
@ -69,7 +69,8 @@ void nano::transport::channel_tcp::send_buffer (nano::shared_const_buffer const
|
||||||
callback_a (ec, size_a);
|
callback_a (ec, size_a);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
},
|
||||||
|
traffic_type);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -37,11 +37,13 @@ namespace transport
|
||||||
public:
|
public:
|
||||||
channel_tcp (nano::node &, std::weak_ptr<nano::transport::socket>);
|
channel_tcp (nano::node &, std::weak_ptr<nano::transport::socket>);
|
||||||
~channel_tcp () override;
|
~channel_tcp () override;
|
||||||
|
|
||||||
std::size_t hash_code () const override;
|
std::size_t hash_code () const override;
|
||||||
bool operator== (nano::transport::channel const &) const override;
|
bool operator== (nano::transport::channel const &) const override;
|
||||||
// TODO: investigate clang-tidy warning about default parameters on virtual/override functions
|
|
||||||
//
|
// TODO: investigate clang-tidy warning about default parameters on virtual/override functions//
|
||||||
void send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter) override;
|
void send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type = nano::transport::traffic_type::generic) override;
|
||||||
|
|
||||||
std::string to_string () const override;
|
std::string to_string () const override;
|
||||||
bool operator== (nano::transport::channel_tcp const & other_a) const
|
bool operator== (nano::transport::channel_tcp const & other_a) const
|
||||||
{
|
{
|
||||||
|
|
@ -70,12 +72,12 @@ namespace transport
|
||||||
return nano::transport::transport_type::tcp;
|
return nano::transport::transport_type::tcp;
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual bool max () override
|
virtual bool max (nano::transport::traffic_type traffic_type) override
|
||||||
{
|
{
|
||||||
bool result = true;
|
bool result = true;
|
||||||
if (auto socket_l = socket.lock ())
|
if (auto socket_l = socket.lock ())
|
||||||
{
|
{
|
||||||
result = socket_l->max ();
|
result = socket_l->max (traffic_type);
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
14
nano/node/transport/traffic_type.hpp
Normal file
14
nano/node/transport/traffic_type.hpp
Normal file
|
|
@ -0,0 +1,14 @@
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
namespace nano::transport
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Used for message prioritization and bandwidth limits
|
||||||
|
*/
|
||||||
|
enum class traffic_type
|
||||||
|
{
|
||||||
|
generic,
|
||||||
|
/** For bootstrap (asc_pull_ack, asc_pull_req) traffic */
|
||||||
|
bootstrap
|
||||||
|
};
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue