Merge pull request #4929 from pwojcikdev/coroutine-socket-only-pr

Coroutine socket refactor
This commit is contained in:
Piotr Wójcik 2025-07-24 17:26:33 +02:00 committed by GitHub
commit 19b1e58d68
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
33 changed files with 1234 additions and 1304 deletions

View file

@ -10,12 +10,6 @@ code_inspect()
return 1
fi
# This is to prevent out of scope access in async_write from asio which is not picked up by static analysers
if [[ $(grep -rl --exclude="*asio.hpp" "asio::async_write" $SOURCE_ROOT_PATH/nano) ]]; then
echo "Using boost::asio::async_write directly is not permitted (except in nano/lib/asio.hpp). Use nano::async_write instead" >&2
return 1
fi
if [[ $(grep -rlP "^\s*assert \(" $SOURCE_ROOT_PATH/nano) ]]; then
echo "Using assert is not permitted. Use debug_assert instead." >&2
return 1

View file

@ -96,8 +96,8 @@ TEST (network, send_node_id_handshake_tcp)
node0->network.tcp_channels.start_tcp (node1->network.endpoint ());
ASSERT_EQ (0, node0->network.size ());
ASSERT_EQ (0, node1->network.size ());
ASSERT_TIMELY (10s, node0->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in) >= initial + 2);
ASSERT_TIMELY (5s, node1->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in) >= initial_node1 + 2);
ASSERT_TIMELY (10s, node0->stats.count (nano::stat::type::tcp_server_message, nano::stat::detail::node_id_handshake, nano::stat::dir::in) >= initial + 2);
ASSERT_TIMELY (5s, node1->stats.count (nano::stat::type::tcp_server_message, nano::stat::detail::node_id_handshake, nano::stat::dir::in) >= initial_node1 + 1);
ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in) >= initial_keepalive + 2);
ASSERT_TIMELY (5s, node1->stats.count (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in) >= initial_keepalive + 2);
ASSERT_EQ (1, node0->network.size ());
@ -135,8 +135,8 @@ TEST (network, last_contacted)
// check that the endpoints are part of the same connection
std::shared_ptr<nano::transport::tcp_socket> sock0 = channel0->socket;
std::shared_ptr<nano::transport::tcp_socket> sock1 = channel1->socket;
ASSERT_EQ (sock0->local_endpoint (), sock1->remote_endpoint ());
ASSERT_EQ (sock1->local_endpoint (), sock0->remote_endpoint ());
ASSERT_EQ (sock0->get_local_endpoint (), sock1->get_remote_endpoint ());
ASSERT_EQ (sock1->get_local_endpoint (), sock0->get_remote_endpoint ());
}
// capture the state before and ensure the clock ticks at least once
@ -293,66 +293,24 @@ TEST (network, send_insufficient_work)
nano::test::system system (2);
auto & node1 = *system.nodes[0];
auto & node2 = *system.nodes[1];
// Block zero work
nano::block_builder builder;
auto block1 = builder
.send ()
.previous (0)
.destination (1)
.balance (20)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (0)
.build ();
nano::publish publish1{ nano::dev::network_params.network, block1 };
auto tcp_channel (node1.network.tcp_channels.find_node_id (node2.get_node_id ()));
auto block = builder
.send ()
.previous (0)
.destination (1)
.balance (20)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (0)
.build ();
auto tcp_channel = node1.network.tcp_channels.find_node_id (node2.get_node_id ());
ASSERT_NE (nullptr, tcp_channel);
tcp_channel->send (publish1, nano::transport::traffic_type::test);
ASSERT_EQ (0, node1.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work));
ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work) != 0);
ASSERT_EQ (1, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work));
// Legacy block work between epoch_2_recieve & epoch_1
auto block2 = builder
.send ()
.previous (block1->hash ())
.destination (1)
.balance (20)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (system.work_generate_limited (block1->hash (), node1.network_params.work.epoch_2_receive, node1.network_params.work.epoch_1 - 1))
.build ();
nano::publish publish2{ nano::dev::network_params.network, block2 };
tcp_channel->send (publish2, nano::transport::traffic_type::test);
ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work) != 1);
ASSERT_EQ (2, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work));
// Legacy block work epoch_1
auto block3 = builder
.send ()
.previous (block2->hash ())
.destination (1)
.balance (20)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*system.work.generate (block2->hash (), node1.network_params.work.epoch_2))
.build ();
nano::publish publish3{ nano::dev::network_params.network, block3 };
tcp_channel->send (publish3, nano::transport::traffic_type::test);
ASSERT_EQ (0, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in));
ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) != 0);
ASSERT_EQ (1, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in));
// State block work epoch_2_recieve
auto block4 = builder
.state ()
.account (nano::dev::genesis_key.pub)
.previous (block1->hash ())
.representative (nano::dev::genesis_key.pub)
.balance (20)
.link (1)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (system.work_generate_limited (block1->hash (), node1.network_params.work.epoch_2_receive, node1.network_params.work.epoch_1 - 1))
.build ();
nano::publish publish4{ nano::dev::network_params.network, block4 };
tcp_channel->send (publish4, nano::transport::traffic_type::test);
ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) != 0);
ASSERT_EQ (1, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in));
ASSERT_EQ (2, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work));
nano::publish publish{ nano::dev::network_params.network, block };
tcp_channel->send (publish, nano::transport::traffic_type::test);
ASSERT_TIMELY_EQ (5s, 1, node2.stats.count (nano::stat::type::tcp_server_error, nano::stat::detail::insufficient_work));
}
TEST (receivable_processor, confirm_insufficient_pos)
@ -964,7 +922,7 @@ TEST (network, filter_invalid_network_bytes)
const_cast<nano::networks &> (keepalive.header.network) = nano::networks::invalid;
channel->send (keepalive, nano::transport::traffic_type::test);
ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::error, nano::stat::detail::invalid_network));
ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::tcp_server_error, nano::stat::detail::invalid_network));
}
// Ensure the network filters messages with the incorrect minimum version
@ -983,7 +941,7 @@ TEST (network, filter_invalid_version_using)
const_cast<uint8_t &> (keepalive.header.version_using) = nano::dev::network_params.network.protocol_version_min - 1;
channel->send (keepalive, nano::transport::traffic_type::test);
ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::error, nano::stat::detail::outdated_version));
ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::tcp_server_error, nano::stat::detail::outdated_version));
}
TEST (network, fill_keepalive_self)

View file

@ -82,10 +82,9 @@ TEST (socket, disconnection_of_silent_connections)
nano::node_config config;
// Increasing the timer timeout, so we don't let the connection to timeout due to the timer checker.
config.tcp_io_timeout = std::chrono::seconds::max ();
config.network_params.network.idle_timeout = std::chrono::seconds::max ();
config.tcp.io_timeout = std::chrono::seconds::max ();
// Silent connections are connections open by external peers that don't contribute with any data.
config.network_params.network.silent_connection_tolerance_time = std::chrono::seconds{ 5 };
config.tcp.silent_timeout = std::chrono::seconds{ 5 };
auto node = system.add_node (config);
// On a connection, a server data socket is created. The shared pointer guarantees the object's lifecycle until the end of this test.
@ -110,15 +109,16 @@ TEST (socket, disconnection_of_silent_connections)
// Checking the connection was closed.
ASSERT_TIMELY (10s, server_data_socket_future.wait_for (0s) == std::future_status::ready);
auto server_data_socket = server_data_socket_future.get ();
ASSERT_TIMELY (10s, server_data_socket->is_closed ());
ASSERT_TIMELY (10s, !server_data_socket->alive ());
// Just to ensure the disconnection wasn't due to the timer timeout.
ASSERT_EQ (0, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::in));
// Asserts the silent checker worked.
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_silent_connection_drop, nano::stat::dir::in));
ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_silent_connection_drop, nano::stat::dir::in));
}
TEST (socket, drop_policy)
// FIXME: Socket no longer queues writes, so this test is no longer valid
TEST (socket, DISABLED_drop_policy)
{
nano::test::system system;
@ -169,18 +169,20 @@ TEST (socket, drop_policy)
ASSERT_EQ (1, client.use_count ());
};
// TODO: FIXME: Socket no longer queues writes, so this test is no longer valid
size_t constexpr queue_size = 128;
// We're going to write twice the queue size + 1, and the server isn't reading
// The total number of drops should thus be 1 (the socket allows doubling the queue size for no_socket_drop)
func (nano::transport::tcp_socket::default_queue_size * 2 + 1);
func (queue_size * 2 + 1);
ASSERT_EQ (1, failed_writes);
func (nano::transport::tcp_socket::default_queue_size + 1);
func (queue_size + 1);
ASSERT_EQ (0, failed_writes);
}
// This is abusing the socket class, it's interfering with the normal node lifetimes and as a result deadlocks
// TEST (socket, DISABLED_concurrent_writes)
TEST (socket, concurrent_writes)
// FIXME: Socket no longer queues writes, so this test is no longer valid
TEST (socket, DISABLED_concurrent_writes)
{
nano::test::system system;
@ -231,7 +233,7 @@ TEST (socket, concurrent_writes)
accept_callback_t accept_callback = [&] (boost::system::error_code const & ec, boost::asio::ip::tcp::socket socket) {
if (!ec)
{
auto new_connection = std::make_shared<nano::transport::tcp_socket> (*node, std::move (socket), socket.remote_endpoint (), socket.local_endpoint ());
auto new_connection = std::make_shared<nano::transport::tcp_socket> (*node, std::move (socket));
connections.push_back (new_connection);
reader (new_connection);
@ -306,7 +308,7 @@ TEST (socket_timeout, connect)
// create one node and set timeout to 1 second
nano::test::system system (1);
std::shared_ptr<nano::node> node = system.nodes[0];
node->config.tcp_io_timeout = 1s;
node->config.tcp.io_timeout = 1s;
// try to connect to an IP address that most likely does not exist and will not reply
// we want the tcp stack to not receive a negative reply, we want it to see silence and to keep trying
@ -332,7 +334,7 @@ TEST (socket_timeout, read)
// create one node and set timeout to 1 second
nano::test::system system (1);
std::shared_ptr<nano::node> node = system.nodes[0];
node->config.tcp_io_timeout = std::chrono::seconds (2);
node->config.tcp.io_timeout = std::chrono::seconds (2);
// create a server socket
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ());
@ -367,30 +369,28 @@ TEST (socket_timeout, read)
// check that the callback was called and we got an error
ASSERT_TIMELY_EQ (10s, done, true);
ASSERT_TRUE (ec);
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_read_error, nano::stat::dir::in));
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_read_error));
// check that the socket was closed due to tcp_io_timeout timeout
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::out));
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::in));
}
TEST (socket_timeout, write)
{
std::atomic<bool> done = false;
// create one node and set timeout to 1 second
nano::test::system system (1);
std::shared_ptr<nano::node> node = system.nodes[0];
node->config.tcp_io_timeout = std::chrono::seconds (2);
node->config.tcp.io_timeout = std::chrono::seconds (2);
// create a server socket
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ());
boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx);
boost::asio::ip::tcp::acceptor acceptor (node->io_ctx);
acceptor.open (endpoint.protocol ());
acceptor.bind (endpoint);
acceptor.listen (boost::asio::socket_base::max_listen_connections);
// asynchronously accept an incoming connection and create a newsock and do not receive any data
boost::asio::ip::tcp::socket newsock (*system.io_ctx);
boost::asio::ip::tcp::socket newsock (node->io_ctx);
acceptor.async_accept (newsock, [] (boost::system::error_code const & ec_a) {
EXPECT_FALSE (ec_a);
});
@ -398,26 +398,28 @@ TEST (socket_timeout, write)
// create a client socket and send lots of data to fill the socket queue on the local and remote side
// eventually, the all tcp queues should fill up and async_write will not be able to progress
// and the timeout should kick in and close the socket, which will cause the async_write to return an error
auto socket = std::make_shared<nano::transport::tcp_socket> (*node, nano::transport::socket_endpoint::client, 1024 * 1024); // socket with a max queue size much larger than OS buffers
auto socket = std::make_shared<nano::transport::tcp_socket> (*node, nano::transport::socket_endpoint::client); // socket with a max queue size much larger than OS buffers
socket->async_connect (acceptor.local_endpoint (), [&socket, &done] (boost::system::error_code const & ec_a) {
EXPECT_FALSE (ec_a);
auto ec_0 = socket->blocking_connect (acceptor.local_endpoint ());
ASSERT_FALSE (ec_0);
auto buffer = std::make_shared<std::vector<uint8_t>> (128 * 1024);
for (auto i = 0; i < 1024; ++i)
auto buffer = std::make_shared<std::vector<uint8_t>> (128 * 1024);
// At some point the write should fail with a timeout
boost::system::error_code ec;
for (auto i = 0; i < 1024; ++i)
{
auto [ec_1, size_1] = socket->blocking_write (buffer, buffer->size ());
if (ec_1)
{
socket->async_write (nano::shared_const_buffer{ buffer }, [&done] (boost::system::error_code const & ec_a, size_t size_a) {
if (ec_a)
{
done = true;
}
});
ec = ec_1;
break;
}
});
}
// check that the callback was called and we got an error
ASSERT_TIMELY (10s, done);
ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in));
ASSERT_TRUE (ec);
ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_error));
// check that the socket was closed due to tcp_io_timeout timeout
ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::out));
@ -428,78 +430,66 @@ TEST (socket_timeout, read_overlapped)
// create one node and set timeout to 1 second
nano::test::system system (1);
std::shared_ptr<nano::node> node = system.nodes[0];
node->config.tcp_io_timeout = std::chrono::seconds (2);
node->config.tcp.io_timeout = std::chrono::seconds (2);
// create a server socket
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ());
boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx);
boost::asio::ip::tcp::acceptor acceptor (node->io_ctx);
acceptor.open (endpoint.protocol ());
acceptor.bind (endpoint);
acceptor.listen (boost::asio::socket_base::max_listen_connections);
// asynchronously accept an incoming connection and send one byte only
boost::asio::ip::tcp::socket newsock (*system.io_ctx);
boost::asio::ip::tcp::socket newsock (node->io_ctx);
acceptor.async_accept (newsock, [&newsock] (boost::system::error_code const & ec_a) {
EXPECT_FALSE (ec_a);
auto buffer = std::make_shared<std::vector<uint8_t>> (1);
nano::async_write (newsock, nano::shared_const_buffer (buffer), [] (boost::system::error_code const & ec_a, size_t size_a) {
EXPECT_TRUE (!ec_a);
EXPECT_TRUE (size_a == 1);
EXPECT_EQ (size_a, 1);
});
});
// create a client socket to connect and call async_read twice, the second call should time out
auto socket = std::make_shared<nano::transport::tcp_socket> (*node);
std::atomic<bool> done = false;
boost::system::error_code ec;
socket->async_connect (acceptor.local_endpoint (), [&socket, &ec, &done] (boost::system::error_code const & ec_a) {
EXPECT_FALSE (ec_a);
auto buffer = std::make_shared<std::vector<uint8_t>> (1);
auto ec_0 = socket->blocking_connect (acceptor.local_endpoint ());
ASSERT_FALSE (ec_0);
socket->async_read (buffer, 1, [] (boost::system::error_code const & ec_a, size_t size_a) {
EXPECT_FALSE (ec_a);
EXPECT_TRUE (size_a == 1);
});
auto buffer = std::make_shared<std::vector<uint8_t>> (1);
socket->async_read (buffer, 1, [&ec, &done] (boost::system::error_code const & ec_a, size_t size_a) {
EXPECT_EQ (size_a, 0);
if (ec_a)
{
ec = ec_a;
done = true;
}
});
});
auto [ec_1, size_1] = socket->blocking_read (buffer, 1);
ASSERT_FALSE (ec_1);
ASSERT_TRUE (size_1 == 1);
auto [ec_2, size_2] = socket->blocking_read (buffer, 1);
ASSERT_EQ (size_2, 0);
ASSERT_TRUE (ec_2);
// check that the callback was called and we got an error
ASSERT_TIMELY_EQ (10s, done, true);
ASSERT_TRUE (ec);
ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_read_error, nano::stat::dir::in));
ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_read_error));
// check that the socket was closed due to tcp_io_timeout timeout
ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::out));
ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::in));
}
TEST (socket_timeout, write_overlapped)
{
std::atomic<bool> done = false;
// create one node and set timeout to 1 second
nano::test::system system (1);
std::shared_ptr<nano::node> node = system.nodes[0];
node->config.tcp_io_timeout = std::chrono::seconds (2);
node->config.tcp.io_timeout = std::chrono::seconds (2);
// create a server socket
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ());
boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx);
boost::asio::ip::tcp::acceptor acceptor (node->io_ctx);
acceptor.open (endpoint.protocol ());
acceptor.bind (endpoint);
acceptor.listen (boost::asio::socket_base::max_listen_connections);
// asynchronously accept an incoming connection and read 2 bytes only
boost::asio::ip::tcp::socket newsock (*system.io_ctx);
boost::asio::ip::tcp::socket newsock (node->io_ctx);
auto buffer = std::make_shared<std::vector<uint8_t>> (1);
acceptor.async_accept (newsock, [&newsock, &buffer] (boost::system::error_code const & ec_a) {
EXPECT_FALSE (ec_a);
@ -513,30 +503,33 @@ TEST (socket_timeout, write_overlapped)
// create a client socket and send lots of data to fill the socket queue on the local and remote side
// eventually, the all tcp queues should fill up and async_write will not be able to progress
// and the timeout should kick in and close the socket, which will cause the async_write to return an error
auto socket = std::make_shared<nano::transport::tcp_socket> (*node, nano::transport::socket_endpoint::client, 1024 * 1024); // socket with a max queue size much larger than OS buffers
socket->async_connect (acceptor.local_endpoint (), [&socket, &done] (boost::system::error_code const & ec_a) {
EXPECT_FALSE (ec_a);
auto socket = std::make_shared<nano::transport::tcp_socket> (*node, nano::transport::socket_endpoint::client);
auto buffer1 = std::make_shared<std::vector<uint8_t>> (1);
auto buffer2 = std::make_shared<std::vector<uint8_t>> (128 * 1024);
socket->async_write (nano::shared_const_buffer{ buffer1 }, [] (boost::system::error_code const & ec_a, size_t size_a) {
EXPECT_FALSE (ec_a);
EXPECT_EQ (size_a, 1);
});
for (auto i = 0; i < 1024; ++i)
auto ec_0 = socket->blocking_connect (acceptor.local_endpoint ());
ASSERT_FALSE (ec_0);
auto buffer1 = std::make_shared<std::vector<uint8_t>> (1);
auto buffer2 = std::make_shared<std::vector<uint8_t>> (128 * 1024);
auto [ec_1, size_1] = socket->blocking_write (nano::shared_const_buffer{ buffer1 }, 1);
ASSERT_FALSE (ec_1);
ASSERT_EQ (size_1, 1);
// At some point the write should fail with a timeout
boost::system::error_code ec;
for (auto i = 0; i < 1000; ++i)
{
auto [ec_2, size_2] = socket->blocking_write (nano::shared_const_buffer{ buffer2 }, 128 * 1024);
if (ec_2)
{
socket->async_write (nano::shared_const_buffer{ buffer2 }, [&done] (boost::system::error_code const & ec_a, size_t size_a) {
if (ec_a)
{
done = true;
}
});
ec = ec_2;
break;
}
});
}
// check that the callback was called and we got an error
ASSERT_TIMELY_EQ (10s, done, true);
ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in));
ASSERT_TRUE (ec);
ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_error));
// check that the socket was closed due to tcp_io_timeout timeout
ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::out));

View file

@ -62,8 +62,6 @@ TEST (telemetry, basic)
auto node_client = system.add_node (node_flags);
auto node_server = system.add_node (node_flags);
nano::test::wait_peer_connections (system);
// Request telemetry metrics
auto channel = node_client->network.find_node_id (node_server->get_node_id ());
ASSERT_NE (nullptr, channel);
@ -116,7 +114,7 @@ TEST (telemetry, disconnected)
nano::node_flags node_flags;
auto node_client = system.add_node (node_flags);
auto node_server = system.add_node (node_flags);
nano::test::wait_peer_connections (system);
auto channel = node_client->network.find_node_id (node_server->get_node_id ());
ASSERT_NE (nullptr, channel);
@ -138,18 +136,10 @@ TEST (telemetry, dos_tcp)
auto node_client = system.add_node (node_flags);
auto node_server = system.add_node (node_flags);
nano::test::wait_peer_connections (system);
nano::telemetry_req message{ nano::dev::network_params.network };
auto channel = node_client->network.tcp_channels.find_node_id (node_server->get_node_id ());
ASSERT_NE (nullptr, channel);
channel->send (message, nano::transport::traffic_type::test, [] (boost::system::error_code const & ec, size_t size_a) {
ASSERT_FALSE (ec);
});
ASSERT_TIMELY_EQ (5s, 1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
auto orig = std::chrono::steady_clock::now ();
nano::telemetry_req message{ nano::dev::network_params.network };
for (int i = 0; i < 10; ++i)
{
channel->send (message, nano::transport::traffic_type::test, [] (boost::system::error_code const & ec, size_t size_a) {
@ -157,17 +147,11 @@ TEST (telemetry, dos_tcp)
});
}
ASSERT_TIMELY (5s, (nano::dev::network_params.network.telemetry_request_cooldown + orig) <= std::chrono::steady_clock::now ());
// Should process telemetry_req messages
ASSERT_TIMELY (5s, 1 < node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
// Should process no more telemetry_req messages
ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
// Now spam messages waiting for it to be processed
while (node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in) == 1)
{
channel->send (message, nano::transport::traffic_type::test);
ASSERT_NO_ERROR (system.poll ());
}
// But not respond to all of them (by default there are 2 broadcasts per second in dev mode)
ASSERT_ALWAYS (1s, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::out) < 7);
}
TEST (telemetry, disable_metrics)
@ -178,8 +162,6 @@ TEST (telemetry, disable_metrics)
node_flags.disable_providing_telemetry_metrics = true;
auto node_server = system.add_node (node_flags);
nano::test::wait_peer_connections (system);
// Try and request metrics from a node which is turned off but a channel is not closed yet
auto channel = node_client->network.find_node_id (node_server->get_node_id ());
ASSERT_NE (nullptr, channel);
@ -210,7 +192,6 @@ TEST (telemetry, max_possible_size)
data.unknown_data.resize (nano::message_header::telemetry_size_mask.to_ulong () - nano::telemetry_data::latest_size);
nano::telemetry_ack message{ nano::dev::network_params.network, data };
nano::test::wait_peer_connections (system);
auto channel = node_client->network.tcp_channels.find_node_id (node_server->get_node_id ());
ASSERT_NE (nullptr, channel);
@ -231,8 +212,6 @@ TEST (telemetry, maker_pruning)
config.enable_voting = false;
auto node_server = system.add_node (config, node_flags);
nano::test::wait_peer_connections (system);
// Request telemetry metrics
auto channel = node_client->network.find_node_id (node_server->get_node_id ());
ASSERT_NE (nullptr, channel);

View file

@ -306,7 +306,6 @@ TEST (toml_config, daemon_config_deserialize_defaults)
ASSERT_EQ (conf.node.preconfigured_representatives, defaults.node.preconfigured_representatives);
ASSERT_EQ (conf.node.receive_minimum, defaults.node.receive_minimum);
ASSERT_EQ (conf.node.signature_checker_threads, defaults.node.signature_checker_threads);
ASSERT_EQ (conf.node.tcp_io_timeout, defaults.node.tcp_io_timeout);
ASSERT_EQ (conf.node.unchecked_cutoff_time, defaults.node.unchecked_cutoff_time);
ASSERT_EQ (conf.node.use_memory_pools, defaults.node.use_memory_pools);
ASSERT_EQ (conf.node.vote_generator_delay, defaults.node.vote_generator_delay);
@ -472,7 +471,6 @@ TEST (toml_config, daemon_config_deserialize_no_defaults)
preconfigured_representatives = ["nano_3arg3asgtigae3xckabaaewkx3bzsh7nwz7jkmjos79ihyaxwphhm6qgjps4"]
receive_minimum = "999"
signature_checker_threads = 999
tcp_io_timeout = 999
unchecked_cutoff_time = 999
use_memory_pools = false
vote_generator_delay = 999
@ -740,7 +738,6 @@ TEST (toml_config, daemon_config_deserialize_no_defaults)
ASSERT_NE (conf.node.preconfigured_representatives, defaults.node.preconfigured_representatives);
ASSERT_NE (conf.node.receive_minimum, defaults.node.receive_minimum);
ASSERT_NE (conf.node.signature_checker_threads, defaults.node.signature_checker_threads);
ASSERT_NE (conf.node.tcp_io_timeout, defaults.node.tcp_io_timeout);
ASSERT_NE (conf.node.unchecked_cutoff_time, defaults.node.unchecked_cutoff_time);
ASSERT_NE (conf.node.use_memory_pools, defaults.node.use_memory_pools);
ASSERT_NE (conf.node.vote_generator_delay, defaults.node.vote_generator_delay);

View file

@ -1047,8 +1047,6 @@ TEST (websocket, telemetry)
config.websocket_config.port = system.get_available_port ();
auto node2 (system.add_node (config, node_flags));
nano::test::wait_peer_connections (system);
std::atomic<bool> done{ false };
auto task = ([config = node1->config, &node1, &done] () {
fake_websocket_client client (node1->websocket.server->listening_port ());

View file

@ -1,9 +1,16 @@
#pragma once
#include <nano/boost/asio/ip/tcp.hpp>
#include <nano/boost/asio/strand.hpp>
#include <nano/boost/asio/write.hpp>
#include <span>
namespace nano
{
using buffer_view = std::span<const uint8_t>;
using shared_buffer = std::shared_ptr<std::vector<uint8_t>>;
class shared_const_buffer
{
public:
@ -22,6 +29,11 @@ public:
std::size_t size () const;
std::vector<uint8_t> to_bytes () const;
operator nano::shared_buffer () const
{
return m_data;
}
private:
std::shared_ptr<std::vector<uint8_t>> m_data;
boost::asio::const_buffer m_buffer;

View file

@ -1,16 +1,10 @@
#pragma once
namespace boost::asio::ip
{
class address;
class tcp;
template <typename InternetProtocol>
class basic_endpoint;
}
#include <nano/boost/asio/ip/tcp.hpp>
namespace nano
{
using ip_address = boost::asio::ip::address;
using endpoint = boost::asio::ip::basic_endpoint<boost::asio::ip::tcp>;
using tcp_endpoint = endpoint;
using tcp_endpoint = endpoint; // TODO: Remove this alias
}

View file

@ -35,7 +35,12 @@ enum class type
http_callbacks_ec,
ipc,
tcp,
tcp_socket,
tcp_socket_timeout,
tcp_server,
tcp_server_message,
tcp_server_read,
tcp_server_error,
tcp_channel,
tcp_channel_queued,
tcp_channel_send,
@ -180,6 +185,8 @@ enum class detail
refresh,
sent,
reset,
close,
read,
// processing queue
queue,
@ -236,9 +243,10 @@ enum class detail
forced,
election,
// message specific
// message types
not_a_type,
invalid,
header,
keepalive,
publish,
confirm_req,
@ -361,10 +369,19 @@ enum class detail
// tcp
tcp_silent_connection_drop,
tcp_io_timeout_drop,
tcp_connect_success,
tcp_connect_error,
tcp_read_error,
tcp_write_error,
// tcp_socket
unhealthy,
already_closed,
timeout_receive,
timeout_send,
timeout_connect,
timeout_silence,
// tcp_listener
accept_success,
accept_error,
@ -398,6 +415,8 @@ enum class detail
outdated,
// tcp_server
read_header,
read_payload,
handshake,
handshake_abort,
handshake_error,
@ -405,6 +424,10 @@ enum class detail
handshake_initiate,
handshake_response,
handshake_response_invalid,
handshake_failed,
message_queued,
message_dropped,
message_ignored,
// ipc
invocations,

View file

@ -185,6 +185,7 @@ public:
nano::endpoint peering_endpoint{ channel->get_remote_endpoint ().address (), self_report.port () };
channel->set_peering_endpoint (peering_endpoint);
}
channel->set_last_keepalive (message);
}
void publish (nano::publish const & message) override

View file

@ -1978,6 +1978,185 @@ void nano::asc_pull_ack::frontiers_payload::operator() (nano::object_stream & ob
});
}
/*
*
*/
auto nano::deserialize_message (
nano::buffer_view buffer,
nano::message_header const & header,
nano::network_constants const & network_constants,
nano::network_filter * network_filter,
nano::block_uniquer * block_uniquer,
nano::vote_uniquer * vote_uniquer)
-> deserialize_message_result
{
nano::bufferstream stream{ buffer.data (), buffer.size () };
switch (header.type)
{
case nano::message_type::keepalive:
{
bool error = false;
auto message = std::make_unique<nano::keepalive> (error, stream, header);
if (!error && at_end (stream))
{
return { std::move (message), deserialize_message_status::success };
}
return { nullptr, deserialize_message_status::invalid_keepalive_message };
}
break;
case nano::message_type::publish:
{
nano::uint128_t digest{ 0 };
if (network_filter)
{
if (network_filter->apply (buffer.data (), buffer.size (), &digest))
{
return { nullptr, deserialize_message_status::duplicate_publish_message };
}
}
bool error = false;
auto message = std::make_unique<nano::publish> (error, stream, header, digest, block_uniquer);
if (!error && at_end (stream) || !message->block)
{
if (!network_constants.work.validate_entry (*message->block))
{
return { std::move (message), deserialize_message_status::success };
}
else
{
return { nullptr, deserialize_message_status::insufficient_work };
}
}
return { nullptr, deserialize_message_status::invalid_publish_message };
}
break;
case nano::message_type::confirm_req:
{
bool error = false;
auto message = std::make_unique<nano::confirm_req> (error, stream, header);
if (!error && at_end (stream))
{
return { std::move (message), deserialize_message_status::success };
}
return { nullptr, deserialize_message_status::invalid_confirm_req_message };
}
break;
case nano::message_type::confirm_ack:
{
nano::uint128_t digest{ 0 };
if (network_filter)
{
if (network_filter->apply (buffer.data (), buffer.size (), &digest))
{
return { nullptr, deserialize_message_status::duplicate_confirm_ack_message };
}
}
bool error = false;
auto message = std::make_unique<nano::confirm_ack> (error, stream, header, digest, vote_uniquer);
if (!error && at_end (stream))
{
return { std::move (message), deserialize_message_status::success };
}
return { nullptr, deserialize_message_status::invalid_confirm_ack_message };
}
break;
case nano::message_type::node_id_handshake:
{
bool error = false;
auto message = std::make_unique<nano::node_id_handshake> (error, stream, header);
if (!error && at_end (stream))
{
return { std::move (message), deserialize_message_status::success };
}
return { nullptr, deserialize_message_status::invalid_node_id_handshake_message };
}
break;
case nano::message_type::telemetry_req:
{
return { std::make_unique<nano::telemetry_req> (header), deserialize_message_status::success };
}
break;
case nano::message_type::telemetry_ack:
{
bool error = false;
auto message = std::make_unique<nano::telemetry_ack> (error, stream, header);
if (!error) // Intentionally not checking at_end here for forward compatibility
{
return { std::move (message), deserialize_message_status::success };
}
return { nullptr, deserialize_message_status::invalid_telemetry_ack_message };
}
break;
case nano::message_type::bulk_pull:
{
bool error = false;
auto message = std::make_unique<nano::bulk_pull> (error, stream, header);
if (!error && at_end (stream))
{
return { std::move (message), deserialize_message_status::success };
}
return { nullptr, deserialize_message_status::invalid_bulk_pull_message };
}
break;
case nano::message_type::bulk_pull_account:
{
bool error = false;
auto message = std::make_unique<nano::bulk_pull_account> (error, stream, header);
if (!error && at_end (stream))
{
return { std::move (message), deserialize_message_status::success };
}
return { nullptr, deserialize_message_status::invalid_bulk_pull_account_message };
}
break;
case nano::message_type::bulk_push:
{
return { std::make_unique<nano::bulk_push> (header), deserialize_message_status::success };
}
break;
case nano::message_type::frontier_req:
{
bool error = false;
auto message = std::make_unique<nano::frontier_req> (error, stream, header);
if (!error && at_end (stream))
{
return { std::move (message), deserialize_message_status::success };
}
return { nullptr, deserialize_message_status::invalid_frontier_req_message };
}
break;
case nano::message_type::asc_pull_req:
{
bool error = false;
auto message = std::make_unique<nano::asc_pull_req> (error, stream, header);
if (!error)
{
return { std::move (message), deserialize_message_status::success };
}
return { nullptr, deserialize_message_status::invalid_asc_pull_req_message };
}
break;
case nano::message_type::asc_pull_ack:
{
bool error = false;
auto message = std::make_unique<nano::asc_pull_ack> (error, stream, header);
if (!error)
{
return { std::move (message), deserialize_message_status::success };
}
return { nullptr, deserialize_message_status::invalid_asc_pull_ack_message };
}
break;
default:
return { nullptr, deserialize_message_status::invalid_message_type };
}
release_assert (false, "invalid message type");
}
/*
*
*/
@ -1996,3 +2175,13 @@ nano::log::detail nano::to_log_detail (nano::message_type type)
{
return nano::enum_util::cast<nano::log::detail> (type);
}
nano::stat::detail nano::to_stat_detail (nano::deserialize_message_status status)
{
return nano::enum_util::cast<nano::stat::detail> (status);
}
std::string_view nano::to_string (nano::deserialize_message_status status)
{
return nano::enum_util::name (status);
}

View file

@ -810,4 +810,41 @@ public:
}
virtual void default_handler (nano::message const &){};
};
enum class deserialize_message_status
{
success,
insufficient_work,
invalid_header,
invalid_message_type,
invalid_keepalive_message,
invalid_publish_message,
invalid_confirm_req_message,
invalid_confirm_ack_message,
invalid_node_id_handshake_message,
invalid_telemetry_req_message,
invalid_telemetry_ack_message,
invalid_bulk_pull_message,
invalid_bulk_pull_account_message,
invalid_frontier_req_message,
invalid_asc_pull_req_message,
invalid_asc_pull_ack_message,
invalid_network,
outdated_version,
duplicate_publish_message,
duplicate_confirm_ack_message,
};
nano::stat::detail to_stat_detail (deserialize_message_status);
std::string_view to_string (deserialize_message_status);
using deserialize_message_result = std::tuple<std::unique_ptr<nano::message>, nano::deserialize_message_status>;
deserialize_message_result deserialize_message (
nano::buffer_view buffer,
nano::message_header const & header,
nano::network_constants const &,
nano::network_filter * = nullptr,
nano::block_uniquer * = nullptr,
nano::vote_uniquer * = nullptr);
}

View file

@ -640,7 +640,7 @@ void nano::node::stop ()
// work pool is not stopped on purpose due to testing setup
// Stop the IO runner last
runner.abort ();
runner.abort (); // TODO: Remove this
runner.join ();
debug_assert (io_ctx_shared.use_count () == 1); // Node should be the last user of the io_context
}

View file

@ -121,7 +121,6 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const
toml.put ("vote_minimum", vote_minimum.to_string_dec (), "Local representatives do not vote if the delegated weight is under this threshold. Saves on system resources.\ntype:string,amount,raw");
toml.put ("vote_generator_delay", vote_generator_delay.count (), "Delay before votes are sent to allow for efficient bundling of hashes in votes.\ntype:milliseconds");
toml.put ("unchecked_cutoff_time", unchecked_cutoff_time.count (), "Number of seconds before deleting an unchecked entry.\nWarning: lower values (e.g., 3600 seconds, or 1 hour) may result in unsuccessful bootstraps, especially a bootstrap from scratch.\ntype:seconds");
toml.put ("tcp_io_timeout", tcp_io_timeout.count (), "Timeout for TCP connect-, read- and write operations.\nWarning: a low value (e.g., below 5 seconds) may result in TCP connections failing.\ntype:seconds");
toml.put ("pow_sleep_interval", pow_sleep_interval.count (), "Time to sleep between batch work generation attempts. Reduces max CPU usage at the expense of a longer generation time.\ntype:nanoseconds");
toml.put ("external_address", external_address, "The external address of this node (NAT). If not set, the node will request this information via UPnP.\ntype:string,ip");
toml.put ("external_port", external_port, "The external port number of this node (NAT). Only used if external_address is set.\ntype:uint16");
@ -575,10 +574,6 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml)
toml.get ("unchecked_cutoff_time", unchecked_cutoff_time_l);
unchecked_cutoff_time = std::chrono::seconds (unchecked_cutoff_time_l);
auto tcp_io_timeout_l = static_cast<unsigned long> (tcp_io_timeout.count ());
toml.get ("tcp_io_timeout", tcp_io_timeout_l);
tcp_io_timeout = std::chrono::seconds (tcp_io_timeout_l);
if (toml.has_key ("peering_port"))
{
std::uint16_t peering_port_l{};

View file

@ -117,8 +117,6 @@ public:
uint16_t external_port{ 0 };
std::chrono::milliseconds block_processor_batch_max_time{ std::chrono::milliseconds (500) };
std::chrono::seconds unchecked_cutoff_time{ std::chrono::seconds (4 * 60 * 60) }; // 4 hours
/** Timeout for initiated async operations */
std::chrono::seconds tcp_io_timeout{ (network_params.network.is_dev_network () && !is_sanitizer_build ()) ? std::chrono::seconds (5) : std::chrono::seconds (15) };
std::chrono::nanoseconds pow_sleep_interval{ 0 };
bool use_memory_pools{ true };

View file

@ -39,6 +39,20 @@ nano::endpoint nano::transport::channel::get_peering_endpoint () const
return get_remote_endpoint ();
}
void nano::transport::channel::set_last_keepalive (nano::keepalive const & message)
{
nano::lock_guard<nano::mutex> lock{ mutex };
last_keepalive = message;
}
std::optional<nano::keepalive> nano::transport::channel::pop_last_keepalive ()
{
nano::lock_guard<nano::mutex> lock{ mutex };
auto result = last_keepalive;
last_keepalive.reset ();
return result;
}
std::shared_ptr<nano::node> nano::transport::channel::owner () const
{
return node.shared ();

View file

@ -55,7 +55,6 @@ public:
nano::lock_guard<nano::mutex> lock{ mutex };
return last_bootstrap_attempt;
}
void set_last_bootstrap_attempt (std::chrono::steady_clock::time_point const time_a)
{
nano::lock_guard<nano::mutex> lock{ mutex };
@ -67,7 +66,6 @@ public:
nano::lock_guard<nano::mutex> lock{ mutex };
return last_packet_received;
}
void set_last_packet_received (std::chrono::steady_clock::time_point const time_a)
{
nano::lock_guard<nano::mutex> lock{ mutex };
@ -79,7 +77,6 @@ public:
nano::lock_guard<nano::mutex> lock{ mutex };
return last_packet_sent;
}
void set_last_packet_sent (std::chrono::steady_clock::time_point const time_a)
{
nano::lock_guard<nano::mutex> lock{ mutex };
@ -91,13 +88,11 @@ public:
nano::lock_guard<nano::mutex> lock{ mutex };
return node_id;
}
nano::account get_node_id () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
return node_id.value_or (0);
}
void set_node_id (nano::account node_id_a)
{
nano::lock_guard<nano::mutex> lock{ mutex };
@ -108,7 +103,6 @@ public:
{
return network_version;
}
void set_network_version (uint8_t network_version_a)
{
network_version = network_version_a;
@ -117,6 +111,9 @@ public:
nano::endpoint get_peering_endpoint () const;
void set_peering_endpoint (nano::endpoint endpoint);
void set_last_keepalive (nano::keepalive const & message);
std::optional<nano::keepalive> pop_last_keepalive ();
std::shared_ptr<nano::node> owner () const;
protected:
@ -133,6 +130,7 @@ private:
std::optional<nano::account> node_id{};
std::atomic<uint8_t> network_version{ 0 };
std::optional<nano::endpoint> peering_endpoint{};
std::optional<nano::keepalive> last_keepalive{};
public: // Logging
virtual void operator() (nano::object_stream &) const;

View file

@ -1,5 +1,7 @@
#pragma once
#include <nano/lib/common.hpp>
#include <string_view>
namespace nano::transport

View file

@ -4,6 +4,7 @@ namespace nano::transport
{
class channel;
class loopback_channel;
class message_deserializer;
class tcp_channel;
class tcp_channels;
class tcp_server;

View file

@ -16,8 +16,8 @@ nano::transport::tcp_channel::tcp_channel (nano::node & node_a, std::shared_ptr<
strand{ node_a.io_ctx.get_executor () },
sending_task{ strand }
{
remote_endpoint = socket_a->remote_endpoint ();
local_endpoint = socket_a->local_endpoint ();
remote_endpoint = socket_a->get_remote_endpoint ();
local_endpoint = socket_a->get_local_endpoint ();
start ();
}
@ -30,7 +30,11 @@ void nano::transport::tcp_channel::close ()
{
stop ();
socket->close ();
closed = true;
}
void nano::transport::tcp_channel::close_async ()
{
socket->close_async ();
}
void nano::transport::tcp_channel::start ()
@ -49,10 +53,17 @@ asio::awaitable<void> nano::transport::tcp_channel::start_sending (nano::async::
}
catch (boost::system::system_error const & ex)
{
// Operation aborted is expected when cancelling the acceptor
// Operation aborted is expected when cancelling the task
debug_assert (ex.code () == asio::error::operation_aborted);
}
catch (...)
{
release_assert (false, "unexpected exception");
}
debug_assert (strand.running_in_this_thread ());
// Ensure socket gets closed if task is stopped
close_async ();
}
void nano::transport::tcp_channel::stop ()
@ -63,6 +74,7 @@ void nano::transport::tcp_channel::stop ()
debug_assert (!node.io_ctx.stopped ());
// Ensure that we are not trying to await the task while running on the same thread / io_context
debug_assert (!node.io_ctx.get_executor ().running_in_this_thread ());
sending_task.cancel ();
sending_task.join ();
}
@ -117,7 +129,11 @@ asio::awaitable<void> nano::transport::tcp_channel::run_sending (nano::async::co
{
for (auto const & [type, item] : batch)
{
co_await send_one (type, item);
auto ec = co_await send_one (type, item);
if (ec)
{
co_return; // Stop on error
}
}
}
else
@ -127,20 +143,13 @@ asio::awaitable<void> nano::transport::tcp_channel::run_sending (nano::async::co
}
}
asio::awaitable<void> nano::transport::tcp_channel::send_one (traffic_type type, tcp_channel_queue::entry_t const & item)
asio::awaitable<boost::system::error_code> nano::transport::tcp_channel::send_one (traffic_type type, tcp_channel_queue::entry_t const & item)
{
debug_assert (strand.running_in_this_thread ());
auto const & [buffer, callback] = item;
auto const size = buffer.size ();
// Wait for socket
while (socket->full ())
{
node.stats.inc (nano::stat::type::tcp_channel_wait, nano::stat::detail::wait_socket, nano::stat::dir::out);
co_await nano::async::sleep_for (100ms); // TODO: Exponential backoff
}
// Wait for bandwidth
// This is somewhat inefficient
// The performance impact *should* be mitigated by the fact that we allocate it in larger chunks, so this happens relatively infrequently
@ -163,21 +172,26 @@ asio::awaitable<void> nano::transport::tcp_channel::send_one (traffic_type type,
node.stats.inc (nano::stat::type::tcp_channel, nano::stat::detail::send, nano::stat::dir::out);
node.stats.inc (nano::stat::type::tcp_channel_send, to_stat_detail (type), nano::stat::dir::out);
socket->async_write (buffer, [this_w = weak_from_this (), callback, type] (boost::system::error_code const & ec, std::size_t size) {
if (auto this_l = this_w.lock ())
{
this_l->node.stats.inc (nano::stat::type::tcp_channel_ec, nano::to_stat_detail (ec), nano::stat::dir::out);
if (!ec)
{
this_l->node.stats.add (nano::stat::type::traffic_tcp_type, to_stat_detail (type), nano::stat::dir::out, size);
this_l->set_last_packet_sent (std::chrono::steady_clock::now ());
}
}
if (callback)
{
callback (ec, size);
}
});
auto [ec, size_written] = co_await socket->co_write (buffer, buffer.size ());
debug_assert (ec || size_written == size);
debug_assert (strand.running_in_this_thread ());
if (!ec)
{
node.stats.add (nano::stat::type::traffic_tcp_type, to_stat_detail (type), nano::stat::dir::out, size_written);
set_last_packet_sent (std::chrono::steady_clock::now ());
}
else
{
node.stats.inc (nano::stat::type::tcp_channel_ec, nano::to_stat_detail (ec), nano::stat::dir::out);
}
if (callback)
{
callback (ec, size_written);
}
co_return ec;
}
bool nano::transport::tcp_channel::alive () const

View file

@ -44,13 +44,12 @@ private:
class tcp_channel final : public nano::transport::channel, public std::enable_shared_from_this<tcp_channel>
{
friend class nano::transport::tcp_channels;
public:
tcp_channel (nano::node &, std::shared_ptr<nano::transport::tcp_socket>);
~tcp_channel () override;
void close () override;
void close_async (); // Safe to call from io context
bool max (nano::transport::traffic_type traffic_type) override;
bool alive () const override;
@ -74,7 +73,7 @@ private:
asio::awaitable<void> start_sending (nano::async::condition &);
asio::awaitable<void> run_sending (nano::async::condition &);
asio::awaitable<void> send_one (traffic_type, tcp_channel_queue::entry_t const &);
asio::awaitable<boost::system::error_code> send_one (traffic_type, tcp_channel_queue::entry_t const &);
public:
std::shared_ptr<nano::transport::tcp_socket> socket;
@ -90,8 +89,6 @@ private:
tcp_channel_queue queue;
std::atomic<size_t> allocated_bandwidth{ 0 };
std::atomic<bool> closed{ false };
public: // Logging
void operator() (nano::object_stream &) const override;
};

View file

@ -39,7 +39,7 @@ void nano::transport::tcp_channels::close ()
for (auto const & entry : channels)
{
entry.socket->close ();
entry.server->stop ();
entry.server->close ();
entry.channel->close ();
}
@ -86,10 +86,9 @@ bool nano::transport::tcp_channels::check (const nano::tcp_endpoint & endpoint,
return true; // OK
}
// This should be the only place in node where channels are created
std::shared_ptr<nano::transport::tcp_channel> nano::transport::tcp_channels::create (const std::shared_ptr<nano::transport::tcp_socket> & socket, const std::shared_ptr<nano::transport::tcp_server> & server, const nano::account & node_id)
{
auto const endpoint = socket->remote_endpoint ();
auto const endpoint = socket->get_remote_endpoint ();
debug_assert (endpoint.address ().is_v6 ());
nano::unique_lock<nano::mutex> lock{ mutex };
@ -110,10 +109,11 @@ std::shared_ptr<nano::transport::tcp_channel> nano::transport::tcp_channels::cre
node.stats.inc (nano::stat::type::tcp_channels, nano::stat::detail::channel_accepted);
node.logger.debug (nano::log::type::tcp_channels, "Accepted channel: {} ({}) ({})",
socket->remote_endpoint (),
to_string (socket->endpoint_type ()),
socket->get_remote_endpoint (),
to_string (socket->get_endpoint_type ()),
node_id.to_node_id ());
// This should be the only place in node where channels are created
auto channel = std::make_shared<nano::transport::tcp_channel> (node, socket);
channel->set_node_id (node_id);
@ -400,9 +400,9 @@ std::optional<nano::keepalive> nano::transport::tcp_channels::sample_keepalive (
while (counter++ < channels.size ())
{
auto index = rng.random (channels.size ());
if (auto server = channels.get<random_access_tag> ()[index].server)
if (auto channel = channels.get<random_access_tag> ()[index].channel)
{
if (auto keepalive = server->pop_last_keepalive ())
if (auto keepalive = channel->pop_last_keepalive ())
{
return keepalive;
}

View file

@ -7,9 +7,11 @@ nano::error nano::transport::tcp_config::serialize (nano::tomlconfig & toml) con
toml.put ("max_attempts", max_attempts, "Maximum connection attempts. \ntype:uint64");
toml.put ("max_attempts_per_ip", max_attempts_per_ip, "Maximum connection attempts per IP. \ntype:uint64");
toml.put ("connect_timeout", connect_timeout.count (), "Timeout for establishing TCP connection in seconds. \ntype:uint64");
toml.put ("handshake_timeout", handshake_timeout.count (), "Timeout for completing handshake in seconds. \ntype:uint64");
toml.put ("io_timeout", io_timeout.count (), "Timeout for TCP I/O operations in seconds. \ntype:uint64");
toml.put ("connect_timeout", connect_timeout.count (), "Timeout for establishing TCP connection in seconds. \ntype:seconds");
toml.put ("handshake_timeout", handshake_timeout.count (), "Timeout for completing node handshake in seconds. \ntype:seconds");
toml.put ("io_timeout", io_timeout.count (), "Timeout for TCP I/O operations in seconds. Use 0 to disable timeout. \ntype:seconds");
toml.put ("silent_timeout", silent_timeout.count (), "Timeout for silent TCP connections in seconds. Use 0 to disable timeout. \ntype:seconds");
toml.put ("checkup_interval", checkup_interval.count (), "Interval for checking health of TCP connections in seconds. \ntype:seconds");
return toml.get_error ();
}
@ -24,6 +26,8 @@ nano::error nano::transport::tcp_config::deserialize (nano::tomlconfig & toml)
toml.get_duration ("connect_timeout", connect_timeout);
toml.get_duration ("handshake_timeout", handshake_timeout);
toml.get_duration ("io_timeout", io_timeout);
toml.get_duration ("silent_timeout", silent_timeout);
toml.get_duration ("checkup_interval", checkup_interval);
return toml.get_error ();
}

View file

@ -18,7 +18,8 @@ public:
max_outbound_connections = 128;
max_attempts = 128;
max_attempts_per_ip = 128;
connect_timeout = std::chrono::seconds{ 5 };
connect_timeout = 5s;
checkup_interval = 1s;
}
}
@ -31,8 +32,10 @@ public:
size_t max_outbound_connections{ 2048 };
size_t max_attempts{ 60 };
size_t max_attempts_per_ip{ 1 };
std::chrono::seconds connect_timeout{ 60 };
std::chrono::seconds connect_timeout{ 30 };
std::chrono::seconds handshake_timeout{ 30 };
std::chrono::seconds io_timeout{ 30 };
std::chrono::seconds silent_timeout{ 30 };
std::chrono::seconds checkup_interval{ 5 };
};
}

View file

@ -154,7 +154,7 @@ void nano::transport::tcp_listener::stop ()
for (auto & connection : connections_l)
{
connection.socket->close ();
connection.server->stop ();
connection.server->close ();
}
logger.debug (nano::log::type::tcp_listener, "Stopped");
@ -199,7 +199,7 @@ void nano::transport::tcp_listener::purge (nano::unique_lock<nano::mutex> & lock
logger.debug (nano::log::type::tcp_listener, "Evicting dead connection: {}", connection.endpoint);
connection.socket->close ();
connection.server->stop ();
connection.server->close ();
}
}
@ -235,7 +235,7 @@ void nano::transport::tcp_listener::timeout ()
connection.endpoint,
nano::log::seconds_delta (connection.socket->get_time_connected ()));
connection.socket->close (); // Schedule socket close, this is non-blocking, safe to call under lock
connection.socket->close_async (); // Schedule socket close, this is non-blocking, safe to call under lock
}
}
}
@ -306,9 +306,6 @@ auto nano::transport::tcp_listener::connect_impl (asio::ip::tcp::endpoint endpoi
{
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::connect_success, nano::stat::dir::out);
logger.debug (nano::log::type::tcp_listener, "Successfully connected to: {}", endpoint);
release_assert (result.server);
result.server->initiate_handshake ();
}
else
{
@ -431,17 +428,14 @@ auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket
stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, to_stat_dir (type));
logger.debug (nano::log::type::tcp_listener, "Accepted connection: {} ({})", remote_endpoint, to_string (type));
auto socket = std::make_shared<nano::transport::tcp_socket> (node, std::move (raw_socket), remote_endpoint, local_endpoint, to_socket_endpoint (type));
auto server = std::make_shared<nano::transport::tcp_server> (socket, node.shared (), true);
auto socket = std::make_shared<nano::transport::tcp_socket> (node, std::move (raw_socket), to_socket_endpoint (type));
auto server = std::make_shared<nano::transport::tcp_server> (node, socket);
server->start ();
connections.emplace_back (connection{ type, remote_endpoint, socket, server });
lock.unlock ();
socket->set_timeout (node.network_params.network.idle_timeout);
socket->start ();
server->start ();
connection_accepted.notify (socket, server);
return { accept_result::accepted, socket, server };

View file

@ -25,9 +25,6 @@ namespace asio = boost::asio;
namespace nano::transport
{
/**
* Server side portion of tcp sessions. Listens for new socket connections and spawns tcp_server objects when connected.
*/
class tcp_listener final
{
public:

View file

@ -6,433 +6,406 @@
#include <memory>
/*
* tcp_server
*/
nano::transport::tcp_server::tcp_server (std::shared_ptr<nano::transport::tcp_socket> socket_a, std::shared_ptr<nano::node> node_a, bool allow_bootstrap_a) :
socket{ socket_a },
nano::transport::tcp_server::tcp_server (nano::node & node_a, std::shared_ptr<nano::transport::tcp_socket> socket_a) :
node{ node_a },
allow_bootstrap{ allow_bootstrap_a },
message_deserializer{
std::make_shared<nano::transport::message_deserializer> (node_a->network_params.network, node_a->network.filter, node_a->block_uniquer, node_a->vote_uniquer,
[socket_l = socket] (std::shared_ptr<std::vector<uint8_t>> const & data_a, size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a) {
debug_assert (socket_l != nullptr);
socket_l->read_impl (data_a, size_a, callback_a);
})
}
socket{ socket_a },
strand{ node_a.io_ctx.get_executor () },
task{ strand },
buffer{ std::make_shared<nano::shared_buffer::element_type> (max_buffer_size) }
{
debug_assert (socket != nullptr);
}
nano::transport::tcp_server::~tcp_server ()
{
auto node = this->node.lock ();
if (!node)
{
return;
}
node->logger.debug (nano::log::type::tcp_server, "Exiting server: {}", remote_endpoint);
stop ();
close ();
}
void nano::transport::tcp_server::close ()
{
stop ();
socket->close ();
}
void nano::transport::tcp_server::close_async ()
{
socket->close_async ();
}
// Starting the server must be separate from the constructor to allow the socket to access shared_from_this
void nano::transport::tcp_server::start ()
{
// Set remote_endpoint
if (remote_endpoint.port () == 0)
{
remote_endpoint = socket->remote_endpoint ();
debug_assert (remote_endpoint.port () != 0);
}
auto node = this->node.lock ();
if (!node)
{
return;
}
node->logger.debug (nano::log::type::tcp_server, "Starting server: {}", remote_endpoint);
receive_message ();
task = nano::async::task (strand, start_impl ());
}
void nano::transport::tcp_server::stop ()
{
if (!stopped.exchange (true))
if (task.running ())
{
socket->close ();
// Node context must be running to gracefully stop async tasks
debug_assert (!node.io_ctx.stopped ());
// Ensure that we are not trying to await the task while running on the same thread / io_context
debug_assert (!node.io_ctx.get_executor ().running_in_this_thread ());
task.cancel ();
task.join ();
}
}
void nano::transport::tcp_server::receive_message ()
auto nano::transport::tcp_server::start_impl () -> asio::awaitable<void>
{
if (stopped)
debug_assert (strand.running_in_this_thread ());
try
{
return;
}
message_deserializer->read ([this_l = shared_from_this ()] (boost::system::error_code ec, std::unique_ptr<nano::message> message) {
auto node = this_l->node.lock ();
if (!node)
auto handshake_result = co_await perform_handshake ();
if (handshake_result != process_result::progress)
{
return;
}
if (ec)
{
// IO error or critical error when deserializing message
node->stats.inc (nano::stat::type::error, to_stat_detail (this_l->message_deserializer->status));
node->logger.debug (nano::log::type::tcp_server, "Error reading message: {}, status: {} ({})",
ec.message (),
to_string (this_l->message_deserializer->status),
this_l->remote_endpoint);
this_l->stop ();
node.logger.debug (nano::log::type::tcp_server, "Handshake aborted: {}", get_remote_endpoint ());
}
else
{
this_l->received_message (std::move (message));
co_await run_realtime ();
}
});
}
catch (boost::system::system_error const & ex)
{
node.stats.inc (nano::stat::type::tcp_server_error, nano::to_stat_detail (ex.code ()), nano::stat::dir::in);
node.logger.debug (nano::log::type::tcp_server, "Server stopped due to error: {} ({})", ex.code (), get_remote_endpoint ());
}
catch (...)
{
release_assert (false, "unexpected exception");
}
debug_assert (strand.running_in_this_thread ());
// Ensure socket gets closed if task is stopped
close_async ();
}
void nano::transport::tcp_server::received_message (std::unique_ptr<nano::message> message)
bool nano::transport::tcp_server::alive () const
{
auto node = this->node.lock ();
if (!node)
{
return;
}
process_result result = process_result::progress;
if (message)
{
result = process_message (std::move (message));
}
else
{
// Error while deserializing message
debug_assert (message_deserializer->status != transport::parse_status::success);
node->stats.inc (nano::stat::type::error, to_stat_detail (message_deserializer->status));
switch (message_deserializer->status)
{
// Avoid too much noise about `duplicate_publish_message` errors
case nano::transport::parse_status::duplicate_publish_message:
{
node->stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message);
}
break;
case nano::transport::parse_status::duplicate_confirm_ack_message:
{
node->stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message);
}
break;
default:
{
node->logger.debug (nano::log::type::tcp_server, "Error deserializing message: {} ({})",
to_string (message_deserializer->status),
remote_endpoint);
}
break;
}
}
switch (result)
{
case process_result::progress:
{
receive_message ();
}
break;
case process_result::abort:
{
stop ();
}
break;
case process_result::pause:
{
// Do nothing
}
break;
}
return socket->alive ();
}
auto nano::transport::tcp_server::process_message (std::unique_ptr<nano::message> message) -> process_result
auto nano::transport::tcp_server::perform_handshake () -> asio::awaitable<process_result>
{
auto node = this->node.lock ();
if (!node)
debug_assert (strand.running_in_this_thread ());
debug_assert (get_type () == nano::transport::socket_type::undefined);
// Initiate handshake if we are the ones initiating the connection
if (socket->get_endpoint_type () == nano::transport::socket_endpoint::client)
{
return process_result::abort;
co_await send_handshake_request ();
}
node->stats.inc (nano::stat::type::tcp_server, to_stat_detail (message->type ()), nano::stat::dir::in);
debug_assert (is_undefined_connection () || is_realtime_connection () || is_bootstrap_connection ());
/*
* Server initially starts in undefined state, where it waits for either a handshake or booststrap request message
* If the server receives a handshake (and it is successfully validated) it will switch to a realtime mode.
* In realtime mode messages are deserialized and queued to `tcp_message_manager` for further processing.
* In realtime mode any bootstrap requests are ignored.
*
* If the server receives a bootstrap request before receiving a handshake, it will switch to a bootstrap mode.
* In bootstrap mode once a valid bootstrap request message is received, the server will start a corresponding bootstrap server and pass control to that server.
* Once that server finishes its task, control is passed back to this server to read and process any subsequent messages.
* In bootstrap mode any realtime messages are ignored
*/
if (is_undefined_connection ())
struct handshake_message_visitor : public nano::message_visitor
{
handshake_message_visitor handshake_visitor{ *this };
bool process{ false };
std::optional<nano::node_id_handshake> handshake;
void node_id_handshake (nano::node_id_handshake const & msg) override
{
process = true;
handshake = msg;
}
};
// Two-step handshake
for (int i = 0; i < 2; ++i)
{
auto [message, message_status] = co_await receive_message ();
if (!message)
{
node.logger.debug (nano::log::type::tcp_server, "Error deserializing handshake message: {} ({})",
to_string (message_status),
get_remote_endpoint ());
}
handshake_message_visitor handshake_visitor{};
message->visit (handshake_visitor);
switch (handshake_visitor.result)
handshake_status status = handshake_status::abort;
if (handshake_visitor.process)
{
release_assert (handshake_visitor.handshake.has_value ());
status = co_await process_handshake (*handshake_visitor.handshake);
}
switch (status)
{
case handshake_status::abort:
case handshake_status::bootstrap: // Legacy bootstrap is no longer supported
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_abort);
node->logger.debug (nano::log::type::tcp_server, "Aborting handshake: {} ({})", to_string (message->type ()), remote_endpoint);
node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_abort);
node.logger.debug (nano::log::type::tcp_server, "Aborting handshake: {} ({})",
to_string (message->type ()),
get_remote_endpoint ());
return process_result::abort;
}
case handshake_status::handshake:
{
return process_result::progress; // Continue handshake
co_return process_result::abort;
}
case handshake_status::realtime:
{
queue_realtime (std::move (message));
return process_result::progress; // Continue receiving new messages
co_return process_result::progress; // Continue receiving new messages
}
case handshake_status::bootstrap:
case handshake_status::handshake:
{
bool success = to_bootstrap_connection ();
if (!success)
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Error switching to bootstrap mode: {} ({})", to_string (message->type ()), remote_endpoint);
return process_result::abort; // Switch failed, abort
}
else
{
// Fall through to process the bootstrap message
}
// Continue handshake
}
}
}
else if (is_realtime_connection ())
{
realtime_message_visitor realtime_visitor{ *this };
message->visit (realtime_visitor);
if (realtime_visitor.process)
// Failed to complete handshake, abort
node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_failed);
node.logger.debug (nano::log::type::tcp_server, "Failed to complete handshake ({})", get_remote_endpoint ());
co_return process_result::abort;
}
auto nano::transport::tcp_server::run_realtime () -> asio::awaitable<void>
{
debug_assert (strand.running_in_this_thread ());
debug_assert (get_type () == nano::transport::socket_type::realtime);
node.logger.debug (nano::log::type::tcp_server, "Running realtime connection: {}", get_remote_endpoint ());
while (!co_await nano::async::cancelled ())
{
debug_assert (strand.running_in_this_thread ());
auto [message, status] = co_await receive_message ();
if (message)
{
queue_realtime (std::move (message));
realtime_message_visitor realtime_visitor{};
message->visit (realtime_visitor);
if (realtime_visitor.process)
{
release_assert (channel != nullptr);
channel->set_last_packet_received (std::chrono::steady_clock::now ());
// TODO: Throttle if not added
bool added = node.message_processor.put (std::move (message), channel);
node.stats.inc (nano::stat::type::tcp_server, added ? nano::stat::detail::message_queued : nano::stat::detail::message_dropped);
}
else
{
node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::message_ignored);
}
}
else // Error while deserializing message
{
debug_assert (status != nano::deserialize_message_status::success);
return process_result::progress;
}
// The server will switch to bootstrap mode immediately after processing the first bootstrap message, thus no `else if`
if (is_bootstrap_connection ())
{
return process_result::abort;
}
node.stats.inc (nano::stat::type::tcp_server_error, to_stat_detail (status));
debug_assert (false);
return process_result::abort;
switch (status)
{
// Avoid too much noise about `duplicate_publish_message` errors
case nano::deserialize_message_status::duplicate_publish_message:
{
node.stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message);
}
break;
case nano::deserialize_message_status::duplicate_confirm_ack_message:
{
node.stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message);
}
break;
default:
{
node.logger.debug (nano::log::type::tcp_server, "Error deserializing message: {} ({})",
to_string (status),
get_remote_endpoint ());
co_return; // Stop receiving further messages
}
break;
}
}
}
}
void nano::transport::tcp_server::queue_realtime (std::unique_ptr<nano::message> message)
auto nano::transport::tcp_server::receive_message () -> asio::awaitable<nano::deserialize_message_result>
{
auto node = this->node.lock ();
if (!node)
debug_assert (strand.running_in_this_thread ());
node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::read_header, nano::stat::dir::in);
node.stats.inc (nano::stat::type::tcp_server_read, nano::stat::detail::header, nano::stat::dir::in);
auto header_payload = co_await read_socket (nano::message_header::size);
auto header_stream = nano::bufferstream{ header_payload.data (), header_payload.size () };
bool error = false;
nano::message_header header{ error, header_stream };
if (error)
{
return;
co_return nano::deserialize_message_result{ nullptr, nano::deserialize_message_status::invalid_header };
}
if (!header.is_valid_message_type ())
{
co_return nano::deserialize_message_result{ nullptr, nano::deserialize_message_status::invalid_message_type };
}
if (header.network != node.config.network_params.network.current_network)
{
co_return nano::deserialize_message_result{ nullptr, nano::deserialize_message_status::invalid_network };
}
if (header.version_using < node.config.network_params.network.protocol_version_min)
{
co_return nano::deserialize_message_result{ nullptr, nano::deserialize_message_status::outdated_version };
}
release_assert (channel != nullptr);
auto const payload_size = header.payload_length_bytes ();
channel->set_last_packet_received (std::chrono::steady_clock::now ());
node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::read_payload, nano::stat::dir::in);
node.stats.inc (nano::stat::type::tcp_server_read, to_stat_detail (header.type), nano::stat::dir::in);
bool added = node->message_processor.put (std::move (message), channel);
// TODO: Throttle if not added
auto payload_buffer = payload_size > 0 ? co_await read_socket (payload_size) : nano::buffer_view{ buffer->data (), 0 };
auto result = nano::deserialize_message (payload_buffer, header,
node.network_params.network,
&node.network.filter,
&node.block_uniquer,
&node.vote_uniquer);
auto const & [message, status] = result;
if (message)
{
node.stats.inc (nano::stat::type::tcp_server_message, to_stat_detail (message->type ()), nano::stat::dir::in);
}
co_return result;
}
auto nano::transport::tcp_server::process_handshake (nano::node_id_handshake const & message) -> handshake_status
auto nano::transport::tcp_server::read_socket (size_t size) const -> asio::awaitable<nano::buffer_view>
{
auto node = this->node.lock ();
if (!node)
debug_assert (strand.running_in_this_thread ());
auto [ec, size_read] = co_await socket->co_read (buffer, size);
debug_assert (ec || size_read == size);
debug_assert (strand.running_in_this_thread ());
if (ec)
{
return handshake_status::abort;
throw boost::system::system_error (ec);
}
if (node->flags.disable_tcp_realtime)
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime mode ({})", remote_endpoint);
release_assert (size_read == size);
co_return nano::buffer_view{ buffer->data (), size_read };
}
return handshake_status::abort;
auto nano::transport::tcp_server::process_handshake (nano::node_id_handshake const & message) -> asio::awaitable<handshake_status>
{
if (node.flags.disable_tcp_realtime)
{
node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node.logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime mode ({})", get_remote_endpoint ());
co_return handshake_status::abort;
}
if (!message.query && !message.response)
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Invalid handshake message received ({})", remote_endpoint);
node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node.logger.debug (nano::log::type::tcp_server, "Invalid handshake message received ({})", get_remote_endpoint ());
return handshake_status::abort;
co_return handshake_status::abort;
}
if (message.query && handshake_received) // Second handshake message should be a response only
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", remote_endpoint);
node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node.logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", get_remote_endpoint ());
return handshake_status::abort;
co_return handshake_status::abort;
}
handshake_received = true;
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake, nano::stat::dir::in);
node->logger.debug (nano::log::type::tcp_server, "Handshake message received: {} ({})",
node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake, nano::stat::dir::in);
node.logger.debug (nano::log::type::tcp_server, "Handshake message received: {} ({})",
message.query ? (message.response ? "query + response" : "query") : (message.response ? "response" : "none"),
remote_endpoint);
get_remote_endpoint ());
if (message.query)
{
// Sends response + our own query
send_handshake_response (*message.query, message.is_v2 ());
co_await send_handshake_response (*message.query, message.is_v2 ());
// Fall through and continue handshake
}
if (message.response)
{
if (node->network.verify_handshake_response (*message.response, nano::transport::map_tcp_to_endpoint (remote_endpoint)))
if (node.network.verify_handshake_response (*message.response, get_remote_endpoint ()))
{
bool success = to_realtime_connection (message.response->node_id);
if (success)
{
return handshake_status::realtime; // Switched to realtime
co_return handshake_status::realtime; // Switched to realtime
}
else
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node->logger.debug (nano::log::type::tcp_server, "Error switching to realtime mode ({})", remote_endpoint);
node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error);
node.logger.debug (nano::log::type::tcp_server, "Error switching to realtime mode ({})", get_remote_endpoint ());
return handshake_status::abort;
co_return handshake_status::abort;
}
}
else
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response_invalid);
node->logger.debug (nano::log::type::tcp_server, "Invalid handshake response received ({})", remote_endpoint);
node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response_invalid);
node.logger.debug (nano::log::type::tcp_server, "Invalid handshake response received ({})", get_remote_endpoint ());
return handshake_status::abort;
co_return handshake_status::abort;
}
}
return handshake_status::handshake; // Handshake is in progress
co_return handshake_status::handshake; // Handshake is in progress
}
void nano::transport::tcp_server::initiate_handshake ()
auto nano::transport::tcp_server::send_handshake_request () -> asio::awaitable<void>
{
auto node = this->node.lock ();
if (!node)
{
return;
}
auto query = node.network.prepare_handshake_query (get_remote_endpoint ());
nano::node_id_handshake message{ node.network_params.network, query };
auto query = node->network.prepare_handshake_query (nano::transport::map_tcp_to_endpoint (remote_endpoint));
nano::node_id_handshake message{ node->network_params.network, query };
node->logger.debug (nano::log::type::tcp_server, "Initiating handshake query ({})", remote_endpoint);
node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_initiate, nano::stat::dir::out);
node.logger.debug (nano::log::type::tcp_server, "Initiating handshake query ({})", get_remote_endpoint ());
auto shared_const_buffer = message.to_shared_const_buffer ();
socket->async_write (shared_const_buffer, [this_l = shared_from_this ()] (boost::system::error_code const & ec, std::size_t size_a) {
auto node = this_l->node.lock ();
if (!node)
{
return;
}
if (ec)
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_network_error);
node->logger.debug (nano::log::type::tcp_server, "Error sending handshake query: {} ({})", ec.message (), this_l->remote_endpoint);
// Stop invalid handshake
this_l->stop ();
}
else
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake, nano::stat::dir::out);
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_initiate, nano::stat::dir::out);
}
});
auto [ec, size] = co_await socket->co_write (shared_const_buffer, shared_const_buffer.size ());
debug_assert (ec || size == shared_const_buffer.size ());
if (ec)
{
node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_network_error);
node.logger.debug (nano::log::type::tcp_server, "Error sending handshake query: {} ({})", ec.message (), get_remote_endpoint ());
throw boost::system::system_error (ec); // Abort further processing
}
else
{
node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake, nano::stat::dir::out);
}
}
void nano::transport::tcp_server::send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2)
auto nano::transport::tcp_server::send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2) -> asio::awaitable<void>
{
auto node = this->node.lock ();
if (!node)
{
return;
}
auto response = node.network.prepare_handshake_response (query, v2);
auto own_query = node.network.prepare_handshake_query (get_remote_endpoint ());
nano::node_id_handshake handshake_response{ node.network_params.network, own_query, response };
auto response = node->network.prepare_handshake_response (query, v2);
auto own_query = node->network.prepare_handshake_query (nano::transport::map_tcp_to_endpoint (remote_endpoint));
nano::node_id_handshake handshake_response{ node->network_params.network, own_query, response };
node->logger.debug (nano::log::type::tcp_server, "Responding to handshake ({})", remote_endpoint);
node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response, nano::stat::dir::out);
node.logger.debug (nano::log::type::tcp_server, "Responding to handshake ({})", get_remote_endpoint ());
auto shared_const_buffer = handshake_response.to_shared_const_buffer ();
socket->async_write (shared_const_buffer, [this_l = shared_from_this ()] (boost::system::error_code const & ec, std::size_t size_a) {
auto node = this_l->node.lock ();
if (!node)
{
return;
}
if (ec)
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_network_error);
node->logger.debug (nano::log::type::tcp_server, "Error sending handshake response: {} ({})", ec.message (), this_l->remote_endpoint);
// Stop invalid handshake
this_l->stop ();
}
else
{
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake, nano::stat::dir::out);
node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response, nano::stat::dir::out);
}
});
}
auto [ec, size] = co_await socket->co_write (shared_const_buffer, shared_const_buffer.size ());
debug_assert (ec || size == shared_const_buffer.size ());
if (ec)
{
node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_network_error);
node.logger.debug (nano::log::type::tcp_server, "Error sending handshake response: {} ({})", ec.message (), get_remote_endpoint ());
/*
* handshake_message_visitor
*/
void nano::transport::tcp_server::handshake_message_visitor::node_id_handshake (const nano::node_id_handshake & message)
{
result = server.process_handshake (message);
}
void nano::transport::tcp_server::handshake_message_visitor::bulk_pull (const nano::bulk_pull & message)
{
result = handshake_status::bootstrap;
}
void nano::transport::tcp_server::handshake_message_visitor::bulk_pull_account (const nano::bulk_pull_account & message)
{
result = handshake_status::bootstrap;
}
void nano::transport::tcp_server::handshake_message_visitor::bulk_push (const nano::bulk_push & message)
{
result = handshake_status::bootstrap;
}
void nano::transport::tcp_server::handshake_message_visitor::frontier_req (const nano::frontier_req & message)
{
result = handshake_status::bootstrap;
throw boost::system::system_error (ec); // Abort further processing
}
else
{
node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response, nano::stat::dir::out);
}
}
/*
@ -442,7 +415,6 @@ void nano::transport::tcp_server::handshake_message_visitor::frontier_req (const
void nano::transport::tcp_server::realtime_message_visitor::keepalive (const nano::keepalive & message)
{
process = true;
server.set_last_keepalive (message);
}
void nano::transport::tcp_server::realtime_message_visitor::publish (const nano::publish & message)
@ -467,21 +439,7 @@ void nano::transport::tcp_server::realtime_message_visitor::frontier_req (const
void nano::transport::tcp_server::realtime_message_visitor::telemetry_req (const nano::telemetry_req & message)
{
auto node = server.node.lock ();
if (!node)
{
return;
}
// Only handle telemetry requests if they are outside the cooldown period
if (server.last_telemetry_req + node->network_params.network.telemetry_request_cooldown < std::chrono::steady_clock::now ())
{
server.last_telemetry_req = std::chrono::steady_clock::now ();
process = true;
}
else
{
node->stats.inc (nano::stat::type::telemetry, nano::stat::detail::request_within_protection_cache_zone);
}
process = true;
}
void nano::transport::tcp_server::realtime_message_visitor::telemetry_ack (const nano::telemetry_ack & message)
@ -503,56 +461,13 @@ void nano::transport::tcp_server::realtime_message_visitor::asc_pull_ack (const
*
*/
// TODO: We could periodically call this (from a dedicated timeout thread for eg.) but socket already handles timeouts,
// and since we only ever store tcp_server as weak_ptr, socket timeout will automatically trigger tcp_server cleanup
void nano::transport::tcp_server::timeout ()
{
auto node = this->node.lock ();
if (!node)
{
return;
}
if (socket->has_timed_out ())
{
node->logger.debug (nano::log::type::tcp_server, "Closing TCP server due to timeout ({})", remote_endpoint);
socket->close ();
}
}
void nano::transport::tcp_server::set_last_keepalive (nano::keepalive const & message)
{
std::lock_guard<nano::mutex> lock{ mutex };
if (!last_keepalive)
{
last_keepalive = message;
}
}
std::optional<nano::keepalive> nano::transport::tcp_server::pop_last_keepalive ()
{
std::lock_guard<nano::mutex> lock{ mutex };
auto result = last_keepalive;
last_keepalive = std::nullopt;
return result;
}
bool nano::transport::tcp_server::to_bootstrap_connection ()
{
auto node = this->node.lock ();
if (!node)
if (node.flags.disable_bootstrap_listener)
{
return false;
}
if (!allow_bootstrap)
{
return false;
}
if (node->flags.disable_bootstrap_listener)
{
return false;
}
if (node->tcp_listener.bootstrap_count () >= node->config.bootstrap_connections_max)
if (node.tcp_listener.bootstrap_count () >= node.config.bootstrap_connections_max)
{
return false;
}
@ -563,19 +478,14 @@ bool nano::transport::tcp_server::to_bootstrap_connection ()
socket->type_set (nano::transport::socket_type::bootstrap);
node->logger.debug (nano::log::type::tcp_server, "Switched to bootstrap mode ({})", remote_endpoint);
node.logger.debug (nano::log::type::tcp_server, "Switched to bootstrap mode ({})", get_remote_endpoint ());
return true;
}
bool nano::transport::tcp_server::to_realtime_connection (nano::account const & node_id)
{
auto node = this->node.lock ();
if (!node)
{
return false;
}
if (node->flags.disable_tcp_realtime)
if (node.flags.disable_tcp_realtime)
{
return false;
}
@ -584,7 +494,7 @@ bool nano::transport::tcp_server::to_realtime_connection (nano::account const &
return false;
}
auto channel_l = node->network.tcp_channels.create (socket, shared_from_this (), node_id);
auto channel_l = node.network.tcp_channels.create (socket, shared_from_this (), node_id);
if (!channel_l)
{
return false;
@ -593,22 +503,7 @@ bool nano::transport::tcp_server::to_realtime_connection (nano::account const &
socket->type_set (nano::transport::socket_type::realtime);
node->logger.debug (nano::log::type::tcp_server, "Switched to realtime mode ({})", remote_endpoint);
node.logger.debug (nano::log::type::tcp_server, "Switched to realtime mode ({})", get_remote_endpoint ());
return true;
}
bool nano::transport::tcp_server::is_undefined_connection () const
{
return socket->type () == nano::transport::socket_type::undefined;
}
bool nano::transport::tcp_server::is_bootstrap_connection () const
{
return socket->is_bootstrap_connection ();
}
bool nano::transport::tcp_server::is_realtime_connection () const
{
return socket->is_realtime_connection ();
}

View file

@ -1,63 +1,57 @@
#pragma once
#include <nano/lib/stream.hpp>
#include <nano/node/endpoint.hpp>
#include <nano/node/fwd.hpp>
#include <nano/node/messages.hpp>
#include <nano/node/transport/fwd.hpp>
#include <nano/node/transport/tcp_socket.hpp>
#include <atomic>
namespace nano
{
class message;
}
namespace nano::transport
{
class message_deserializer;
class tcp_server;
class tcp_server final : public std::enable_shared_from_this<tcp_server>
{
public:
tcp_server (std::shared_ptr<nano::transport::tcp_socket>, std::shared_ptr<nano::node>, bool allow_bootstrap = true);
tcp_server (nano::node &, std::shared_ptr<nano::transport::tcp_socket>);
~tcp_server ();
void start ();
void stop ();
void initiate_handshake ();
void timeout ();
void set_last_keepalive (nano::keepalive const & message);
std::optional<nano::keepalive> pop_last_keepalive ();
void close ();
void close_async (); // Safe to call from io context
std::shared_ptr<nano::transport::tcp_socket> const socket;
std::weak_ptr<nano::node> const node;
nano::mutex mutex;
std::atomic<bool> stopped{ false };
std::atomic<bool> handshake_received{ false };
// Remote endpoint used to remove response channel even after socket closing
nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 };
std::chrono::steady_clock::time_point last_telemetry_req{};
bool alive () const;
public:
nano::endpoint get_remote_endpoint () const
{
return socket->get_remote_endpoint ();
}
nano::endpoint get_local_endpoint () const
{
return socket->get_local_endpoint ();
}
nano::transport::socket_type get_type () const
{
return socket->type ();
}
private:
void stop ();
enum class process_result
{
abort,
progress,
pause,
};
void receive_message ();
void received_message (std::unique_ptr<nano::message> message);
process_result process_message (std::unique_ptr<nano::message> message);
void queue_realtime (std::unique_ptr<nano::message> message);
bool to_bootstrap_connection ();
bool to_realtime_connection (nano::account const & node_id);
bool is_undefined_connection () const;
bool is_bootstrap_connection () const;
bool is_realtime_connection () const;
asio::awaitable<void> start_impl ();
asio::awaitable<process_result> perform_handshake ();
asio::awaitable<void> run_realtime ();
asio::awaitable<nano::deserialize_message_result> receive_message ();
asio::awaitable<nano::buffer_view> read_socket (size_t size) const;
enum class handshake_status
{
@ -67,44 +61,34 @@ private:
bootstrap,
};
handshake_status process_handshake (nano::node_id_handshake const & message);
void send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2);
asio::awaitable<handshake_status> process_handshake (nano::node_id_handshake const & message);
asio::awaitable<void> send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2);
asio::awaitable<void> send_handshake_request ();
private:
bool const allow_bootstrap;
std::shared_ptr<nano::transport::message_deserializer> message_deserializer;
std::optional<nano::keepalive> last_keepalive;
nano::node & node;
// Every realtime connection must have an associated channel
std::shared_ptr<nano::transport::tcp_channel> channel;
std::shared_ptr<nano::transport::tcp_socket> socket;
std::shared_ptr<nano::transport::tcp_channel> channel; // Every realtime connection must have an associated channel
nano::async::strand strand;
nano::async::task task;
nano::shared_buffer buffer;
static size_t constexpr max_buffer_size = 64 * 1024; // 64 KB
std::atomic<bool> handshake_received{ false };
private:
bool to_bootstrap_connection ();
bool to_realtime_connection (nano::account const & node_id);
private: // Visitors
class handshake_message_visitor : public nano::message_visitor
{
public:
handshake_status result{ handshake_status::abort };
explicit handshake_message_visitor (tcp_server & server) :
server{ server } {};
void node_id_handshake (nano::node_id_handshake const &) override;
void bulk_pull (nano::bulk_pull const &) override;
void bulk_pull_account (nano::bulk_pull_account const &) override;
void bulk_push (nano::bulk_push const &) override;
void frontier_req (nano::frontier_req const &) override;
private:
tcp_server & server;
};
class realtime_message_visitor : public nano::message_visitor
{
public:
bool process{ false };
explicit realtime_message_visitor (tcp_server & server) :
server{ server } {};
void keepalive (nano::keepalive const &) override;
void publish (nano::publish const &) override;
void confirm_req (nano::confirm_req const &) override;
@ -114,11 +98,6 @@ private: // Visitors
void telemetry_ack (nano::telemetry_ack const &) override;
void asc_pull_req (nano::asc_pull_req const &) override;
void asc_pull_ack (nano::asc_pull_ack const &) override;
private:
tcp_server & server;
};
friend class handshake_message_visitor;
};
}

View file

@ -12,484 +12,438 @@
#include <memory>
#include <utility>
/*
* socket
*/
nano::transport::tcp_socket::tcp_socket (nano::node & node_a, nano::transport::socket_endpoint endpoint_type_a, size_t queue_size_a) :
tcp_socket{ node_a, boost::asio::ip::tcp::socket{ node_a.io_ctx }, {}, {}, endpoint_type_a, queue_size_a }
nano::transport::tcp_socket::tcp_socket (nano::node & node_a, nano::transport::socket_endpoint endpoint_type_a) :
node{ node_a },
strand{ node_a.io_ctx.get_executor () },
task{ strand },
raw_socket{ node_a.io_ctx },
endpoint_type{ endpoint_type_a }
{
start ();
}
nano::transport::tcp_socket::tcp_socket (nano::node & node_a, boost::asio::ip::tcp::socket raw_socket_a, boost::asio::ip::tcp::endpoint remote_endpoint_a, boost::asio::ip::tcp::endpoint local_endpoint_a, nano::transport::socket_endpoint endpoint_type_a, size_t queue_size_a) :
queue_size{ queue_size_a },
send_queue{ queue_size },
node_w{ node_a.shared () },
nano::transport::tcp_socket::tcp_socket (nano::node & node_a, asio::ip::tcp::socket raw_socket_a, nano::transport::socket_endpoint endpoint_type_a) :
node{ node_a },
strand{ node_a.io_ctx.get_executor () },
task{ strand },
raw_socket{ std::move (raw_socket_a) },
remote{ remote_endpoint_a },
local{ local_endpoint_a },
endpoint_type_m{ endpoint_type_a },
timeout{ std::numeric_limits<uint64_t>::max () },
last_completion_time_or_init{ nano::seconds_since_epoch () },
last_receive_time_or_init{ nano::seconds_since_epoch () },
default_timeout{ node_a.config.tcp_io_timeout },
silent_connection_tolerance_time{ node_a.network_params.network.silent_connection_tolerance_time }
local_endpoint{ raw_socket.local_endpoint () },
remote_endpoint{ raw_socket.remote_endpoint () },
endpoint_type{ endpoint_type_a },
connected{ true },
time_connected{ std::chrono::steady_clock::now () }
{
time_connected = std::chrono::steady_clock::now ();
start ();
}
nano::transport::tcp_socket::~tcp_socket ()
{
close_internal ();
closed = true;
close ();
}
void nano::transport::tcp_socket::close ()
{
stop ();
if (closed) // Avoid closing the socket multiple times
{
return;
}
// Node context must be running to gracefully stop async tasks
debug_assert (!node.io_ctx.stopped ());
// Ensure that we are not trying to await the task while running on the same thread / io_context
debug_assert (!node.io_ctx.get_executor ().running_in_this_thread ());
// Dispatch close raw socket to the strand, wait synchronously for the operation to complete
auto fut = asio::dispatch (strand, asio::use_future ([this] () {
close_impl ();
}));
fut.wait (); // Blocking call
}
void nano::transport::tcp_socket::close_async ()
{
// Node context must be running to gracefully stop async tasks
debug_assert (!node.io_ctx.stopped ());
asio::dispatch (strand, [this, /* lifetime guard */ this_s = shared_from_this ()] () {
close_impl ();
});
}
void nano::transport::tcp_socket::close_impl ()
{
debug_assert (strand.running_in_this_thread ());
if (closed.exchange (true)) // Avoid closing the socket multiple times
{
return;
}
boost::system::error_code ec;
raw_socket.shutdown (asio::ip::tcp::socket::shutdown_both, ec); // Best effort, ignore errors
raw_socket.close (ec); // Best effort, ignore errors
if (!ec)
{
node.stats.inc (nano::stat::type::tcp_socket, nano::stat::detail::close);
node.logger.debug (nano::log::type::tcp_socket, "Closed socket: {}", remote_endpoint);
}
else
{
node.stats.inc (nano::stat::type::tcp_socket, nano::stat::detail::close_error);
node.logger.debug (nano::log::type::tcp_socket, "Closed socket, ungracefully: {} ({})", remote_endpoint, ec.message ());
}
}
void nano::transport::tcp_socket::start ()
{
ongoing_checkup ();
release_assert (!task.joinable ());
task = nano::async::task (strand, ongoing_checkup ());
}
void nano::transport::tcp_socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::function<void (boost::system::error_code const &)> callback_a)
void nano::transport::tcp_socket::stop ()
{
debug_assert (callback_a);
debug_assert (endpoint_type () == socket_endpoint::client);
if (task.running ())
{
// Node context must be running to gracefully stop async tasks
debug_assert (!node.io_ctx.stopped ());
// Ensure that we are not trying to await the task while running on the same thread / io_context
debug_assert (!node.io_ctx.get_executor ().running_in_this_thread ());
start ();
set_default_timeout ();
task.cancel ();
task.join ();
}
}
boost::asio::post (strand, [this_l = shared_from_this (), endpoint_a, callback = std::move (callback_a)] () {
this_l->raw_socket.async_connect (endpoint_a,
boost::asio::bind_executor (this_l->strand, [this_l, callback = std::move (callback), endpoint_a] (boost::system::error_code const & ec) {
debug_assert (this_l->strand.running_in_this_thread ());
auto node_l = this_l->node_w.lock ();
if (!node_l)
auto nano::transport::tcp_socket::ongoing_checkup () -> asio::awaitable<void>
{
debug_assert (strand.running_in_this_thread ());
try
{
while (!co_await nano::async::cancelled () && alive ())
{
bool healthy = checkup ();
if (!healthy)
{
return;
}
node.stats.inc (nano::stat::type::tcp_socket, nano::stat::detail::unhealthy);
node.logger.debug (nano::log::type::tcp_socket, "Unhealthy socket detected: {} (timed out: {})",
remote_endpoint,
timed_out.load ());
this_l->remote = endpoint_a;
close_impl ();
if (ec)
{
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_connect_error, nano::stat::dir::in);
this_l->close ();
break; // Stop the checkup task
}
else
{
this_l->time_connected = std::chrono::steady_clock::now ();
this_l->set_last_completion ();
{
// Best effort attempt to get endpoint address
boost::system::error_code ec;
this_l->local = this_l->raw_socket.local_endpoint (ec);
}
node_l->logger.debug (nano::log::type::tcp_socket, "Successfully connected to: {}, local: {}", this_l->remote, this_l->local);
std::chrono::seconds sleep_duration = node.config.tcp.checkup_interval;
co_await nano::async::sleep_for (sleep_duration);
timestamp += sleep_duration.count ();
}
callback (ec);
}));
});
}
}
catch (boost::system::system_error const & ex)
{
// Operation aborted is expected when cancelling the acceptor
debug_assert (ex.code () == asio::error::operation_aborted);
}
debug_assert (strand.running_in_this_thread ());
// Close the socket if checkup task is canceled for any reason
close_impl ();
}
void nano::transport::tcp_socket::async_read (std::shared_ptr<std::vector<uint8_t>> const & buffer_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a)
{
debug_assert (callback_a);
if (size_a <= buffer_a->size ())
{
if (!closed)
{
set_default_timeout ();
boost::asio::post (strand, [this_l = shared_from_this (), buffer_a, callback = std::move (callback_a), size_a] () mutable {
boost::asio::async_read (this_l->raw_socket, boost::asio::buffer (buffer_a->data (), size_a),
boost::asio::bind_executor (this_l->strand,
[this_l, buffer_a, cbk = std::move (callback)] (boost::system::error_code const & ec, std::size_t size_a) {
debug_assert (this_l->strand.running_in_this_thread ());
auto node_l = this_l->node_w.lock ();
if (!node_l)
{
return;
}
if (ec)
{
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_read_error, nano::stat::dir::in);
this_l->close ();
}
else
{
node_l->stats.add (nano::stat::type::traffic_tcp, nano::stat::detail::all, nano::stat::dir::in, size_a);
this_l->set_last_completion ();
this_l->set_last_receive_time ();
}
cbk (ec, size_a);
}));
});
}
}
else
{
debug_assert (false && "nano::transport::tcp_socket::async_read called with incorrect buffer size");
boost::system::error_code ec_buffer = boost::system::errc::make_error_code (boost::system::errc::no_buffer_space);
callback_a (ec_buffer, 0);
}
}
void nano::transport::tcp_socket::async_write (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a)
{
auto node_l = node_w.lock ();
if (!node_l)
{
return;
}
if (closed)
{
if (callback_a)
{
node_l->io_ctx.post ([callback = std::move (callback_a)] () {
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
return;
}
bool queued = send_queue.insert (buffer_a, callback_a, traffic_type::generic);
if (!queued)
{
if (callback_a)
{
node_l->io_ctx.post ([callback = std::move (callback_a)] () {
callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
});
}
return;
}
boost::asio::post (strand, [this_s = shared_from_this (), buffer_a, callback_a] () {
if (!this_s->write_in_progress)
{
this_s->write_queued_messages ();
}
});
}
// Must be called from strand
void nano::transport::tcp_socket::write_queued_messages ()
bool nano::transport::tcp_socket::checkup ()
{
debug_assert (strand.running_in_this_thread ());
if (closed)
if (connected)
{
return;
}
auto maybe_next = send_queue.pop ();
if (!maybe_next)
{
return;
}
auto const & [next, type] = *maybe_next;
set_default_timeout ();
write_in_progress = true;
nano::async_write (raw_socket, next.buffer,
boost::asio::bind_executor (strand, [this_l = shared_from_this (), next /* `next` object keeps buffer in scope */, type] (boost::system::error_code ec, std::size_t size) {
debug_assert (this_l->strand.running_in_this_thread ());
auto node_l = this_l->node_w.lock ();
if (!node_l)
if (!raw_socket.is_open ())
{
return;
node.stats.inc (nano::stat::type::tcp_socket, nano::stat::detail::already_closed);
return false; // Bad
}
this_l->write_in_progress = false;
if (ec)
debug_assert (timestamp >= read_timestamp);
debug_assert (timestamp >= write_timestamp);
debug_assert (timestamp >= last_receive);
debug_assert (timestamp >= last_send);
std::chrono::seconds const io_threshold = node.config.tcp.io_timeout;
std::chrono::seconds const silence_threshold = node.config.tcp.silent_timeout;
// Timeout threshold of 0 indicates no timeout
if (io_threshold.count () > 0)
{
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in);
this_l->close ();
}
else
{
node_l->stats.add (nano::stat::type::traffic_tcp, nano::stat::detail::all, nano::stat::dir::out, size, /* aggregate all */ true);
this_l->set_last_completion ();
}
if (next.callback)
{
next.callback (ec, size);
}
if (!ec)
{
this_l->write_queued_messages ();
}
}));
}
bool nano::transport::tcp_socket::max () const
{
return send_queue.size (traffic_type::generic) >= queue_size;
}
bool nano::transport::tcp_socket::full () const
{
return send_queue.size (traffic_type::generic) >= 2 * queue_size;
}
/** Call set_timeout with default_timeout as parameter */
void nano::transport::tcp_socket::set_default_timeout ()
{
set_timeout (default_timeout);
}
/** Set the current timeout of the socket in seconds
* timeout occurs when the last socket completion is more than timeout seconds in the past
* timeout always applies, the socket always has a timeout
* to set infinite timeout, use std::numeric_limits<uint64_t>::max ()
* the function checkup() checks for timeout on a regular interval
*/
void nano::transport::tcp_socket::set_timeout (std::chrono::seconds timeout_a)
{
timeout = timeout_a.count ();
}
void nano::transport::tcp_socket::set_last_completion ()
{
last_completion_time_or_init = nano::seconds_since_epoch ();
}
void nano::transport::tcp_socket::set_last_receive_time ()
{
last_receive_time_or_init = nano::seconds_since_epoch ();
}
void nano::transport::tcp_socket::ongoing_checkup ()
{
auto node_l = node_w.lock ();
if (!node_l)
{
return;
}
node_l->workers.post_delayed (std::chrono::seconds (node_l->network_params.network.is_dev_network () ? 1 : 5), [this_w = weak_from_this ()] () {
auto this_l = this_w.lock ();
if (!this_l)
{
return;
}
auto node_l = this_l->node_w.lock ();
if (!node_l)
{
return;
}
boost::asio::post (this_l->strand, [this_l] {
if (!this_l->raw_socket.is_open ())
if (read_timestamp > 0 && timestamp - read_timestamp > io_threshold.count ())
{
this_l->close ();
node.stats.inc (nano::stat::type::tcp_socket, nano::stat::detail::timeout);
node.stats.inc (nano::stat::type::tcp_socket_timeout, nano::stat::detail::timeout_receive);
node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::in);
timed_out = true;
return false; // Bad
}
});
if (write_timestamp > 0 && timestamp - write_timestamp > io_threshold.count ())
{
node.stats.inc (nano::stat::type::tcp_socket, nano::stat::detail::timeout);
node.stats.inc (nano::stat::type::tcp_socket_timeout, nano::stat::detail::timeout_send);
node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::out);
nano::seconds_t now = nano::seconds_since_epoch ();
auto condition_to_disconnect{ false };
// if this is a server socket, and no data is received for silent_connection_tolerance_time seconds then disconnect
if (this_l->endpoint_type () == socket_endpoint::server && (now - this_l->last_receive_time_or_init) > static_cast<uint64_t> (this_l->silent_connection_tolerance_time.count ()))
{
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_silent_connection_drop, nano::stat::dir::in);
condition_to_disconnect = true;
timed_out = true;
return false; // Bad
}
}
// if there is no activity for timeout seconds then disconnect
if ((now - this_l->last_completion_time_or_init) > this_l->timeout)
if (silence_threshold.count () > 0)
{
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, this_l->endpoint_type () == socket_endpoint::server ? nano::stat::dir::in : nano::stat::dir::out);
if ((timestamp - last_receive) > silence_threshold.count () || (timestamp - last_send) > silence_threshold.count ())
{
node.stats.inc (nano::stat::type::tcp_socket, nano::stat::detail::timeout);
node.stats.inc (nano::stat::type::tcp_socket_timeout, nano::stat::detail::timeout_silence);
node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_silent_connection_drop, nano::stat::dir::in);
condition_to_disconnect = true;
timed_out = true;
return false; // Bad
}
}
}
else // Not connected yet
{
auto const now = std::chrono::steady_clock::now ();
auto const cutoff = now - node.config.tcp.connect_timeout;
if (condition_to_disconnect)
if (time_created < cutoff)
{
// TODO: Stats
node_l->logger.debug (nano::log::type::tcp_socket, "Socket timeout, closing: {}", this_l->remote);
this_l->timed_out = true;
this_l->close ();
}
else if (!this_l->closed)
{
this_l->ongoing_checkup ();
node.stats.inc (nano::stat::type::tcp_socket, nano::stat::detail::timeout);
node.stats.inc (nano::stat::type::tcp_socket_timeout, nano::stat::detail::timeout_connect);
timed_out = true;
return false; // Bad
}
}
return true; // Healthy
}
auto nano::transport::tcp_socket::co_connect (nano::endpoint endpoint) -> asio::awaitable<std::tuple<boost::system::error_code>>
{
// Dispatch operation to the strand
// TODO: This additional dispatch should not be necessary, but it is done during transition to coroutine based code
co_return co_await asio::co_spawn (strand, co_connect_impl (endpoint), asio::use_awaitable);
}
// TODO: This is only used in tests, remove it, this creates untracked socket
auto nano::transport::tcp_socket::co_connect_impl (nano::endpoint endpoint) -> asio::awaitable<std::tuple<boost::system::error_code>>
{
debug_assert (strand.running_in_this_thread ());
debug_assert (endpoint_type == socket_endpoint::client);
debug_assert (!raw_socket.is_open ());
debug_assert (connect_in_progress.exchange (true) == false);
auto result = co_await raw_socket.async_connect (endpoint, asio::as_tuple (asio::use_awaitable));
auto const & [ec] = result;
if (!ec)
{
// Best effort to get the endpoints
boost::system::error_code ec_ignored;
local_endpoint = raw_socket.local_endpoint (ec_ignored);
remote_endpoint = raw_socket.remote_endpoint (ec_ignored);
connected = true; // Mark as connected
time_connected = std::chrono::steady_clock::now ();
node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_connect_success);
node.stats.inc (nano::stat::type::tcp_socket, nano::stat::detail::connect_success);
node.logger.debug (nano::log::type::tcp_socket, "Successfully connected to: {} from local: {}",
remote_endpoint, local_endpoint);
}
else
{
node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_connect_error);
node.stats.inc (nano::stat::type::tcp_socket, nano::stat::detail::connect_error);
node.logger.debug (nano::log::type::tcp_socket, "Failed to connect to: {} ({})",
endpoint, local_endpoint, ec);
error = true;
close_impl ();
}
debug_assert (connect_in_progress.exchange (false) == true);
co_return result;
}
void nano::transport::tcp_socket::async_connect (nano::endpoint endpoint, std::function<void (boost::system::error_code const &)> callback)
{
debug_assert (callback);
asio::co_spawn (strand, co_connect_impl (endpoint), [callback, /* lifetime guard */ this_s = shared_from_this ()] (std::exception_ptr const & ex, auto const & result) {
release_assert (!ex);
auto const & [ec] = result;
callback (ec);
});
}
void nano::transport::tcp_socket::read_impl (std::shared_ptr<std::vector<uint8_t>> const & data_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a)
boost::system::error_code nano::transport::tcp_socket::blocking_connect (nano::endpoint endpoint)
{
auto node_l = node_w.lock ();
if (!node_l)
{
return;
}
auto fut = asio::co_spawn (strand, co_connect_impl (endpoint), asio::use_future);
fut.wait (); // Blocking call
auto result = fut.get ();
auto const & [ec] = result;
return ec;
}
// Increase timeout to receive TCP header (idle server socket)
auto const prev_timeout = get_default_timeout_value ();
set_default_timeout_value (node_l->network_params.network.idle_timeout);
async_read (data_a, size_a, [callback_l = std::move (callback_a), prev_timeout, this_l = shared_from_this ()] (boost::system::error_code const & ec_a, std::size_t size_a) {
this_l->set_default_timeout_value (prev_timeout);
callback_l (ec_a, size_a);
auto nano::transport::tcp_socket::co_read (nano::shared_buffer buffer, size_t target_size) -> asio::awaitable<std::tuple<boost::system::error_code, size_t>>
{
// Dispatch operation to the strand
// TODO: This additional dispatch should not be necessary, but it is done during transition to coroutine based code
co_return co_await asio::co_spawn (strand, co_read_impl (buffer, target_size), asio::use_awaitable);
}
auto nano::transport::tcp_socket::co_read_impl (nano::shared_buffer buffer, size_t target_size) -> asio::awaitable<std::tuple<boost::system::error_code, size_t>>
{
debug_assert (strand.running_in_this_thread ());
debug_assert (read_in_progress.exchange (true) == false);
release_assert (target_size <= buffer->size (), "read buffer size mismatch");
read_timestamp = timestamp;
auto result = co_await asio::async_read (raw_socket, asio::buffer (buffer->data (), target_size), asio::as_tuple (asio::use_awaitable));
auto const & [ec, size_read] = result;
read_timestamp = 0;
if (!ec)
{
last_receive = timestamp;
node.stats.add (nano::stat::type::traffic_tcp, nano::stat::detail::all, nano::stat::dir::in, size_read);
}
else
{
node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_read_error);
node.logger.debug (nano::log::type::tcp_socket, "Error reading from: {} ({})", remote_endpoint, ec);
error = true;
close_impl ();
}
debug_assert (read_in_progress.exchange (false) == true);
co_return result;
}
void nano::transport::tcp_socket::async_read (nano::shared_buffer buffer, size_t size, std::function<void (boost::system::error_code const &, size_t)> callback)
{
debug_assert (callback);
asio::co_spawn (strand, co_read_impl (buffer, size), [callback, /* lifetime guard */ this_s = shared_from_this ()] (std::exception_ptr const & ex, auto const & result) {
release_assert (!ex);
auto const & [ec, size] = result;
callback (ec, size);
});
}
auto nano::transport::tcp_socket::blocking_read (nano::shared_buffer buffer, size_t size) -> std::tuple<boost::system::error_code, size_t>
{
auto fut = asio::co_spawn (strand, co_read_impl (buffer, size), asio::use_future);
fut.wait (); // Blocking call
auto result = fut.get ();
return result;
}
auto nano::transport::tcp_socket::co_write (nano::shared_buffer buffer, size_t target_size) -> asio::awaitable<std::tuple<boost::system::error_code, size_t>>
{
// Dispatch operation to the strand
// TODO: This additional dispatch should not be necessary, but it is done during transition to coroutine based code
co_return co_await asio::co_spawn (strand, co_write_impl (buffer, target_size), asio::use_awaitable);
}
auto nano::transport::tcp_socket::co_write_impl (nano::shared_buffer buffer, size_t target_size) -> asio::awaitable<std::tuple<boost::system::error_code, size_t>>
{
debug_assert (strand.running_in_this_thread ());
debug_assert (write_in_progress.exchange (true) == false);
release_assert (target_size <= buffer->size (), "write buffer size mismatch");
write_timestamp = timestamp;
auto result = co_await asio::async_write (raw_socket, asio::buffer (buffer->data (), target_size), asio::as_tuple (asio::use_awaitable));
auto const & [ec, size_written] = result;
write_timestamp = 0;
if (!ec)
{
last_send = timestamp;
node.stats.add (nano::stat::type::traffic_tcp, nano::stat::detail::all, nano::stat::dir::out, size_written);
}
else
{
node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error);
node.logger.debug (nano::log::type::tcp_socket, "Error writing to: {} ({})", remote_endpoint, ec);
error = true;
close_impl ();
}
debug_assert (write_in_progress.exchange (false) == true);
co_return result;
}
void nano::transport::tcp_socket::async_write (nano::shared_buffer buffer, std::function<void (boost::system::error_code const &, size_t)> callback)
{
debug_assert (callback);
asio::co_spawn (strand, co_write_impl (buffer, buffer->size ()), [callback, /* lifetime guard */ this_s = shared_from_this ()] (std::exception_ptr const & ex, auto const & result) {
release_assert (!ex);
auto const & [ec, size] = result;
callback (ec, size);
});
}
auto nano::transport::tcp_socket::blocking_write (nano::shared_buffer buffer, size_t size) -> std::tuple<boost::system::error_code, size_t>
{
auto fut = asio::co_spawn (strand, co_write_impl (buffer, size), asio::use_future);
fut.wait (); // Blocking call
auto result = fut.get ();
return result;
}
nano::endpoint nano::transport::tcp_socket::get_remote_endpoint () const
{
// Using cached value to avoid calling tcp_socket.remote_endpoint() which may be invalid (throw) after closing the socket
return remote_endpoint;
}
nano::endpoint nano::transport::tcp_socket::get_local_endpoint () const
{
// Using cached value to avoid calling tcp_socket.local_endpoint() which may be invalid (throw) after closing the socket
return local_endpoint;
}
nano::transport::socket_endpoint nano::transport::tcp_socket::get_endpoint_type () const
{
return endpoint_type;
}
bool nano::transport::tcp_socket::alive () const
{
return !closed;
}
bool nano::transport::tcp_socket::has_connected () const
{
return connected;
}
bool nano::transport::tcp_socket::has_timed_out () const
{
return timed_out;
}
void nano::transport::tcp_socket::set_default_timeout_value (std::chrono::seconds timeout_a)
std::chrono::steady_clock::time_point nano::transport::tcp_socket::get_time_created () const
{
default_timeout = timeout_a;
return time_created;
}
std::chrono::seconds nano::transport::tcp_socket::get_default_timeout_value () const
std::chrono::steady_clock::time_point nano::transport::tcp_socket::get_time_connected () const
{
return default_timeout;
}
void nano::transport::tcp_socket::close ()
{
boost::asio::dispatch (strand, [this_l = shared_from_this ()] {
this_l->close_internal ();
});
}
// This must be called from a strand or the destructor
void nano::transport::tcp_socket::close_internal ()
{
auto node_l = node_w.lock ();
if (!node_l)
{
return;
}
if (closed.exchange (true))
{
return;
}
send_queue.clear ();
default_timeout = std::chrono::seconds (0);
// Ignore error code for shutdown as it is best-effort
boost::system::error_code ec;
raw_socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec);
raw_socket.close (ec);
if (ec)
{
node_l->stats.inc (nano::stat::type::socket, nano::stat::detail::error_socket_close);
node_l->logger.error (nano::log::type::tcp_socket, "Failed to close socket gracefully: {} ({})",
remote,
ec.message ());
}
else
{
// TODO: Stats
node_l->logger.debug (nano::log::type::tcp_socket, "Closed socket: {}", remote);
}
}
nano::tcp_endpoint nano::transport::tcp_socket::remote_endpoint () const
{
// Using cached value to avoid calling tcp_socket.remote_endpoint() which may be invalid (throw) after closing the socket
return remote;
}
nano::tcp_endpoint nano::transport::tcp_socket::local_endpoint () const
{
// Using cached value to avoid calling tcp_socket.local_endpoint() which may be invalid (throw) after closing the socket
return local;
return time_connected;
}
void nano::transport::tcp_socket::operator() (nano::object_stream & obs) const
{
obs.write ("remote_endpoint", remote_endpoint ());
obs.write ("local_endpoint", local_endpoint ());
obs.write ("remote_endpoint", remote_endpoint);
obs.write ("local_endpoint", local_endpoint);
obs.write ("type", type_m.load ());
obs.write ("endpoint_type", endpoint_type_m);
}
/*
* socket_queue
*/
nano::transport::socket_queue::socket_queue (std::size_t max_size_a) :
max_size{ max_size_a }
{
}
bool nano::transport::socket_queue::insert (const buffer_t & buffer, callback_t callback, nano::transport::traffic_type traffic_type)
{
nano::lock_guard<nano::mutex> guard{ mutex };
if (queues[traffic_type].size () < 2 * max_size)
{
queues[traffic_type].push (entry{ buffer, callback });
return true; // Queued
}
return false; // Not queued
}
auto nano::transport::socket_queue::pop () -> std::optional<result_t>
{
nano::lock_guard<nano::mutex> guard{ mutex };
auto try_pop = [this] (nano::transport::traffic_type type) -> std::optional<result_t> {
auto & que = queues[type];
if (!que.empty ())
{
auto item = que.front ();
que.pop ();
return std::make_pair (item, type);
}
return std::nullopt;
};
// TODO: This is a very basic prioritization, implement something more advanced and configurable
if (auto item = try_pop (nano::transport::traffic_type::generic))
{
return item;
}
return std::nullopt;
}
void nano::transport::socket_queue::clear ()
{
nano::lock_guard<nano::mutex> guard{ mutex };
queues.clear ();
}
std::size_t nano::transport::socket_queue::size (nano::transport::traffic_type traffic_type) const
{
nano::lock_guard<nano::mutex> guard{ mutex };
if (auto it = queues.find (traffic_type); it != queues.end ())
{
return it->second.size ();
}
return 0;
}
bool nano::transport::socket_queue::empty () const
{
nano::lock_guard<nano::mutex> guard{ mutex };
return std::all_of (queues.begin (), queues.end (), [] (auto const & que) {
return que.second.empty ();
});
obs.write ("endpoint_type", endpoint_type);
}
/*

View file

@ -1,117 +1,65 @@
#pragma once
#include <nano/boost/asio/ip/tcp.hpp>
#include <nano/boost/asio/strand.hpp>
#include <nano/lib/asio.hpp>
#include <nano/lib/locks.hpp>
#include <nano/lib/async.hpp>
#include <nano/lib/logging.hpp>
#include <nano/lib/timer.hpp>
#include <nano/node/fwd.hpp>
#include <nano/node/transport/common.hpp>
#include <nano/node/transport/traffic_type.hpp>
#include <nano/node/transport/fwd.hpp>
#include <atomic>
#include <chrono>
#include <map>
#include <memory>
#include <optional>
#include <queue>
#include <unordered_map>
#include <vector>
namespace boost::asio::ip
{
class network_v6;
}
namespace nano
{
class node;
}
namespace nano::transport
{
class socket_queue final
{
public:
using buffer_t = nano::shared_const_buffer;
using callback_t = std::function<void (boost::system::error_code const &, std::size_t)>;
struct entry
{
buffer_t buffer;
callback_t callback;
};
public:
using result_t = std::pair<entry, nano::transport::traffic_type>;
explicit socket_queue (std::size_t max_size);
bool insert (buffer_t const &, callback_t, nano::transport::traffic_type);
std::optional<result_t> pop ();
void clear ();
std::size_t size (nano::transport::traffic_type) const;
bool empty () const;
std::size_t const max_size;
private:
mutable nano::mutex mutex;
std::unordered_map<nano::transport::traffic_type, std::queue<entry>> queues;
};
/** Socket class for tcp clients and newly accepted connections */
class tcp_socket final : public std::enable_shared_from_this<tcp_socket>
{
friend class tcp_server;
friend class tcp_channels;
friend class tcp_listener;
public:
static size_t constexpr default_queue_size = 16;
/** Construct a new (unconnected) socket */
explicit tcp_socket (nano::node &, nano::transport::socket_endpoint = socket_endpoint::client);
public:
explicit tcp_socket (nano::node &, nano::transport::socket_endpoint = socket_endpoint::client, size_t queue_size = default_queue_size);
// TODO: Accepting remote/local endpoints as a parameter is unnecessary, but is needed for now to keep compatibility with the legacy code
tcp_socket (
nano::node &,
boost::asio::ip::tcp::socket,
boost::asio::ip::tcp::endpoint remote_endpoint,
boost::asio::ip::tcp::endpoint local_endpoint,
nano::transport::socket_endpoint = socket_endpoint::server,
size_t queue_size = default_queue_size);
/** Construct from an existing (connected) socket */
tcp_socket (nano::node &, asio::ip::tcp::socket, nano::transport::socket_endpoint = socket_endpoint::server);
~tcp_socket ();
void start ();
void close ();
void close_async (); // Safe to call from io context
void async_connect (
boost::asio::ip::tcp::endpoint const & endpoint,
std::function<void (boost::system::error_code const &)> callback);
nano::endpoint get_remote_endpoint () const;
nano::endpoint get_local_endpoint () const;
nano::transport::socket_endpoint get_endpoint_type () const;
void async_read (
std::shared_ptr<std::vector<uint8_t>> const & buffer,
std::size_t size,
std::function<void (boost::system::error_code const &, std::size_t)> callback);
bool alive () const;
void async_write (
nano::shared_const_buffer const &,
std::function<void (boost::system::error_code const &, std::size_t)> callback = nullptr);
boost::asio::ip::tcp::endpoint remote_endpoint () const;
boost::asio::ip::tcp::endpoint local_endpoint () const;
/** Returns true if the socket has timed out */
bool has_connected () const;
bool has_timed_out () const;
/** This can be called to change the maximum idle time, e.g. based on the type of traffic detected. */
void set_default_timeout_value (std::chrono::seconds);
std::chrono::seconds get_default_timeout_value () const;
void set_timeout (std::chrono::seconds);
bool max () const;
bool full () const;
std::chrono::steady_clock::time_point get_time_created () const;
std::chrono::steady_clock::time_point get_time_connected () const;
public:
asio::awaitable<std::tuple<boost::system::error_code>> co_connect (nano::endpoint endpoint);
asio::awaitable<std::tuple<boost::system::error_code, size_t>> co_read (nano::shared_buffer, size_t size);
asio::awaitable<std::tuple<boost::system::error_code, size_t>> co_write (nano::shared_buffer, size_t size);
// Adapters for callback style code
void async_connect (nano::endpoint endpoint, std::function<void (boost::system::error_code const &)> callback);
void async_read (nano::shared_buffer, size_t size, std::function<void (boost::system::error_code const &, size_t)> callback = nullptr);
void async_write (nano::shared_buffer, std::function<void (boost::system::error_code const &, size_t)> callback = nullptr);
// Adapters for sync style code
boost::system::error_code blocking_connect (nano::endpoint endpoint);
std::tuple<boost::system::error_code, size_t> blocking_read (nano::shared_buffer, size_t size);
std::tuple<boost::system::error_code, size_t> blocking_write (nano::shared_buffer, size_t size);
private:
asio::awaitable<std::tuple<boost::system::error_code>> co_connect_impl (nano::endpoint endpoint);
asio::awaitable<std::tuple<boost::system::error_code, size_t>> co_read_impl (nano::shared_buffer, size_t size);
asio::awaitable<std::tuple<boost::system::error_code, size_t>> co_write_impl (nano::shared_buffer, size_t size);
public: // TODO: Remove these
nano::transport::socket_type type () const
{
return type_m;
@ -120,93 +68,51 @@ public:
{
type_m = type;
}
nano::transport::socket_endpoint endpoint_type () const
{
return endpoint_type_m;
}
bool is_realtime_connection () const
{
return type () == socket_type::realtime;
}
bool is_bootstrap_connection () const
{
return type () == socket_type::bootstrap;
}
bool is_closed () const
{
return closed;
}
bool alive () const
{
return !is_closed ();
}
std::chrono::steady_clock::time_point get_time_connected () const
{
return time_connected.load ();
}
private:
size_t const queue_size;
socket_queue send_queue;
void start ();
void stop ();
protected:
std::weak_ptr<nano::node> node_w;
void close_impl ();
boost::asio::strand<boost::asio::io_context::executor_type> strand;
boost::asio::ip::tcp::socket raw_socket;
asio::awaitable<void> ongoing_checkup ();
bool checkup ();
/** The other end of the connection */
boost::asio::ip::tcp::endpoint remote;
boost::asio::ip::tcp::endpoint local;
private:
nano::node & node;
/** number of seconds of inactivity that causes a socket timeout
* activity is any successful connect, send or receive event
*/
std::atomic<uint64_t> timeout;
nano::async::strand strand;
nano::async::task task;
asio::ip::tcp::socket raw_socket;
/** the timestamp (in seconds since epoch) of the last time there was successful activity on the socket
* activity is any successful connect, send or receive event
*/
std::atomic<uint64_t> last_completion_time_or_init;
nano::endpoint remote_endpoint;
nano::endpoint local_endpoint;
nano::transport::socket_endpoint const endpoint_type;
/** the timestamp (in seconds since epoch) of the last time there was successful receive on the socket
* successful receive includes graceful closing of the socket by the peer (the read succeeds but returns 0 bytes)
*/
std::atomic<nano::seconds_t> last_receive_time_or_init;
/** Flag that is set when cleanup decides to close the socket due to timeout.
* NOTE: Currently used by tcp_server::timeout() but I suspect that this and tcp_server::timeout() are not needed.
*/
std::atomic<bool> connected{ false };
std::atomic<bool> closed{ false };
std::atomic<bool> error{ false };
std::atomic<bool> timed_out{ false };
/** the timeout value to use when calling set_default_timeout() */
std::atomic<std::chrono::seconds> default_timeout;
std::chrono::steady_clock::time_point const time_created{ std::chrono::steady_clock::now () };
std::atomic<std::chrono::steady_clock::time_point> time_connected{};
/** used in real time server sockets, number of seconds of no receive traffic that will cause the socket to timeout */
std::chrono::seconds silent_connection_tolerance_time;
/** Set by close() - completion handlers must check this. This is more reliable than checking
error codes as the OS may have already completed the async operation. */
std::atomic<bool> closed{ false };
/** Updated only from strand, but stored as atomic so it can be read from outside */
// Guard against conflicting concurrent async operations
std::atomic<bool> connect_in_progress{ false };
std::atomic<bool> read_in_progress{ false };
std::atomic<bool> write_in_progress{ false };
std::atomic<std::chrono::steady_clock::time_point> time_connected;
std::atomic<nano::transport::socket_type> type_m{ nano::transport::socket_type::undefined };
void close_internal ();
void write_queued_messages ();
void set_default_timeout ();
void set_last_completion ();
void set_last_receive_time ();
void ongoing_checkup ();
void read_impl (std::shared_ptr<std::vector<uint8_t>> const & data_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a);
private:
socket_endpoint const endpoint_type_m;
std::atomic<socket_type> type_m{ socket_type::undefined };
private: // Accessed only from strand
// Using a low-resolution clock to track timeouts to avoid system clock overhead
uint64_t timestamp{ 1 };
uint64_t read_timestamp{ 0 };
uint64_t write_timestamp{ 0 };
uint64_t last_send{ 0 };
uint64_t last_receive{ 0 };
public: // Logging
virtual void operator() (nano::object_stream &) const;
void operator() (nano::object_stream &) const;
};
}

View file

@ -166,6 +166,14 @@ bool nano::transport::reserved_address (nano::endpoint const & endpoint_a, bool
return result;
}
void nano::transport::throw_if_error (boost::system::error_code const & ec)
{
if (ec)
{
throw boost::system::system_error (ec);
}
}
nano::stat::detail nano::to_stat_detail (boost::system::error_code const & ec)
{
switch (ec.value ())

View file

@ -34,6 +34,8 @@ namespace socket_functions
boost::asio::ip::address last_ipv6_subnet_address (boost::asio::ip::address_v6 const &, std::size_t);
std::size_t count_subnetwork_connections (nano::transport::address_socket_mmap const &, boost::asio::ip::address_v6 const &, std::size_t);
}
void throw_if_error (boost::system::error_code const & ec);
}
namespace nano

View file

@ -1313,8 +1313,6 @@ TEST (telemetry, ongoing_requests)
auto node_client = system.add_node (node_flags);
auto node_server = system.add_node (node_flags);
nano::test::wait_peer_connections (system);
ASSERT_EQ (0, node_client->telemetry.size ());
ASSERT_EQ (0, node_server->telemetry.size ());
ASSERT_EQ (0, node_client->stats.count (nano::stat::type::bootstrap, nano::stat::detail::telemetry_ack, nano::stat::dir::in));
@ -1348,8 +1346,6 @@ namespace transport
system.add_node (node_flags);
}
nano::test::wait_peer_connections (system);
std::vector<std::thread> threads;
auto const num_threads = 4;
@ -1500,8 +1496,6 @@ TEST (telemetry, cache_read_and_timeout)
auto node_client = system.add_node (node_flags);
auto node_server = system.add_node (node_flags);
nano::test::wait_peer_connections (system);
// Request telemetry metrics
std::optional<nano::telemetry_data> telemetry_data;
auto channel = node_client->network.find_node_id (node_server->get_node_id ());