Socket timeout tidyup (#3704)
* socket_timeout unit tests * Rename socket::stop_timer to socket:set_last_completion All stop timer does is to set a variable. It is a simply setter function. It is confusing to call it stop_timer because it implies that it does something more than just set a timestamp. * Rename start_timer() to set_next_deadline() * Remove network constant socket_dev_idle_timeout We do not need it. * Rename update_last_receive_time() to set_last_receive_time() To make it obvious it is setter function and to make it consistent with other setter functions. * Minor code improvement - no functional change * Rename the nano::socket concept of deadline to timeout A deadline makes people imagine a fixed point in time, a timestamp. However, socket::next_deadline is a timeout value and not a deadline. It signifies the seconds of inactivity that would cause a socket timeout. Renames: set_next_deadline -> set_default_timeout set_next_deadline(timeout) -> set_timeout(timeout) io_timeout -> default_timeout timeout_set -> set_default_timeout_value next_deadline -> timeout * Add some comments to nano::socket class * Remove needless code * Use guard ifs to make nano::socket::async_write more readable
This commit is contained in:
parent
e960ce8376
commit
275a545062
6 changed files with 364 additions and 74 deletions
|
@ -7,6 +7,8 @@
|
|||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <boost/asio/read.hpp>
|
||||
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <utility>
|
||||
|
@ -358,7 +360,7 @@ TEST (socket, disconnection_of_silent_connections)
|
|||
nano::node_config config;
|
||||
// Increasing the timer timeout, so we don't let the connection to timeout due to the timer checker.
|
||||
config.tcp_io_timeout = std::chrono::seconds::max ();
|
||||
config.network_params.network.socket_dev_idle_timeout = std::chrono::seconds::max ();
|
||||
config.network_params.network.idle_timeout = std::chrono::seconds::max ();
|
||||
// Silent connections are connections open by external peers that don't contribute with any data.
|
||||
config.network_params.network.silent_connection_tolerance_time = std::chrono::seconds{ 5 };
|
||||
|
||||
|
@ -605,3 +607,252 @@ TEST (socket, concurrent_writes)
|
|||
t.join ();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that the socket correctly handles a tcp_io_timeout during tcp connect
|
||||
* Steps:
|
||||
* set timeout to one second
|
||||
* do a tcp connect that will block for at least a few seconds at the tcp level
|
||||
* check that the connect returns error and that the correct counters have been incremented
|
||||
*/
|
||||
TEST (socket_timeout, connect)
|
||||
{
|
||||
// create one node and set timeout to 1 second
|
||||
nano::system system (1);
|
||||
std::shared_ptr<nano::node> node = system.nodes[0];
|
||||
node->config.tcp_io_timeout = std::chrono::seconds (1);
|
||||
|
||||
// try to connect to an IP address that most likely does not exist and will not reply
|
||||
// we want the tcp stack to not receive a negative reply, we want it to see silence and to keep trying
|
||||
// I use the un-routable IP address 10.255.254.253, which is likely to not exist
|
||||
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::make_address_v6 ("::ffff:10.255.254.253"), nano::get_available_port ());
|
||||
|
||||
// create a client socket and try to connect to the IP address that wil not respond
|
||||
auto socket = std::make_shared<nano::client_socket> (*node);
|
||||
std::atomic<bool> done = false;
|
||||
boost::system::error_code ec;
|
||||
socket->async_connect (endpoint, [&ec, &done] (boost::system::error_code const & ec_a) {
|
||||
if (ec_a)
|
||||
{
|
||||
ec = ec_a;
|
||||
done = true;
|
||||
}
|
||||
});
|
||||
|
||||
// check that the callback was called and we got an error
|
||||
ASSERT_TIMELY (6s, done == true);
|
||||
ASSERT_TRUE (ec);
|
||||
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_connect_error, nano::stat::dir::in));
|
||||
|
||||
// check that the socket was closed due to tcp_io_timeout timeout
|
||||
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::out));
|
||||
}
|
||||
|
||||
TEST (socket_timeout, read)
|
||||
{
|
||||
// create one node and set timeout to 1 second
|
||||
nano::system system (1);
|
||||
std::shared_ptr<nano::node> node = system.nodes[0];
|
||||
node->config.tcp_io_timeout = std::chrono::seconds (2);
|
||||
|
||||
// create a server socket
|
||||
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), nano::get_available_port ());
|
||||
boost::asio::ip::tcp::acceptor acceptor (system.io_ctx);
|
||||
acceptor.open (endpoint.protocol ());
|
||||
acceptor.bind (endpoint);
|
||||
acceptor.listen (boost::asio::socket_base::max_listen_connections);
|
||||
|
||||
// asynchronously accept an incoming connection and create a newsock and do not send any data
|
||||
boost::asio::ip::tcp::socket newsock (system.io_ctx);
|
||||
acceptor.async_accept (newsock, [] (boost::system::error_code const & ec_a) {
|
||||
debug_assert (!ec_a);
|
||||
});
|
||||
|
||||
// create a client socket to connect and call async_read, which should time out
|
||||
auto socket = std::make_shared<nano::client_socket> (*node);
|
||||
std::atomic<bool> done = false;
|
||||
boost::system::error_code ec;
|
||||
socket->async_connect (endpoint, [&socket, &ec, &done] (boost::system::error_code const & ec_a) {
|
||||
debug_assert (!ec_a);
|
||||
auto buffer = std::make_shared<std::vector<uint8_t>> (1);
|
||||
socket->async_read (buffer, 1, [&ec, &done] (boost::system::error_code const & ec_a, size_t size_a) {
|
||||
if (ec_a)
|
||||
{
|
||||
ec = ec_a;
|
||||
done = true;
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// check that the callback was called and we got an error
|
||||
ASSERT_TIMELY (10s, done == true);
|
||||
ASSERT_TRUE (ec);
|
||||
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_read_error, nano::stat::dir::in));
|
||||
|
||||
// check that the socket was closed due to tcp_io_timeout timeout
|
||||
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::out));
|
||||
}
|
||||
|
||||
TEST (socket_timeout, write)
|
||||
{
|
||||
// create one node and set timeout to 1 second
|
||||
nano::system system (1);
|
||||
std::shared_ptr<nano::node> node = system.nodes[0];
|
||||
node->config.tcp_io_timeout = std::chrono::seconds (2);
|
||||
|
||||
// create a server socket
|
||||
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), nano::get_available_port ());
|
||||
boost::asio::ip::tcp::acceptor acceptor (system.io_ctx);
|
||||
acceptor.open (endpoint.protocol ());
|
||||
acceptor.bind (endpoint);
|
||||
acceptor.listen (boost::asio::socket_base::max_listen_connections);
|
||||
|
||||
// asynchronously accept an incoming connection and create a newsock and do not receive any data
|
||||
boost::asio::ip::tcp::socket newsock (system.io_ctx);
|
||||
acceptor.async_accept (newsock, [] (boost::system::error_code const & ec_a) {
|
||||
debug_assert (!ec_a);
|
||||
});
|
||||
|
||||
// create a client socket and send lots of data to fill the socket queue on the local and remote side
|
||||
// eventually, the all tcp queues should fill up and async_write will not be able to progress
|
||||
// and the timeout should kick in and close the socket, which will cause the async_write to return an error
|
||||
auto socket = std::make_shared<nano::client_socket> (*node);
|
||||
std::atomic<bool> done = false;
|
||||
boost::system::error_code ec;
|
||||
socket->async_connect (endpoint, [&socket, &ec, &done] (boost::system::error_code const & ec_a) {
|
||||
debug_assert (!ec_a);
|
||||
auto buffer = std::make_shared<std::vector<uint8_t>> (128 * 1024);
|
||||
for (auto i = 0; i < 1024; ++i)
|
||||
{
|
||||
socket->async_write (nano::shared_const_buffer{ buffer }, [&ec, &done] (boost::system::error_code const & ec_a, size_t size_a) {
|
||||
if (ec_a)
|
||||
{
|
||||
ec = ec_a;
|
||||
done = true;
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// check that the callback was called and we got an error
|
||||
ASSERT_TIMELY (10s, done == true);
|
||||
ASSERT_TRUE (ec);
|
||||
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in));
|
||||
|
||||
// check that the socket was closed due to tcp_io_timeout timeout
|
||||
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::out));
|
||||
}
|
||||
|
||||
TEST (socket_timeout, read_overlapped)
|
||||
{
|
||||
// create one node and set timeout to 1 second
|
||||
nano::system system (1);
|
||||
std::shared_ptr<nano::node> node = system.nodes[0];
|
||||
node->config.tcp_io_timeout = std::chrono::seconds (2);
|
||||
|
||||
// create a server socket
|
||||
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), nano::get_available_port ());
|
||||
boost::asio::ip::tcp::acceptor acceptor (system.io_ctx);
|
||||
acceptor.open (endpoint.protocol ());
|
||||
acceptor.bind (endpoint);
|
||||
acceptor.listen (boost::asio::socket_base::max_listen_connections);
|
||||
|
||||
// asynchronously accept an incoming connection and send one byte only
|
||||
boost::asio::ip::tcp::socket newsock (system.io_ctx);
|
||||
acceptor.async_accept (newsock, [&newsock] (boost::system::error_code const & ec_a) {
|
||||
debug_assert (!ec_a);
|
||||
auto buffer = std::make_shared<std::vector<uint8_t>> (1);
|
||||
nano::async_write (newsock, nano::shared_const_buffer (buffer), [] (boost::system::error_code const & ec_a, size_t size_a) {
|
||||
debug_assert (!ec_a);
|
||||
debug_assert (size_a == 1);
|
||||
});
|
||||
});
|
||||
|
||||
// create a client socket to connect and call async_read twice, the second call should time out
|
||||
auto socket = std::make_shared<nano::client_socket> (*node);
|
||||
std::atomic<bool> done = false;
|
||||
boost::system::error_code ec;
|
||||
socket->async_connect (endpoint, [&socket, &ec, &done] (boost::system::error_code const & ec_a) {
|
||||
debug_assert (!ec_a);
|
||||
auto buffer = std::make_shared<std::vector<uint8_t>> (1);
|
||||
|
||||
socket->async_read (buffer, 1, [] (boost::system::error_code const & ec_a, size_t size_a) {
|
||||
debug_assert (size_a == 1);
|
||||
});
|
||||
|
||||
socket->async_read (buffer, 1, [&ec, &done] (boost::system::error_code const & ec_a, size_t size_a) {
|
||||
debug_assert (size_a == 0);
|
||||
if (ec_a)
|
||||
{
|
||||
ec = ec_a;
|
||||
done = true;
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// check that the callback was called and we got an error
|
||||
ASSERT_TIMELY (10s, done == true);
|
||||
ASSERT_TRUE (ec);
|
||||
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_read_error, nano::stat::dir::in));
|
||||
|
||||
// check that the socket was closed due to tcp_io_timeout timeout
|
||||
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::out));
|
||||
}
|
||||
|
||||
TEST (socket_timeout, write_overlapped)
|
||||
{
|
||||
// create one node and set timeout to 1 second
|
||||
nano::system system (1);
|
||||
std::shared_ptr<nano::node> node = system.nodes[0];
|
||||
node->config.tcp_io_timeout = std::chrono::seconds (2);
|
||||
|
||||
// create a server socket
|
||||
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), nano::get_available_port ());
|
||||
boost::asio::ip::tcp::acceptor acceptor (system.io_ctx);
|
||||
acceptor.open (endpoint.protocol ());
|
||||
acceptor.bind (endpoint);
|
||||
acceptor.listen (boost::asio::socket_base::max_listen_connections);
|
||||
|
||||
// asynchronously accept an incoming connection and read 2 bytes only
|
||||
boost::asio::ip::tcp::socket newsock (system.io_ctx);
|
||||
auto buffer = std::make_shared<std::vector<uint8_t>> (1);
|
||||
acceptor.async_accept (newsock, [&newsock, &buffer] (boost::system::error_code const & ec_a) {
|
||||
debug_assert (!ec_a);
|
||||
boost::asio::async_read (newsock, boost::asio::buffer (buffer->data (), buffer->size ()), [] (boost::system::error_code const & ec_a, size_t size_a) {
|
||||
debug_assert (size_a == 1);
|
||||
});
|
||||
});
|
||||
|
||||
// create a client socket and send lots of data to fill the socket queue on the local and remote side
|
||||
// eventually, the all tcp queues should fill up and async_write will not be able to progress
|
||||
// and the timeout should kick in and close the socket, which will cause the async_write to return an error
|
||||
auto socket = std::make_shared<nano::client_socket> (*node);
|
||||
std::atomic<bool> done = false;
|
||||
boost::system::error_code ec;
|
||||
socket->async_connect (endpoint, [&socket, &ec, &done] (boost::system::error_code const & ec_a) {
|
||||
debug_assert (!ec_a);
|
||||
auto buffer1 = std::make_shared<std::vector<uint8_t>> (1);
|
||||
auto buffer2 = std::make_shared<std::vector<uint8_t>> (128 * 1024);
|
||||
socket->async_write (nano::shared_const_buffer{ buffer1 }, [] (boost::system::error_code const & ec_a, size_t size_a) {
|
||||
debug_assert (size_a == 1);
|
||||
});
|
||||
for (auto i = 0; i < 1024; ++i)
|
||||
{
|
||||
socket->async_write (nano::shared_const_buffer{ buffer2 }, [&ec, &done] (boost::system::error_code const & ec_a, size_t size_a) {
|
||||
if (ec_a)
|
||||
{
|
||||
ec = ec_a;
|
||||
done = true;
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// check that the callback was called and we got an error
|
||||
ASSERT_TIMELY (10s, done == true);
|
||||
ASSERT_TRUE (ec);
|
||||
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in));
|
||||
|
||||
// check that the socket was closed due to tcp_io_timeout timeout
|
||||
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::out));
|
||||
}
|
||||
|
|
|
@ -155,7 +155,6 @@ public:
|
|||
: 47000;
|
||||
request_interval_ms = is_dev_network () ? 20 : 500;
|
||||
cleanup_period = is_dev_network () ? std::chrono::seconds (1) : std::chrono::seconds (60);
|
||||
socket_dev_idle_timeout = std::chrono::seconds (2);
|
||||
idle_timeout = is_dev_network () ? cleanup_period * 15 : cleanup_period * 2;
|
||||
silent_connection_tolerance_time = std::chrono::seconds (120);
|
||||
syn_cookie_cutoff = std::chrono::seconds (5);
|
||||
|
@ -190,7 +189,6 @@ public:
|
|||
return cleanup_period * 5;
|
||||
}
|
||||
/** Default maximum idle time for a socket before it's automatically closed */
|
||||
std::chrono::seconds socket_dev_idle_timeout;
|
||||
std::chrono::seconds idle_timeout;
|
||||
std::chrono::seconds silent_connection_tolerance_time;
|
||||
std::chrono::seconds syn_cookie_cutoff;
|
||||
|
|
|
@ -97,7 +97,7 @@ void nano::bootstrap_connections::pool_connection (std::shared_ptr<nano::bootstr
|
|||
auto const & socket_l = client_a->socket;
|
||||
if (!stopped && !client_a->pending_stop && !node.network.excluded_peers.check (client_a->channel->get_tcp_endpoint ()))
|
||||
{
|
||||
socket_l->start_timer (node.network_params.network.idle_timeout);
|
||||
socket_l->set_timeout (node.network_params.network.idle_timeout);
|
||||
// Push into idle deque
|
||||
if (!push_front)
|
||||
{
|
||||
|
|
|
@ -179,7 +179,7 @@ void nano::bootstrap_server::stop ()
|
|||
void nano::bootstrap_server::receive ()
|
||||
{
|
||||
// Increase timeout to receive TCP header (idle server socket)
|
||||
socket->timeout_set (node->network_params.network.idle_timeout);
|
||||
socket->set_default_timeout_value (node->network_params.network.idle_timeout);
|
||||
auto this_l (shared_from_this ());
|
||||
socket->async_read (receive_buffer, 8, [this_l] (boost::system::error_code const & ec, std::size_t size_a) {
|
||||
// Set remote_endpoint
|
||||
|
@ -188,7 +188,7 @@ void nano::bootstrap_server::receive ()
|
|||
this_l->remote_endpoint = this_l->socket->remote_endpoint ();
|
||||
}
|
||||
// Decrease timeout to default
|
||||
this_l->socket->timeout_set (this_l->node->config.tcp_io_timeout);
|
||||
this_l->socket->set_default_timeout_value (this_l->node->config.tcp_io_timeout);
|
||||
// Receive header
|
||||
this_l->receive_header_action (ec, size_a);
|
||||
});
|
||||
|
|
|
@ -40,10 +40,10 @@ nano::socket::socket (nano::node & node_a, endpoint_type_t endpoint_type_a) :
|
|||
tcp_socket{ node_a.io_ctx },
|
||||
node{ node_a },
|
||||
endpoint_type_m{ endpoint_type_a },
|
||||
next_deadline{ std::numeric_limits<uint64_t>::max () },
|
||||
timeout{ std::numeric_limits<uint64_t>::max () },
|
||||
last_completion_time_or_init{ nano::seconds_since_epoch () },
|
||||
last_receive_time_or_init{ nano::seconds_since_epoch () },
|
||||
io_timeout{ node_a.config.tcp_io_timeout },
|
||||
default_timeout{ node_a.config.tcp_io_timeout },
|
||||
silent_connection_tolerance_time{ node_a.network_params.network.silent_connection_tolerance_time }
|
||||
{
|
||||
}
|
||||
|
@ -58,7 +58,7 @@ void nano::socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::fu
|
|||
debug_assert (endpoint_type () == endpoint_type_t::client);
|
||||
checkup ();
|
||||
auto this_l (shared_from_this ());
|
||||
start_timer ();
|
||||
set_default_timeout ();
|
||||
this_l->tcp_socket.async_connect (endpoint_a,
|
||||
boost::asio::bind_executor (this_l->strand,
|
||||
[this_l, callback = std::move (callback_a), endpoint_a] (boost::system::error_code const & ec) {
|
||||
|
@ -68,7 +68,7 @@ void nano::socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::fu
|
|||
}
|
||||
else
|
||||
{
|
||||
this_l->stop_timer ();
|
||||
this_l->set_last_completion ();
|
||||
}
|
||||
this_l->remote = endpoint_a;
|
||||
callback (ec);
|
||||
|
@ -82,7 +82,7 @@ void nano::socket::async_read (std::shared_ptr<std::vector<uint8_t>> const & buf
|
|||
auto this_l (shared_from_this ());
|
||||
if (!closed)
|
||||
{
|
||||
start_timer ();
|
||||
set_default_timeout ();
|
||||
boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback = std::move (callback_a), size_a, this_l] () mutable {
|
||||
boost::asio::async_read (this_l->tcp_socket, boost::asio::buffer (buffer_a->data (), size_a),
|
||||
boost::asio::bind_executor (this_l->strand,
|
||||
|
@ -94,8 +94,8 @@ void nano::socket::async_read (std::shared_ptr<std::vector<uint8_t>> const & buf
|
|||
else
|
||||
{
|
||||
this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::in, size_a);
|
||||
this_l->stop_timer ();
|
||||
this_l->update_last_receive_time ();
|
||||
this_l->set_last_completion ();
|
||||
this_l->set_last_receive_time ();
|
||||
}
|
||||
cbk (ec, size_a);
|
||||
}));
|
||||
|
@ -112,65 +112,79 @@ void nano::socket::async_read (std::shared_ptr<std::vector<uint8_t>> const & buf
|
|||
|
||||
void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a)
|
||||
{
|
||||
if (!closed)
|
||||
if (closed)
|
||||
{
|
||||
++queue_size;
|
||||
boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback = std::move (callback_a), this_l = shared_from_this ()] () mutable {
|
||||
if (!this_l->closed)
|
||||
{
|
||||
this_l->start_timer ();
|
||||
nano::async_write (this_l->tcp_socket, buffer_a,
|
||||
boost::asio::bind_executor (this_l->strand,
|
||||
[buffer_a, cbk = std::move (callback), this_l] (boost::system::error_code ec, std::size_t size_a) {
|
||||
--this_l->queue_size;
|
||||
if (ec)
|
||||
{
|
||||
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in);
|
||||
}
|
||||
else
|
||||
{
|
||||
this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a);
|
||||
this_l->stop_timer ();
|
||||
}
|
||||
if (cbk)
|
||||
{
|
||||
cbk (ec, size_a);
|
||||
}
|
||||
}));
|
||||
}
|
||||
else if (callback)
|
||||
{
|
||||
if (callback_a)
|
||||
{
|
||||
node.background ([callback = std::move (callback_a)] () {
|
||||
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
|
||||
}
|
||||
}));
|
||||
});
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
node.background ([callback = std::move (callback_a)] () {
|
||||
|
||||
++queue_size;
|
||||
|
||||
boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback = std::move (callback_a), this_l = shared_from_this ()] () mutable {
|
||||
if (this_l->closed)
|
||||
{
|
||||
if (callback)
|
||||
{
|
||||
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
this_l->set_default_timeout ();
|
||||
|
||||
nano::async_write (this_l->tcp_socket, buffer_a,
|
||||
boost::asio::bind_executor (this_l->strand,
|
||||
[buffer_a, cbk = std::move (callback), this_l] (boost::system::error_code ec, std::size_t size_a) {
|
||||
--this_l->queue_size;
|
||||
|
||||
if (ec)
|
||||
{
|
||||
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in);
|
||||
}
|
||||
else
|
||||
{
|
||||
this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a);
|
||||
this_l->set_last_completion ();
|
||||
}
|
||||
|
||||
if (cbk)
|
||||
{
|
||||
cbk (ec, size_a);
|
||||
}
|
||||
}));
|
||||
}));
|
||||
}
|
||||
|
||||
void nano::socket::start_timer ()
|
||||
/** Call set_timeout with default_timeout as parameter */
|
||||
void nano::socket::set_default_timeout ()
|
||||
{
|
||||
start_timer (io_timeout);
|
||||
set_timeout (default_timeout);
|
||||
}
|
||||
|
||||
void nano::socket::start_timer (std::chrono::seconds deadline_a)
|
||||
/** Set the current timeout of the socket in seconds
|
||||
* timeout occurs when the last socket completion is more than timeout seconds in the past
|
||||
* timeout always applies, the socket always has a timeout
|
||||
* to set infinite timeout, use std::numeric_limits<uint64_t>::max ()
|
||||
* the function checkup() checks for timeout on a regular interval
|
||||
*/
|
||||
void nano::socket::set_timeout (std::chrono::seconds timeout_a)
|
||||
{
|
||||
next_deadline = deadline_a.count ();
|
||||
timeout = timeout_a.count ();
|
||||
}
|
||||
|
||||
void nano::socket::stop_timer ()
|
||||
void nano::socket::set_last_completion ()
|
||||
{
|
||||
last_completion_time_or_init = nano::seconds_since_epoch ();
|
||||
}
|
||||
|
||||
void nano::socket::update_last_receive_time ()
|
||||
void nano::socket::set_last_receive_time ()
|
||||
{
|
||||
last_receive_time_or_init = nano::seconds_since_epoch ();
|
||||
}
|
||||
|
@ -183,17 +197,22 @@ void nano::socket::checkup ()
|
|||
{
|
||||
uint64_t now (nano::seconds_since_epoch ());
|
||||
auto condition_to_disconnect{ false };
|
||||
|
||||
// if this is a server socket, and no data is received for silent_connection_tolerance_time seconds then disconnect
|
||||
if (this_l->endpoint_type () == endpoint_type_t::server && (now - this_l->last_receive_time_or_init) > this_l->silent_connection_tolerance_time.count ())
|
||||
{
|
||||
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_silent_connection_drop, nano::stat::dir::in);
|
||||
condition_to_disconnect = true;
|
||||
}
|
||||
if (this_l->next_deadline != std::numeric_limits<uint64_t>::max () && (now - this_l->last_completion_time_or_init) > this_l->next_deadline)
|
||||
|
||||
// if there is no activity for timeout seconds then disconnect
|
||||
if ((now - this_l->last_completion_time_or_init) > this_l->timeout)
|
||||
{
|
||||
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop,
|
||||
this_l->endpoint_type () == endpoint_type_t::server ? nano::stat::dir::in : nano::stat::dir::out);
|
||||
condition_to_disconnect = true;
|
||||
}
|
||||
|
||||
if (condition_to_disconnect)
|
||||
{
|
||||
if (this_l->node.config.logging.network_timeout_logging ())
|
||||
|
@ -222,9 +241,9 @@ bool nano::socket::has_timed_out () const
|
|||
return timed_out;
|
||||
}
|
||||
|
||||
void nano::socket::timeout_set (std::chrono::seconds io_timeout_a)
|
||||
void nano::socket::set_default_timeout_value (std::chrono::seconds timeout_a)
|
||||
{
|
||||
io_timeout = io_timeout_a;
|
||||
default_timeout = timeout_a;
|
||||
}
|
||||
|
||||
void nano::socket::set_silent_connection_tolerance_time (std::chrono::seconds tolerance_time_a)
|
||||
|
@ -248,7 +267,7 @@ void nano::socket::close_internal ()
|
|||
{
|
||||
if (!closed.exchange (true))
|
||||
{
|
||||
io_timeout = std::chrono::seconds (0);
|
||||
default_timeout = std::chrono::seconds (0);
|
||||
boost::system::error_code ec;
|
||||
|
||||
// Ignore error code for shutdown as it is best-effort
|
||||
|
@ -278,7 +297,7 @@ nano::server_socket::server_socket (nano::node & node_a, boost::asio::ip::tcp::e
|
|||
local{ std::move (local_a) },
|
||||
max_inbound_connections{ max_connections_a }
|
||||
{
|
||||
io_timeout = std::chrono::seconds::max ();
|
||||
default_timeout = std::chrono::seconds::max ();
|
||||
}
|
||||
|
||||
void nano::server_socket::start (boost::system::error_code & ec_a)
|
||||
|
@ -432,7 +451,7 @@ void nano::server_socket::on_connection (std::function<bool (std::shared_ptr<nan
|
|||
// Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start
|
||||
// an IO operation immediately, which will start a timer.
|
||||
new_connection->checkup ();
|
||||
new_connection->start_timer (this_l->node.network_params.network.is_dev_network () ? this_l->node.network_params.network.socket_dev_idle_timeout : this_l->node.network_params.network.idle_timeout);
|
||||
new_connection->set_timeout (this_l->node.network_params.network.idle_timeout);
|
||||
this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
|
||||
this_l->connections_per_address.emplace (new_connection->remote.address (), new_connection);
|
||||
if (cbk (new_connection, ec_a))
|
||||
|
|
|
@ -12,15 +12,9 @@
|
|||
#include <memory>
|
||||
#include <vector>
|
||||
|
||||
namespace boost
|
||||
namespace boost::asio::ip
|
||||
{
|
||||
namespace asio
|
||||
{
|
||||
namespace ip
|
||||
{
|
||||
class network_v6;
|
||||
}
|
||||
}
|
||||
class network_v6;
|
||||
}
|
||||
|
||||
namespace nano
|
||||
|
@ -52,11 +46,13 @@ public:
|
|||
realtime,
|
||||
realtime_response_server // special type for tcp channel response server
|
||||
};
|
||||
|
||||
enum class endpoint_type_t
|
||||
{
|
||||
server,
|
||||
client
|
||||
};
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
* @param node Owning node
|
||||
|
@ -74,8 +70,8 @@ public:
|
|||
/** Returns true if the socket has timed out */
|
||||
bool has_timed_out () const;
|
||||
/** This can be called to change the maximum idle time, e.g. based on the type of traffic detected. */
|
||||
void timeout_set (std::chrono::seconds io_timeout_a);
|
||||
void start_timer (std::chrono::seconds deadline_a);
|
||||
void set_default_timeout_value (std::chrono::seconds);
|
||||
void set_timeout (std::chrono::seconds);
|
||||
void set_silent_connection_tolerance_time (std::chrono::seconds tolerance_time_a);
|
||||
bool max () const
|
||||
{
|
||||
|
@ -122,21 +118,47 @@ protected:
|
|||
/** The other end of the connection */
|
||||
boost::asio::ip::tcp::endpoint remote;
|
||||
|
||||
std::atomic<uint64_t> next_deadline;
|
||||
/** number of seconds of inactivity that causes a socket timeout
|
||||
* activity is any successful connect, send or receive event
|
||||
*/
|
||||
std::atomic<uint64_t> timeout;
|
||||
|
||||
/** the timestamp (in seconds since epoch) of the last time there was successful activity on the socket
|
||||
* activity is any successful connect, send or receive event
|
||||
*/
|
||||
std::atomic<uint64_t> last_completion_time_or_init;
|
||||
|
||||
/** the timestamp (in seconds since epoch) of the last time there was successful receive on the socket
|
||||
* successful receive includes graceful closing of the socket by the peer (the read succeeds but returns 0 bytes)
|
||||
*/
|
||||
std::atomic<uint64_t> last_receive_time_or_init;
|
||||
|
||||
/** Flag that is set when cleanup decides to close the socket due to timeout.
|
||||
* NOTE: Currently used by bootstrap_server::timeout() but I suspect that this and bootstrap_server::timeout() are not needed.
|
||||
*/
|
||||
std::atomic<bool> timed_out{ false };
|
||||
std::atomic<std::chrono::seconds> io_timeout;
|
||||
|
||||
/** the timeout value to use when calling set_default_timeout() */
|
||||
std::atomic<std::chrono::seconds> default_timeout;
|
||||
|
||||
/** used in real time server sockets, number of seconds of no receive traffic that will cause the socket to timeout */
|
||||
std::chrono::seconds silent_connection_tolerance_time;
|
||||
|
||||
/** Tracks number of blocks queued for delivery to the local socket send buffers.
|
||||
* Under normal circumstances, this should be zero.
|
||||
* Note that this is not the number of buffers queued to the peer, it is the number of buffers
|
||||
* queued up to enter the local TCP send buffer
|
||||
* socket buffer queue -> TCP send queue -> (network) -> TCP receive queue of peer
|
||||
*/
|
||||
std::atomic<std::size_t> queue_size{ 0 };
|
||||
|
||||
/** Set by close() - completion handlers must check this. This is more reliable than checking
|
||||
error codes as the OS may have already completed the async operation. */
|
||||
std::atomic<bool> closed{ false };
|
||||
void close_internal ();
|
||||
void start_timer ();
|
||||
void stop_timer ();
|
||||
void update_last_receive_time ();
|
||||
void set_default_timeout ();
|
||||
void set_last_completion ();
|
||||
void set_last_receive_time ();
|
||||
void checkup ();
|
||||
|
||||
private:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue