From d38eef60b4c185c0bf864ae2984c1b352139635c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 24 May 2025 15:52:24 +0200 Subject: [PATCH 1/4] Coroutine socket --- nano/core_test/network.cpp | 4 +- nano/core_test/socket.cpp | 30 +- nano/core_test/toml.cpp | 3 - nano/lib/asio.hpp | 10 + nano/lib/common.hpp | 10 +- nano/lib/stats_enums.hpp | 12 + nano/node/node.cpp | 2 +- nano/node/nodeconfig.cpp | 5 - nano/node/nodeconfig.hpp | 2 - nano/node/transport/common.hpp | 2 + nano/node/transport/tcp_channel.cpp | 61 ++- nano/node/transport/tcp_channel.hpp | 4 +- nano/node/transport/tcp_channels.cpp | 8 +- nano/node/transport/tcp_config.cpp | 10 +- nano/node/transport/tcp_config.hpp | 7 +- nano/node/transport/tcp_listener.cpp | 6 +- nano/node/transport/tcp_server.cpp | 12 +- nano/node/transport/tcp_socket.cpp | 779 +++++++++++++-------------- nano/node/transport/tcp_socket.hpp | 228 +++----- nano/node/transport/transport.cpp | 8 + nano/node/transport/transport.hpp | 2 + 21 files changed, 550 insertions(+), 655 deletions(-) diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 66d46657..e584c326 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -135,8 +135,8 @@ TEST (network, last_contacted) // check that the endpoints are part of the same connection std::shared_ptr sock0 = channel0->socket; std::shared_ptr sock1 = channel1->socket; - ASSERT_EQ (sock0->local_endpoint (), sock1->remote_endpoint ()); - ASSERT_EQ (sock1->local_endpoint (), sock0->remote_endpoint ()); + ASSERT_EQ (sock0->get_local_endpoint (), sock1->get_remote_endpoint ()); + ASSERT_EQ (sock1->get_local_endpoint (), sock0->get_remote_endpoint ()); } // capture the state before and ensure the clock ticks at least once diff --git a/nano/core_test/socket.cpp b/nano/core_test/socket.cpp index d0c213a0..339e6ab9 100644 --- a/nano/core_test/socket.cpp +++ b/nano/core_test/socket.cpp @@ -82,10 +82,9 @@ TEST (socket, disconnection_of_silent_connections) nano::node_config config; // Increasing the timer timeout, so we don't let the connection to timeout due to the timer checker. - config.tcp_io_timeout = std::chrono::seconds::max (); - config.network_params.network.idle_timeout = std::chrono::seconds::max (); + config.tcp.io_timeout = std::chrono::seconds::max (); // Silent connections are connections open by external peers that don't contribute with any data. - config.network_params.network.silent_connection_tolerance_time = std::chrono::seconds{ 5 }; + config.tcp.silent_timeout = std::chrono::seconds{ 5 }; auto node = system.add_node (config); // On a connection, a server data socket is created. The shared pointer guarantees the object's lifecycle until the end of this test. @@ -110,7 +109,7 @@ TEST (socket, disconnection_of_silent_connections) // Checking the connection was closed. ASSERT_TIMELY (10s, server_data_socket_future.wait_for (0s) == std::future_status::ready); auto server_data_socket = server_data_socket_future.get (); - ASSERT_TIMELY (10s, server_data_socket->is_closed ()); + ASSERT_TIMELY (10s, !server_data_socket->alive ()); // Just to ensure the disconnection wasn't due to the timer timeout. ASSERT_EQ (0, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::in)); @@ -169,12 +168,15 @@ TEST (socket, drop_policy) ASSERT_EQ (1, client.use_count ()); }; + // TODO: FIXME: Socket no longer queues writes, so this test is no longer valid + size_t constexpr queue_size = 128; + // We're going to write twice the queue size + 1, and the server isn't reading // The total number of drops should thus be 1 (the socket allows doubling the queue size for no_socket_drop) - func (nano::transport::tcp_socket::default_queue_size * 2 + 1); + func (queue_size * 2 + 1); ASSERT_EQ (1, failed_writes); - func (nano::transport::tcp_socket::default_queue_size + 1); + func (queue_size + 1); ASSERT_EQ (0, failed_writes); } @@ -231,7 +233,7 @@ TEST (socket, concurrent_writes) accept_callback_t accept_callback = [&] (boost::system::error_code const & ec, boost::asio::ip::tcp::socket socket) { if (!ec) { - auto new_connection = std::make_shared (*node, std::move (socket), socket.remote_endpoint (), socket.local_endpoint ()); + auto new_connection = std::make_shared (*node, std::move (socket)); connections.push_back (new_connection); reader (new_connection); @@ -306,7 +308,7 @@ TEST (socket_timeout, connect) // create one node and set timeout to 1 second nano::test::system system (1); std::shared_ptr node = system.nodes[0]; - node->config.tcp_io_timeout = 1s; + node->config.tcp.io_timeout = 1s; // try to connect to an IP address that most likely does not exist and will not reply // we want the tcp stack to not receive a negative reply, we want it to see silence and to keep trying @@ -332,7 +334,7 @@ TEST (socket_timeout, read) // create one node and set timeout to 1 second nano::test::system system (1); std::shared_ptr node = system.nodes[0]; - node->config.tcp_io_timeout = std::chrono::seconds (2); + node->config.tcp.io_timeout = std::chrono::seconds (2); // create a server socket boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ()); @@ -380,7 +382,7 @@ TEST (socket_timeout, write) // create one node and set timeout to 1 second nano::test::system system (1); std::shared_ptr node = system.nodes[0]; - node->config.tcp_io_timeout = std::chrono::seconds (2); + node->config.tcp.io_timeout = std::chrono::seconds (2); // create a server socket boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ()); @@ -398,7 +400,7 @@ TEST (socket_timeout, write) // create a client socket and send lots of data to fill the socket queue on the local and remote side // eventually, the all tcp queues should fill up and async_write will not be able to progress // and the timeout should kick in and close the socket, which will cause the async_write to return an error - auto socket = std::make_shared (*node, nano::transport::socket_endpoint::client, 1024 * 1024); // socket with a max queue size much larger than OS buffers + auto socket = std::make_shared (*node, nano::transport::socket_endpoint::client); // socket with a max queue size much larger than OS buffers socket->async_connect (acceptor.local_endpoint (), [&socket, &done] (boost::system::error_code const & ec_a) { EXPECT_FALSE (ec_a); @@ -428,7 +430,7 @@ TEST (socket_timeout, read_overlapped) // create one node and set timeout to 1 second nano::test::system system (1); std::shared_ptr node = system.nodes[0]; - node->config.tcp_io_timeout = std::chrono::seconds (2); + node->config.tcp.io_timeout = std::chrono::seconds (2); // create a server socket boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ()); @@ -489,7 +491,7 @@ TEST (socket_timeout, write_overlapped) // create one node and set timeout to 1 second nano::test::system system (1); std::shared_ptr node = system.nodes[0]; - node->config.tcp_io_timeout = std::chrono::seconds (2); + node->config.tcp.io_timeout = std::chrono::seconds (2); // create a server socket boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ()); @@ -513,7 +515,7 @@ TEST (socket_timeout, write_overlapped) // create a client socket and send lots of data to fill the socket queue on the local and remote side // eventually, the all tcp queues should fill up and async_write will not be able to progress // and the timeout should kick in and close the socket, which will cause the async_write to return an error - auto socket = std::make_shared (*node, nano::transport::socket_endpoint::client, 1024 * 1024); // socket with a max queue size much larger than OS buffers + auto socket = std::make_shared (*node, nano::transport::socket_endpoint::client); // socket with a max queue size much larger than OS buffers socket->async_connect (acceptor.local_endpoint (), [&socket, &done] (boost::system::error_code const & ec_a) { EXPECT_FALSE (ec_a); diff --git a/nano/core_test/toml.cpp b/nano/core_test/toml.cpp index 604d2041..986869ae 100644 --- a/nano/core_test/toml.cpp +++ b/nano/core_test/toml.cpp @@ -306,7 +306,6 @@ TEST (toml_config, daemon_config_deserialize_defaults) ASSERT_EQ (conf.node.preconfigured_representatives, defaults.node.preconfigured_representatives); ASSERT_EQ (conf.node.receive_minimum, defaults.node.receive_minimum); ASSERT_EQ (conf.node.signature_checker_threads, defaults.node.signature_checker_threads); - ASSERT_EQ (conf.node.tcp_io_timeout, defaults.node.tcp_io_timeout); ASSERT_EQ (conf.node.unchecked_cutoff_time, defaults.node.unchecked_cutoff_time); ASSERT_EQ (conf.node.use_memory_pools, defaults.node.use_memory_pools); ASSERT_EQ (conf.node.vote_generator_delay, defaults.node.vote_generator_delay); @@ -472,7 +471,6 @@ TEST (toml_config, daemon_config_deserialize_no_defaults) preconfigured_representatives = ["nano_3arg3asgtigae3xckabaaewkx3bzsh7nwz7jkmjos79ihyaxwphhm6qgjps4"] receive_minimum = "999" signature_checker_threads = 999 - tcp_io_timeout = 999 unchecked_cutoff_time = 999 use_memory_pools = false vote_generator_delay = 999 @@ -740,7 +738,6 @@ TEST (toml_config, daemon_config_deserialize_no_defaults) ASSERT_NE (conf.node.preconfigured_representatives, defaults.node.preconfigured_representatives); ASSERT_NE (conf.node.receive_minimum, defaults.node.receive_minimum); ASSERT_NE (conf.node.signature_checker_threads, defaults.node.signature_checker_threads); - ASSERT_NE (conf.node.tcp_io_timeout, defaults.node.tcp_io_timeout); ASSERT_NE (conf.node.unchecked_cutoff_time, defaults.node.unchecked_cutoff_time); ASSERT_NE (conf.node.use_memory_pools, defaults.node.use_memory_pools); ASSERT_NE (conf.node.vote_generator_delay, defaults.node.vote_generator_delay); diff --git a/nano/lib/asio.hpp b/nano/lib/asio.hpp index dcbce2a5..946a853d 100644 --- a/nano/lib/asio.hpp +++ b/nano/lib/asio.hpp @@ -1,9 +1,14 @@ #pragma once +#include +#include #include namespace nano { +using shared_buffer = std::shared_ptr>; + +// TODO: Replace with just shared_buffer class shared_const_buffer { public: @@ -22,6 +27,11 @@ public: std::size_t size () const; std::vector to_bytes () const; + operator nano::shared_buffer () const + { + return m_data; + } + private: std::shared_ptr> m_data; boost::asio::const_buffer m_buffer; diff --git a/nano/lib/common.hpp b/nano/lib/common.hpp index 990aab30..2ad8f65d 100644 --- a/nano/lib/common.hpp +++ b/nano/lib/common.hpp @@ -1,16 +1,10 @@ #pragma once -namespace boost::asio::ip -{ -class address; -class tcp; -template -class basic_endpoint; -} +#include namespace nano { using ip_address = boost::asio::ip::address; using endpoint = boost::asio::ip::basic_endpoint; -using tcp_endpoint = endpoint; +using tcp_endpoint = endpoint; // TODO: Remove this alias } diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index f838326b..cbb1b003 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -36,6 +36,8 @@ enum class type ipc, tcp, tcp_server, + tcp_socket, + tcp_socket_timeout, tcp_channel, tcp_channel_queued, tcp_channel_send, @@ -180,6 +182,7 @@ enum class detail refresh, sent, reset, + close, // processing queue queue, @@ -361,10 +364,19 @@ enum class detail // tcp tcp_silent_connection_drop, tcp_io_timeout_drop, + tcp_connect_success, tcp_connect_error, tcp_read_error, tcp_write_error, + // tcp_socket + unhealthy, + already_closed, + timeout_receive, + timeout_send, + timeout_connect, + timeout_silence, + // tcp_listener accept_success, accept_error, diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 1244775f..d0616746 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -640,7 +640,7 @@ void nano::node::stop () // work pool is not stopped on purpose due to testing setup // Stop the IO runner last - runner.abort (); + runner.abort (); // TODO: Remove this runner.join (); debug_assert (io_ctx_shared.use_count () == 1); // Node should be the last user of the io_context } diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index a73e77c7..cf8ee6fc 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -121,7 +121,6 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const toml.put ("vote_minimum", vote_minimum.to_string_dec (), "Local representatives do not vote if the delegated weight is under this threshold. Saves on system resources.\ntype:string,amount,raw"); toml.put ("vote_generator_delay", vote_generator_delay.count (), "Delay before votes are sent to allow for efficient bundling of hashes in votes.\ntype:milliseconds"); toml.put ("unchecked_cutoff_time", unchecked_cutoff_time.count (), "Number of seconds before deleting an unchecked entry.\nWarning: lower values (e.g., 3600 seconds, or 1 hour) may result in unsuccessful bootstraps, especially a bootstrap from scratch.\ntype:seconds"); - toml.put ("tcp_io_timeout", tcp_io_timeout.count (), "Timeout for TCP connect-, read- and write operations.\nWarning: a low value (e.g., below 5 seconds) may result in TCP connections failing.\ntype:seconds"); toml.put ("pow_sleep_interval", pow_sleep_interval.count (), "Time to sleep between batch work generation attempts. Reduces max CPU usage at the expense of a longer generation time.\ntype:nanoseconds"); toml.put ("external_address", external_address, "The external address of this node (NAT). If not set, the node will request this information via UPnP.\ntype:string,ip"); toml.put ("external_port", external_port, "The external port number of this node (NAT). Only used if external_address is set.\ntype:uint16"); @@ -575,10 +574,6 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml) toml.get ("unchecked_cutoff_time", unchecked_cutoff_time_l); unchecked_cutoff_time = std::chrono::seconds (unchecked_cutoff_time_l); - auto tcp_io_timeout_l = static_cast (tcp_io_timeout.count ()); - toml.get ("tcp_io_timeout", tcp_io_timeout_l); - tcp_io_timeout = std::chrono::seconds (tcp_io_timeout_l); - if (toml.has_key ("peering_port")) { std::uint16_t peering_port_l{}; diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 72fcaa1f..e191816c 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -117,8 +117,6 @@ public: uint16_t external_port{ 0 }; std::chrono::milliseconds block_processor_batch_max_time{ std::chrono::milliseconds (500) }; std::chrono::seconds unchecked_cutoff_time{ std::chrono::seconds (4 * 60 * 60) }; // 4 hours - /** Timeout for initiated async operations */ - std::chrono::seconds tcp_io_timeout{ (network_params.network.is_dev_network () && !is_sanitizer_build ()) ? std::chrono::seconds (5) : std::chrono::seconds (15) }; std::chrono::nanoseconds pow_sleep_interval{ 0 }; bool use_memory_pools{ true }; diff --git a/nano/node/transport/common.hpp b/nano/node/transport/common.hpp index 58f15ce4..5232082b 100644 --- a/nano/node/transport/common.hpp +++ b/nano/node/transport/common.hpp @@ -1,5 +1,7 @@ #pragma once +#include + #include namespace nano::transport diff --git a/nano/node/transport/tcp_channel.cpp b/nano/node/transport/tcp_channel.cpp index 3774d5e5..bf5db21f 100644 --- a/nano/node/transport/tcp_channel.cpp +++ b/nano/node/transport/tcp_channel.cpp @@ -16,8 +16,8 @@ nano::transport::tcp_channel::tcp_channel (nano::node & node_a, std::shared_ptr< strand{ node_a.io_ctx.get_executor () }, sending_task{ strand } { - remote_endpoint = socket_a->remote_endpoint (); - local_endpoint = socket_a->local_endpoint (); + remote_endpoint = socket_a->get_remote_endpoint (); + local_endpoint = socket_a->get_local_endpoint (); start (); } @@ -49,9 +49,13 @@ asio::awaitable nano::transport::tcp_channel::start_sending (nano::async:: } catch (boost::system::system_error const & ex) { - // Operation aborted is expected when cancelling the acceptor + // Operation aborted is expected when cancelling the task debug_assert (ex.code () == asio::error::operation_aborted); } + catch (...) + { + release_assert (false, "unexpected exception"); + } debug_assert (strand.running_in_this_thread ()); } @@ -63,6 +67,7 @@ void nano::transport::tcp_channel::stop () debug_assert (!node.io_ctx.stopped ()); // Ensure that we are not trying to await the task while running on the same thread / io_context debug_assert (!node.io_ctx.get_executor ().running_in_this_thread ()); + sending_task.cancel (); sending_task.join (); } @@ -117,7 +122,11 @@ asio::awaitable nano::transport::tcp_channel::run_sending (nano::async::co { for (auto const & [type, item] : batch) { - co_await send_one (type, item); + auto ec = co_await send_one (type, item); + if (ec) + { + co_return; // Stop on error + } } } else @@ -127,20 +136,13 @@ asio::awaitable nano::transport::tcp_channel::run_sending (nano::async::co } } -asio::awaitable nano::transport::tcp_channel::send_one (traffic_type type, tcp_channel_queue::entry_t const & item) +asio::awaitable nano::transport::tcp_channel::send_one (traffic_type type, tcp_channel_queue::entry_t const & item) { debug_assert (strand.running_in_this_thread ()); auto const & [buffer, callback] = item; auto const size = buffer.size (); - // Wait for socket - while (socket->full ()) - { - node.stats.inc (nano::stat::type::tcp_channel_wait, nano::stat::detail::wait_socket, nano::stat::dir::out); - co_await nano::async::sleep_for (100ms); // TODO: Exponential backoff - } - // Wait for bandwidth // This is somewhat inefficient // The performance impact *should* be mitigated by the fact that we allocate it in larger chunks, so this happens relatively infrequently @@ -163,21 +165,26 @@ asio::awaitable nano::transport::tcp_channel::send_one (traffic_type type, node.stats.inc (nano::stat::type::tcp_channel, nano::stat::detail::send, nano::stat::dir::out); node.stats.inc (nano::stat::type::tcp_channel_send, to_stat_detail (type), nano::stat::dir::out); - socket->async_write (buffer, [this_w = weak_from_this (), callback, type] (boost::system::error_code const & ec, std::size_t size) { - if (auto this_l = this_w.lock ()) - { - this_l->node.stats.inc (nano::stat::type::tcp_channel_ec, nano::to_stat_detail (ec), nano::stat::dir::out); - if (!ec) - { - this_l->node.stats.add (nano::stat::type::traffic_tcp_type, to_stat_detail (type), nano::stat::dir::out, size); - this_l->set_last_packet_sent (std::chrono::steady_clock::now ()); - } - } - if (callback) - { - callback (ec, size); - } - }); + auto [ec, size_written] = co_await socket->co_write (buffer, buffer.size ()); + debug_assert (ec || size_written == size); + debug_assert (strand.running_in_this_thread ()); + + if (!ec) + { + node.stats.add (nano::stat::type::traffic_tcp_type, to_stat_detail (type), nano::stat::dir::out, size_written); + set_last_packet_sent (std::chrono::steady_clock::now ()); + } + else + { + node.stats.inc (nano::stat::type::tcp_channel_ec, nano::to_stat_detail (ec), nano::stat::dir::out); + } + + if (callback) + { + callback (ec, size_written); + } + + co_return ec; } bool nano::transport::tcp_channel::alive () const diff --git a/nano/node/transport/tcp_channel.hpp b/nano/node/transport/tcp_channel.hpp index 84ebd382..78a75202 100644 --- a/nano/node/transport/tcp_channel.hpp +++ b/nano/node/transport/tcp_channel.hpp @@ -44,8 +44,6 @@ private: class tcp_channel final : public nano::transport::channel, public std::enable_shared_from_this { - friend class nano::transport::tcp_channels; - public: tcp_channel (nano::node &, std::shared_ptr); ~tcp_channel () override; @@ -74,7 +72,7 @@ private: asio::awaitable start_sending (nano::async::condition &); asio::awaitable run_sending (nano::async::condition &); - asio::awaitable send_one (traffic_type, tcp_channel_queue::entry_t const &); + asio::awaitable send_one (traffic_type, tcp_channel_queue::entry_t const &); public: std::shared_ptr socket; diff --git a/nano/node/transport/tcp_channels.cpp b/nano/node/transport/tcp_channels.cpp index 5335e535..3af0ba26 100644 --- a/nano/node/transport/tcp_channels.cpp +++ b/nano/node/transport/tcp_channels.cpp @@ -86,10 +86,9 @@ bool nano::transport::tcp_channels::check (const nano::tcp_endpoint & endpoint, return true; // OK } -// This should be the only place in node where channels are created std::shared_ptr nano::transport::tcp_channels::create (const std::shared_ptr & socket, const std::shared_ptr & server, const nano::account & node_id) { - auto const endpoint = socket->remote_endpoint (); + auto const endpoint = socket->get_remote_endpoint (); debug_assert (endpoint.address ().is_v6 ()); nano::unique_lock lock{ mutex }; @@ -110,10 +109,11 @@ std::shared_ptr nano::transport::tcp_channels::cre node.stats.inc (nano::stat::type::tcp_channels, nano::stat::detail::channel_accepted); node.logger.debug (nano::log::type::tcp_channels, "Accepted channel: {} ({}) ({})", - socket->remote_endpoint (), - to_string (socket->endpoint_type ()), + socket->get_remote_endpoint (), + to_string (socket->get_endpoint_type ()), node_id.to_node_id ()); + // This should be the only place in node where channels are created auto channel = std::make_shared (node, socket); channel->set_node_id (node_id); diff --git a/nano/node/transport/tcp_config.cpp b/nano/node/transport/tcp_config.cpp index b85d0beb..488af6c3 100644 --- a/nano/node/transport/tcp_config.cpp +++ b/nano/node/transport/tcp_config.cpp @@ -7,9 +7,11 @@ nano::error nano::transport::tcp_config::serialize (nano::tomlconfig & toml) con toml.put ("max_attempts", max_attempts, "Maximum connection attempts. \ntype:uint64"); toml.put ("max_attempts_per_ip", max_attempts_per_ip, "Maximum connection attempts per IP. \ntype:uint64"); - toml.put ("connect_timeout", connect_timeout.count (), "Timeout for establishing TCP connection in seconds. \ntype:uint64"); - toml.put ("handshake_timeout", handshake_timeout.count (), "Timeout for completing handshake in seconds. \ntype:uint64"); - toml.put ("io_timeout", io_timeout.count (), "Timeout for TCP I/O operations in seconds. \ntype:uint64"); + toml.put ("connect_timeout", connect_timeout.count (), "Timeout for establishing TCP connection in seconds. \ntype:seconds"); + toml.put ("handshake_timeout", handshake_timeout.count (), "Timeout for completing node handshake in seconds. \ntype:seconds"); + toml.put ("io_timeout", io_timeout.count (), "Timeout for TCP I/O operations in seconds. Use 0 to disable timeout. \ntype:seconds"); + toml.put ("silent_timeout", silent_timeout.count (), "Timeout for silent TCP connections in seconds. Use 0 to disable timeout. \ntype:seconds"); + toml.put ("checkup_interval", checkup_interval.count (), "Interval for checking health of TCP connections in seconds. \ntype:seconds"); return toml.get_error (); } @@ -24,6 +26,8 @@ nano::error nano::transport::tcp_config::deserialize (nano::tomlconfig & toml) toml.get_duration ("connect_timeout", connect_timeout); toml.get_duration ("handshake_timeout", handshake_timeout); toml.get_duration ("io_timeout", io_timeout); + toml.get_duration ("silent_timeout", silent_timeout); + toml.get_duration ("checkup_interval", checkup_interval); return toml.get_error (); } \ No newline at end of file diff --git a/nano/node/transport/tcp_config.hpp b/nano/node/transport/tcp_config.hpp index 1a8a99d6..db34e725 100644 --- a/nano/node/transport/tcp_config.hpp +++ b/nano/node/transport/tcp_config.hpp @@ -18,7 +18,8 @@ public: max_outbound_connections = 128; max_attempts = 128; max_attempts_per_ip = 128; - connect_timeout = std::chrono::seconds{ 5 }; + connect_timeout = 5s; + checkup_interval = 1s; } } @@ -31,8 +32,10 @@ public: size_t max_outbound_connections{ 2048 }; size_t max_attempts{ 60 }; size_t max_attempts_per_ip{ 1 }; - std::chrono::seconds connect_timeout{ 60 }; + std::chrono::seconds connect_timeout{ 30 }; std::chrono::seconds handshake_timeout{ 30 }; std::chrono::seconds io_timeout{ 30 }; + std::chrono::seconds silent_timeout{ 30 }; + std::chrono::seconds checkup_interval{ 5 }; }; } \ No newline at end of file diff --git a/nano/node/transport/tcp_listener.cpp b/nano/node/transport/tcp_listener.cpp index 388be082..12994eb7 100644 --- a/nano/node/transport/tcp_listener.cpp +++ b/nano/node/transport/tcp_listener.cpp @@ -235,7 +235,7 @@ void nano::transport::tcp_listener::timeout () connection.endpoint, nano::log::seconds_delta (connection.socket->get_time_connected ())); - connection.socket->close (); // Schedule socket close, this is non-blocking, safe to call under lock + connection.socket->close_async (); // Schedule socket close, this is non-blocking, safe to call under lock } } } @@ -431,15 +431,13 @@ auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::accept_success, to_stat_dir (type)); logger.debug (nano::log::type::tcp_listener, "Accepted connection: {} ({})", remote_endpoint, to_string (type)); - auto socket = std::make_shared (node, std::move (raw_socket), remote_endpoint, local_endpoint, to_socket_endpoint (type)); + auto socket = std::make_shared (node, std::move (raw_socket), to_socket_endpoint (type)); auto server = std::make_shared (socket, node.shared (), true); connections.emplace_back (connection{ type, remote_endpoint, socket, server }); lock.unlock (); - socket->set_timeout (node.network_params.network.idle_timeout); - socket->start (); server->start (); connection_accepted.notify (socket, server); diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index 2ed50151..127f358a 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -18,7 +18,7 @@ nano::transport::tcp_server::tcp_server (std::shared_ptr (node_a->network_params.network, node_a->network.filter, node_a->block_uniquer, node_a->vote_uniquer, [socket_l = socket] (std::shared_ptr> const & data_a, size_t size_a, std::function callback_a) { debug_assert (socket_l != nullptr); - socket_l->read_impl (data_a, size_a, callback_a); + socket_l->async_read (data_a, size_a, callback_a); }) } { @@ -43,7 +43,7 @@ void nano::transport::tcp_server::start () // Set remote_endpoint if (remote_endpoint.port () == 0) { - remote_endpoint = socket->remote_endpoint (); + remote_endpoint = socket->get_remote_endpoint (); debug_assert (remote_endpoint.port () != 0); } @@ -62,7 +62,7 @@ void nano::transport::tcp_server::stop () { if (!stopped.exchange (true)) { - socket->close (); + socket->close_async (); } } @@ -516,7 +516,7 @@ void nano::transport::tcp_server::timeout () { node->logger.debug (nano::log::type::tcp_server, "Closing TCP server due to timeout ({})", remote_endpoint); - socket->close (); + socket->close_async (); } } @@ -605,10 +605,10 @@ bool nano::transport::tcp_server::is_undefined_connection () const bool nano::transport::tcp_server::is_bootstrap_connection () const { - return socket->is_bootstrap_connection (); + return socket->type () == nano::transport::socket_type::bootstrap; } bool nano::transport::tcp_server::is_realtime_connection () const { - return socket->is_realtime_connection (); + return socket->type () == nano::transport::socket_type::realtime; } diff --git a/nano/node/transport/tcp_socket.cpp b/nano/node/transport/tcp_socket.cpp index 4fe42e24..8041fb03 100644 --- a/nano/node/transport/tcp_socket.cpp +++ b/nano/node/transport/tcp_socket.cpp @@ -12,484 +12,443 @@ #include #include -/* - * socket - */ - -nano::transport::tcp_socket::tcp_socket (nano::node & node_a, nano::transport::socket_endpoint endpoint_type_a, size_t queue_size_a) : - tcp_socket{ node_a, boost::asio::ip::tcp::socket{ node_a.io_ctx }, {}, {}, endpoint_type_a, queue_size_a } +nano::transport::tcp_socket::tcp_socket (nano::node & node_a, nano::transport::socket_endpoint endpoint_type_a) : + node{ node_a }, + strand{ node_a.io_ctx.get_executor () }, + task{ strand }, + raw_socket{ node_a.io_ctx }, + endpoint_type{ endpoint_type_a } { + start (); } -nano::transport::tcp_socket::tcp_socket (nano::node & node_a, boost::asio::ip::tcp::socket raw_socket_a, boost::asio::ip::tcp::endpoint remote_endpoint_a, boost::asio::ip::tcp::endpoint local_endpoint_a, nano::transport::socket_endpoint endpoint_type_a, size_t queue_size_a) : - queue_size{ queue_size_a }, - send_queue{ queue_size }, - node_w{ node_a.shared () }, +nano::transport::tcp_socket::tcp_socket (nano::node & node_a, asio::ip::tcp::socket raw_socket_a, nano::transport::socket_endpoint endpoint_type_a) : + node{ node_a }, strand{ node_a.io_ctx.get_executor () }, + task{ strand }, raw_socket{ std::move (raw_socket_a) }, - remote{ remote_endpoint_a }, - local{ local_endpoint_a }, - endpoint_type_m{ endpoint_type_a }, - timeout{ std::numeric_limits::max () }, - last_completion_time_or_init{ nano::seconds_since_epoch () }, - last_receive_time_or_init{ nano::seconds_since_epoch () }, - default_timeout{ node_a.config.tcp_io_timeout }, - silent_connection_tolerance_time{ node_a.network_params.network.silent_connection_tolerance_time } + local_endpoint{ raw_socket.local_endpoint () }, + remote_endpoint{ raw_socket.remote_endpoint () }, + endpoint_type{ endpoint_type_a }, + connected{ true }, + time_connected{ std::chrono::steady_clock::now () } { - time_connected = std::chrono::steady_clock::now (); + start (); } nano::transport::tcp_socket::~tcp_socket () { - close_internal (); - closed = true; + close (); +} + +void nano::transport::tcp_socket::close () +{ + stop (); + + if (closed) // Avoid closing the socket multiple times + { + return; + } + + // Node context must be running to gracefully stop async tasks + debug_assert (!node.io_ctx.stopped ()); + // Ensure that we are not trying to await the task while running on the same thread / io_context + debug_assert (!node.io_ctx.get_executor ().running_in_this_thread ()); + + // Dispatch close raw socket to the strand, wait synchronously for the operation to complete + auto fut = asio::dispatch (strand, asio::use_future ([this] () { + close_impl (); + })); + fut.wait (); // Blocking call +} + +void nano::transport::tcp_socket::close_async () +{ + if (closed) // Avoid closing the socket multiple times + { + return; + } + + // Node context must be running to gracefully stop async tasks + debug_assert (!node.io_ctx.stopped ()); + + asio::post (strand, [this, /* lifetime guard */ this_s = shared_from_this ()] () { + close_impl (); + }); +} + +void nano::transport::tcp_socket::close_impl () +{ + debug_assert (strand.running_in_this_thread ()); + + if (closed.exchange (true)) // Avoid closing the socket multiple times + { + return; + } + + boost::system::error_code ec; + raw_socket.shutdown (asio::ip::tcp::socket::shutdown_both, ec); // Best effort, ignore errors + raw_socket.close (ec); // Best effort, ignore errors + if (!ec) + { + node.stats.inc (nano::stat::type::tcp_socket, nano::stat::detail::close); + node.logger.debug (nano::log::type::tcp_socket, "Closed socket: {}", remote_endpoint); + } + else + { + node.stats.inc (nano::stat::type::tcp_socket, nano::stat::detail::close_error); + node.logger.debug (nano::log::type::tcp_socket, "Closed socket, ungracefully: {} ({})", remote_endpoint, ec.message ()); + } } void nano::transport::tcp_socket::start () { - ongoing_checkup (); + release_assert (!task.joinable ()); + task = nano::async::task (strand, ongoing_checkup ()); } -void nano::transport::tcp_socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::function callback_a) +void nano::transport::tcp_socket::stop () { - debug_assert (callback_a); - debug_assert (endpoint_type () == socket_endpoint::client); + if (task.running ()) + { + // Node context must be running to gracefully stop async tasks + debug_assert (!node.io_ctx.stopped ()); + // Ensure that we are not trying to await the task while running on the same thread / io_context + debug_assert (!node.io_ctx.get_executor ().running_in_this_thread ()); - start (); - set_default_timeout (); + task.cancel (); + task.join (); + } +} - boost::asio::post (strand, [this_l = shared_from_this (), endpoint_a, callback = std::move (callback_a)] () { - this_l->raw_socket.async_connect (endpoint_a, - boost::asio::bind_executor (this_l->strand, [this_l, callback = std::move (callback), endpoint_a] (boost::system::error_code const & ec) { - debug_assert (this_l->strand.running_in_this_thread ()); - - auto node_l = this_l->node_w.lock (); - if (!node_l) +auto nano::transport::tcp_socket::ongoing_checkup () -> asio::awaitable +{ + debug_assert (strand.running_in_this_thread ()); + try + { + while (!co_await nano::async::cancelled () && alive ()) + { + bool healthy = checkup (); + if (!healthy) { - return; - } + node.stats.inc (nano::stat::type::tcp_socket, nano::stat::detail::unhealthy); + node.logger.debug (nano::log::type::tcp_socket, "Unhealthy socket detected: {} (timed out: {})", + remote_endpoint, + timed_out.load ()); - this_l->remote = endpoint_a; + close_impl (); - if (ec) - { - node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_connect_error, nano::stat::dir::in); - this_l->close (); + break; // Stop the checkup task } else { - this_l->time_connected = std::chrono::steady_clock::now (); - this_l->set_last_completion (); - { - // Best effort attempt to get endpoint address - boost::system::error_code ec; - this_l->local = this_l->raw_socket.local_endpoint (ec); - } - node_l->logger.debug (nano::log::type::tcp_socket, "Successfully connected to: {}, local: {}", this_l->remote, this_l->local); + std::chrono::seconds sleep_duration = node.config.tcp.checkup_interval; + co_await nano::async::sleep_for (sleep_duration); + timestamp += sleep_duration.count (); } - callback (ec); - })); - }); + } + } + catch (boost::system::system_error const & ex) + { + // Operation aborted is expected when cancelling the acceptor + debug_assert (ex.code () == asio::error::operation_aborted); + } + debug_assert (strand.running_in_this_thread ()); + + // Close the socket if checkup task is canceled for any reason + close_impl (); } -void nano::transport::tcp_socket::async_read (std::shared_ptr> const & buffer_a, std::size_t size_a, std::function callback_a) -{ - debug_assert (callback_a); - - if (size_a <= buffer_a->size ()) - { - if (!closed) - { - set_default_timeout (); - boost::asio::post (strand, [this_l = shared_from_this (), buffer_a, callback = std::move (callback_a), size_a] () mutable { - boost::asio::async_read (this_l->raw_socket, boost::asio::buffer (buffer_a->data (), size_a), - boost::asio::bind_executor (this_l->strand, - [this_l, buffer_a, cbk = std::move (callback)] (boost::system::error_code const & ec, std::size_t size_a) { - debug_assert (this_l->strand.running_in_this_thread ()); - - auto node_l = this_l->node_w.lock (); - if (!node_l) - { - return; - } - - if (ec) - { - node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_read_error, nano::stat::dir::in); - this_l->close (); - } - else - { - node_l->stats.add (nano::stat::type::traffic_tcp, nano::stat::detail::all, nano::stat::dir::in, size_a); - this_l->set_last_completion (); - this_l->set_last_receive_time (); - } - cbk (ec, size_a); - })); - }); - } - } - else - { - debug_assert (false && "nano::transport::tcp_socket::async_read called with incorrect buffer size"); - boost::system::error_code ec_buffer = boost::system::errc::make_error_code (boost::system::errc::no_buffer_space); - callback_a (ec_buffer, 0); - } -} - -void nano::transport::tcp_socket::async_write (nano::shared_const_buffer const & buffer_a, std::function callback_a) -{ - auto node_l = node_w.lock (); - if (!node_l) - { - return; - } - - if (closed) - { - if (callback_a) - { - node_l->io_ctx.post ([callback = std::move (callback_a)] () { - callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); - }); - } - return; - } - - bool queued = send_queue.insert (buffer_a, callback_a, traffic_type::generic); - if (!queued) - { - if (callback_a) - { - node_l->io_ctx.post ([callback = std::move (callback_a)] () { - callback (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); - }); - } - return; - } - - boost::asio::post (strand, [this_s = shared_from_this (), buffer_a, callback_a] () { - if (!this_s->write_in_progress) - { - this_s->write_queued_messages (); - } - }); -} - -// Must be called from strand -void nano::transport::tcp_socket::write_queued_messages () +bool nano::transport::tcp_socket::checkup () { debug_assert (strand.running_in_this_thread ()); - if (closed) + if (connected) { - return; - } - - auto maybe_next = send_queue.pop (); - if (!maybe_next) - { - return; - } - auto const & [next, type] = *maybe_next; - - set_default_timeout (); - - write_in_progress = true; - nano::async_write (raw_socket, next.buffer, - boost::asio::bind_executor (strand, [this_l = shared_from_this (), next /* `next` object keeps buffer in scope */, type] (boost::system::error_code ec, std::size_t size) { - debug_assert (this_l->strand.running_in_this_thread ()); - - auto node_l = this_l->node_w.lock (); - if (!node_l) + if (!raw_socket.is_open ()) { - return; + node.stats.inc (nano::stat::type::tcp_socket, nano::stat::detail::already_closed); + return false; // Bad } - this_l->write_in_progress = false; - if (ec) + debug_assert (timestamp >= read_timestamp); + debug_assert (timestamp >= write_timestamp); + debug_assert (timestamp >= last_receive); + debug_assert (timestamp >= last_send); + + std::chrono::seconds const io_threshold = node.config.tcp.io_timeout; + std::chrono::seconds const silence_threshold = node.config.tcp.silent_timeout; + + // Timeout threshold of 0 indicates no timeout + if (io_threshold.count () > 0) { - node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in); - this_l->close (); - } - else - { - node_l->stats.add (nano::stat::type::traffic_tcp, nano::stat::detail::all, nano::stat::dir::out, size, /* aggregate all */ true); - this_l->set_last_completion (); - } - - if (next.callback) - { - next.callback (ec, size); - } - - if (!ec) - { - this_l->write_queued_messages (); - } - })); -} - -bool nano::transport::tcp_socket::max () const -{ - return send_queue.size (traffic_type::generic) >= queue_size; -} - -bool nano::transport::tcp_socket::full () const -{ - return send_queue.size (traffic_type::generic) >= 2 * queue_size; -} - -/** Call set_timeout with default_timeout as parameter */ -void nano::transport::tcp_socket::set_default_timeout () -{ - set_timeout (default_timeout); -} - -/** Set the current timeout of the socket in seconds - * timeout occurs when the last socket completion is more than timeout seconds in the past - * timeout always applies, the socket always has a timeout - * to set infinite timeout, use std::numeric_limits::max () - * the function checkup() checks for timeout on a regular interval - */ -void nano::transport::tcp_socket::set_timeout (std::chrono::seconds timeout_a) -{ - timeout = timeout_a.count (); -} - -void nano::transport::tcp_socket::set_last_completion () -{ - last_completion_time_or_init = nano::seconds_since_epoch (); -} - -void nano::transport::tcp_socket::set_last_receive_time () -{ - last_receive_time_or_init = nano::seconds_since_epoch (); -} - -void nano::transport::tcp_socket::ongoing_checkup () -{ - auto node_l = node_w.lock (); - if (!node_l) - { - return; - } - - node_l->workers.post_delayed (std::chrono::seconds (node_l->network_params.network.is_dev_network () ? 1 : 5), [this_w = weak_from_this ()] () { - auto this_l = this_w.lock (); - if (!this_l) - { - return; - } - - auto node_l = this_l->node_w.lock (); - if (!node_l) - { - return; - } - - boost::asio::post (this_l->strand, [this_l] { - if (!this_l->raw_socket.is_open ()) + if (read_timestamp > 0 && timestamp - read_timestamp > io_threshold.count ()) { - this_l->close (); + node.stats.inc (nano::stat::type::tcp_socket, nano::stat::detail::timeout); + node.stats.inc (nano::stat::type::tcp_socket_timeout, nano::stat::detail::timeout_receive); + node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::in); + + timed_out = true; + return false; // Bad } - }); + if (write_timestamp > 0 && timestamp - write_timestamp > io_threshold.count ()) + { + node.stats.inc (nano::stat::type::tcp_socket, nano::stat::detail::timeout); + node.stats.inc (nano::stat::type::tcp_socket_timeout, nano::stat::detail::timeout_send); + node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::out); - nano::seconds_t now = nano::seconds_since_epoch (); - auto condition_to_disconnect{ false }; - - // if this is a server socket, and no data is received for silent_connection_tolerance_time seconds then disconnect - if (this_l->endpoint_type () == socket_endpoint::server && (now - this_l->last_receive_time_or_init) > static_cast (this_l->silent_connection_tolerance_time.count ())) - { - node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_silent_connection_drop, nano::stat::dir::in); - - condition_to_disconnect = true; + timed_out = true; + return false; // Bad + } } - - // if there is no activity for timeout seconds then disconnect - if ((now - this_l->last_completion_time_or_init) > this_l->timeout) + if (silence_threshold.count () > 0) { - node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, this_l->endpoint_type () == socket_endpoint::server ? nano::stat::dir::in : nano::stat::dir::out); + if ((timestamp - last_receive) > silence_threshold.count () || (timestamp - last_send) > silence_threshold.count ()) + { + node.stats.inc (nano::stat::type::tcp_socket, nano::stat::detail::timeout); + node.stats.inc (nano::stat::type::tcp_socket_timeout, nano::stat::detail::timeout_silence); + node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_silent_connection_drop, nano::stat::dir::in); - condition_to_disconnect = true; + timed_out = true; + return false; // Bad + } } + } + else // Not connected yet + { + auto const now = std::chrono::steady_clock::now (); + auto const cutoff = now - node.config.tcp.connect_timeout; - if (condition_to_disconnect) + if (time_created < cutoff) { - // TODO: Stats - node_l->logger.debug (nano::log::type::tcp_socket, "Socket timeout, closing: {}", this_l->remote); - this_l->timed_out = true; - this_l->close (); - } - else if (!this_l->closed) - { - this_l->ongoing_checkup (); + node.stats.inc (nano::stat::type::tcp_socket, nano::stat::detail::timeout); + node.stats.inc (nano::stat::type::tcp_socket_timeout, nano::stat::detail::timeout_connect); + + timed_out = true; + return false; // Bad } + } + + return true; // Healthy +} + +auto nano::transport::tcp_socket::co_connect (nano::endpoint endpoint) -> asio::awaitable> +{ + // Dispatch operation to the strand + // TODO: This additional dispatch should not be necessary, but it is done during transition to coroutine based code + co_return co_await asio::co_spawn (strand, co_connect_impl (endpoint), asio::use_awaitable); +} + +// TODO: This is only used in tests, remove it, this creates untracked socket +auto nano::transport::tcp_socket::co_connect_impl (nano::endpoint endpoint) -> asio::awaitable> +{ + debug_assert (strand.running_in_this_thread ()); + debug_assert (endpoint_type == socket_endpoint::client); + debug_assert (!raw_socket.is_open ()); + debug_assert (connect_in_progress.exchange (true) == false); + + auto result = co_await raw_socket.async_connect (endpoint, asio::as_tuple (asio::use_awaitable)); + auto const & [ec] = result; + if (!ec) + { + // Best effort to get the endpoints + boost::system::error_code ec_ignored; + local_endpoint = raw_socket.local_endpoint (ec_ignored); + remote_endpoint = raw_socket.remote_endpoint (ec_ignored); + + connected = true; // Mark as connected + time_connected = std::chrono::steady_clock::now (); + + node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_connect_success); + node.stats.inc (nano::stat::type::tcp_socket, nano::stat::detail::connect_success); + node.logger.debug (nano::log::type::tcp_socket, "Successfully connected to: {} from local: {}", + remote_endpoint, local_endpoint); + } + else + { + node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_connect_error); + node.stats.inc (nano::stat::type::tcp_socket, nano::stat::detail::connect_error); + node.logger.debug (nano::log::type::tcp_socket, "Failed to connect to: {} ({})", + endpoint, local_endpoint, ec); + + error = true; + close_impl (); + } + debug_assert (connect_in_progress.exchange (false) == true); + co_return result; +} + +void nano::transport::tcp_socket::async_connect (nano::endpoint endpoint, std::function callback) +{ + debug_assert (callback); + asio::co_spawn (strand, co_connect_impl (endpoint), [callback, /* lifetime guard */ this_s = shared_from_this ()] (std::exception_ptr const & ex, auto const & result) { + release_assert (!ex); + auto const & [ec] = result; + callback (ec); }); } -void nano::transport::tcp_socket::read_impl (std::shared_ptr> const & data_a, std::size_t size_a, std::function callback_a) +boost::system::error_code nano::transport::tcp_socket::blocking_connect (nano::endpoint endpoint) { - auto node_l = node_w.lock (); - if (!node_l) - { - return; - } + auto fut = asio::co_spawn (strand, co_connect_impl (endpoint), asio::use_future); + fut.wait (); // Blocking call + auto result = fut.get (); + auto const & [ec] = result; + return ec; +} - // Increase timeout to receive TCP header (idle server socket) - auto const prev_timeout = get_default_timeout_value (); - set_default_timeout_value (node_l->network_params.network.idle_timeout); - async_read (data_a, size_a, [callback_l = std::move (callback_a), prev_timeout, this_l = shared_from_this ()] (boost::system::error_code const & ec_a, std::size_t size_a) { - this_l->set_default_timeout_value (prev_timeout); - callback_l (ec_a, size_a); +auto nano::transport::tcp_socket::co_read (nano::shared_buffer buffer, size_t target_size) -> asio::awaitable> +{ + // Dispatch operation to the strand + // TODO: This additional dispatch should not be necessary, but it is done during transition to coroutine based code + co_return co_await asio::co_spawn (strand, co_read_impl (buffer, target_size), asio::use_awaitable); +} + +auto nano::transport::tcp_socket::co_read_impl (nano::shared_buffer buffer, size_t target_size) -> asio::awaitable> +{ + debug_assert (strand.running_in_this_thread ()); + debug_assert (read_in_progress.exchange (true) == false); + release_assert (target_size <= buffer->size (), "read buffer size mismatch"); + + read_timestamp = timestamp; + auto result = co_await asio::async_read (raw_socket, asio::buffer (buffer->data (), target_size), asio::as_tuple (asio::use_awaitable)); + auto const & [ec, size_read] = result; + read_timestamp = 0; + if (!ec) + { + last_receive = timestamp; + node.stats.add (nano::stat::type::traffic_tcp, nano::stat::detail::all, nano::stat::dir::in, size_read); + } + else + { + node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_read_error); + node.logger.debug (nano::log::type::tcp_socket, "Error reading from: {} ({})", remote_endpoint, ec); + + error = true; + close_impl (); + } + debug_assert (read_in_progress.exchange (false) == true); + co_return result; +} + +void nano::transport::tcp_socket::async_read (nano::shared_buffer buffer, size_t size, std::function callback) +{ + debug_assert (callback); + asio::co_spawn (strand, co_read_impl (buffer, size), [callback, /* lifetime guard */ this_s = shared_from_this ()] (std::exception_ptr const & ex, auto const & result) { + release_assert (!ex); + auto const & [ec, size] = result; + callback (ec, size); }); } +auto nano::transport::tcp_socket::blocking_read (nano::shared_buffer buffer, size_t size) -> std::tuple +{ + auto fut = asio::co_spawn (strand, co_read_impl (buffer, size), asio::use_future); + fut.wait (); // Blocking call + auto result = fut.get (); + return result; +} + +auto nano::transport::tcp_socket::co_write (nano::shared_buffer buffer, size_t target_size) -> asio::awaitable> +{ + // Dispatch operation to the strand + // TODO: This additional dispatch should not be necessary, but it is done during transition to coroutine based code + co_return co_await asio::co_spawn (strand, co_write_impl (buffer, target_size), asio::use_awaitable); +} + +auto nano::transport::tcp_socket::co_write_impl (nano::shared_buffer buffer, size_t target_size) -> asio::awaitable> +{ + debug_assert (strand.running_in_this_thread ()); + debug_assert (write_in_progress.exchange (true) == false); + release_assert (target_size <= buffer->size (), "write buffer size mismatch"); + + write_timestamp = timestamp; + auto result = co_await asio::async_write (raw_socket, asio::buffer (buffer->data (), target_size), asio::as_tuple (asio::use_awaitable)); + auto const & [ec, size_written] = result; + write_timestamp = 0; + if (!ec) + { + last_send = timestamp; + node.stats.add (nano::stat::type::traffic_tcp, nano::stat::detail::all, nano::stat::dir::out, size_written); + } + else + { + node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_error); + node.logger.debug (nano::log::type::tcp_socket, "Error writing to: {} ({})", remote_endpoint, ec); + + error = true; + close_impl (); + } + debug_assert (write_in_progress.exchange (false) == true); + co_return result; +} + +void nano::transport::tcp_socket::async_write (nano::shared_buffer buffer, std::function callback) +{ + debug_assert (callback); + asio::co_spawn (strand, co_write_impl (buffer, buffer->size ()), [callback, /* lifetime guard */ this_s = shared_from_this ()] (std::exception_ptr const & ex, auto const & result) { + release_assert (!ex); + auto const & [ec, size] = result; + callback (ec, size); + }); +} + +auto nano::transport::tcp_socket::blocking_write (nano::shared_buffer buffer, size_t size) -> std::tuple +{ + auto fut = asio::co_spawn (strand, co_write_impl (buffer, size), asio::use_future); + fut.wait (); // Blocking call + auto result = fut.get (); + return result; +} + +nano::endpoint nano::transport::tcp_socket::get_remote_endpoint () const +{ + // Using cached value to avoid calling tcp_socket.remote_endpoint() which may be invalid (throw) after closing the socket + return remote_endpoint; +} + +nano::endpoint nano::transport::tcp_socket::get_local_endpoint () const +{ + // Using cached value to avoid calling tcp_socket.local_endpoint() which may be invalid (throw) after closing the socket + return local_endpoint; +} + +nano::transport::socket_endpoint nano::transport::tcp_socket::get_endpoint_type () const +{ + return endpoint_type; +} + +bool nano::transport::tcp_socket::alive () const +{ + return !closed; +} + +bool nano::transport::tcp_socket::has_connected () const +{ + return connected; +} + bool nano::transport::tcp_socket::has_timed_out () const { return timed_out; } -void nano::transport::tcp_socket::set_default_timeout_value (std::chrono::seconds timeout_a) +std::chrono::steady_clock::time_point nano::transport::tcp_socket::get_time_created () const { - default_timeout = timeout_a; + return time_created; } -std::chrono::seconds nano::transport::tcp_socket::get_default_timeout_value () const +std::chrono::steady_clock::time_point nano::transport::tcp_socket::get_time_connected () const { - return default_timeout; -} - -void nano::transport::tcp_socket::close () -{ - boost::asio::dispatch (strand, [this_l = shared_from_this ()] { - this_l->close_internal (); - }); -} - -// This must be called from a strand or the destructor -void nano::transport::tcp_socket::close_internal () -{ - auto node_l = node_w.lock (); - if (!node_l) - { - return; - } - - if (closed.exchange (true)) - { - return; - } - - send_queue.clear (); - - default_timeout = std::chrono::seconds (0); - - // Ignore error code for shutdown as it is best-effort - boost::system::error_code ec; - raw_socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec); - raw_socket.close (ec); - - if (ec) - { - node_l->stats.inc (nano::stat::type::socket, nano::stat::detail::error_socket_close); - node_l->logger.error (nano::log::type::tcp_socket, "Failed to close socket gracefully: {} ({})", - remote, - ec.message ()); - } - else - { - // TODO: Stats - node_l->logger.debug (nano::log::type::tcp_socket, "Closed socket: {}", remote); - } -} - -nano::tcp_endpoint nano::transport::tcp_socket::remote_endpoint () const -{ - // Using cached value to avoid calling tcp_socket.remote_endpoint() which may be invalid (throw) after closing the socket - return remote; -} - -nano::tcp_endpoint nano::transport::tcp_socket::local_endpoint () const -{ - // Using cached value to avoid calling tcp_socket.local_endpoint() which may be invalid (throw) after closing the socket - return local; + return time_connected; } void nano::transport::tcp_socket::operator() (nano::object_stream & obs) const { - obs.write ("remote_endpoint", remote_endpoint ()); - obs.write ("local_endpoint", local_endpoint ()); + obs.write ("remote_endpoint", remote_endpoint); + obs.write ("local_endpoint", local_endpoint); obs.write ("type", type_m.load ()); - obs.write ("endpoint_type", endpoint_type_m); -} - -/* - * socket_queue - */ - -nano::transport::socket_queue::socket_queue (std::size_t max_size_a) : - max_size{ max_size_a } -{ -} - -bool nano::transport::socket_queue::insert (const buffer_t & buffer, callback_t callback, nano::transport::traffic_type traffic_type) -{ - nano::lock_guard guard{ mutex }; - if (queues[traffic_type].size () < 2 * max_size) - { - queues[traffic_type].push (entry{ buffer, callback }); - return true; // Queued - } - return false; // Not queued -} - -auto nano::transport::socket_queue::pop () -> std::optional -{ - nano::lock_guard guard{ mutex }; - - auto try_pop = [this] (nano::transport::traffic_type type) -> std::optional { - auto & que = queues[type]; - if (!que.empty ()) - { - auto item = que.front (); - que.pop (); - return std::make_pair (item, type); - } - return std::nullopt; - }; - - // TODO: This is a very basic prioritization, implement something more advanced and configurable - if (auto item = try_pop (nano::transport::traffic_type::generic)) - { - return item; - } - - return std::nullopt; -} - -void nano::transport::socket_queue::clear () -{ - nano::lock_guard guard{ mutex }; - queues.clear (); -} - -std::size_t nano::transport::socket_queue::size (nano::transport::traffic_type traffic_type) const -{ - nano::lock_guard guard{ mutex }; - if (auto it = queues.find (traffic_type); it != queues.end ()) - { - return it->second.size (); - } - return 0; -} - -bool nano::transport::socket_queue::empty () const -{ - nano::lock_guard guard{ mutex }; - return std::all_of (queues.begin (), queues.end (), [] (auto const & que) { - return que.second.empty (); - }); + obs.write ("endpoint_type", endpoint_type); } /* diff --git a/nano/node/transport/tcp_socket.hpp b/nano/node/transport/tcp_socket.hpp index ebb7fe78..f87e6552 100644 --- a/nano/node/transport/tcp_socket.hpp +++ b/nano/node/transport/tcp_socket.hpp @@ -1,117 +1,65 @@ #pragma once -#include -#include #include -#include +#include #include -#include +#include #include -#include +#include +#include #include -#include #include -#include -#include -#include -#include - -namespace boost::asio::ip -{ -class network_v6; -} - -namespace nano -{ -class node; -} namespace nano::transport { -class socket_queue final -{ -public: - using buffer_t = nano::shared_const_buffer; - using callback_t = std::function; - - struct entry - { - buffer_t buffer; - callback_t callback; - }; - -public: - using result_t = std::pair; - - explicit socket_queue (std::size_t max_size); - - bool insert (buffer_t const &, callback_t, nano::transport::traffic_type); - std::optional pop (); - void clear (); - std::size_t size (nano::transport::traffic_type) const; - bool empty () const; - - std::size_t const max_size; - -private: - mutable nano::mutex mutex; - std::unordered_map> queues; -}; - -/** Socket class for tcp clients and newly accepted connections */ class tcp_socket final : public std::enable_shared_from_this { - friend class tcp_server; - friend class tcp_channels; - friend class tcp_listener; - public: - static size_t constexpr default_queue_size = 16; + /** Construct a new (unconnected) socket */ + explicit tcp_socket (nano::node &, nano::transport::socket_endpoint = socket_endpoint::client); -public: - explicit tcp_socket (nano::node &, nano::transport::socket_endpoint = socket_endpoint::client, size_t queue_size = default_queue_size); - - // TODO: Accepting remote/local endpoints as a parameter is unnecessary, but is needed for now to keep compatibility with the legacy code - tcp_socket ( - nano::node &, - boost::asio::ip::tcp::socket, - boost::asio::ip::tcp::endpoint remote_endpoint, - boost::asio::ip::tcp::endpoint local_endpoint, - nano::transport::socket_endpoint = socket_endpoint::server, - size_t queue_size = default_queue_size); + /** Construct from an existing (connected) socket */ + tcp_socket (nano::node &, asio::ip::tcp::socket, nano::transport::socket_endpoint = socket_endpoint::server); ~tcp_socket (); - void start (); void close (); + void close_async (); - void async_connect ( - boost::asio::ip::tcp::endpoint const & endpoint, - std::function callback); + nano::endpoint get_remote_endpoint () const; + nano::endpoint get_local_endpoint () const; + nano::transport::socket_endpoint get_endpoint_type () const; - void async_read ( - std::shared_ptr> const & buffer, - std::size_t size, - std::function callback); + bool alive () const; - void async_write ( - nano::shared_const_buffer const &, - std::function callback = nullptr); - - boost::asio::ip::tcp::endpoint remote_endpoint () const; - boost::asio::ip::tcp::endpoint local_endpoint () const; - - /** Returns true if the socket has timed out */ + bool has_connected () const; bool has_timed_out () const; - /** This can be called to change the maximum idle time, e.g. based on the type of traffic detected. */ - void set_default_timeout_value (std::chrono::seconds); - std::chrono::seconds get_default_timeout_value () const; - void set_timeout (std::chrono::seconds); - bool max () const; - bool full () const; + std::chrono::steady_clock::time_point get_time_created () const; + std::chrono::steady_clock::time_point get_time_connected () const; +public: + asio::awaitable> co_connect (nano::endpoint endpoint); + asio::awaitable> co_read (nano::shared_buffer, size_t size); + asio::awaitable> co_write (nano::shared_buffer, size_t size); + + // Adapters for callback style code + void async_connect (nano::endpoint endpoint, std::function callback); + void async_read (nano::shared_buffer, size_t size, std::function callback = nullptr); + void async_write (nano::shared_buffer, std::function callback = nullptr); + + // Adapters for sync style code + boost::system::error_code blocking_connect (nano::endpoint endpoint); + std::tuple blocking_read (nano::shared_buffer, size_t size); + std::tuple blocking_write (nano::shared_buffer, size_t size); + +private: + asio::awaitable> co_connect_impl (nano::endpoint endpoint); + asio::awaitable> co_read_impl (nano::shared_buffer, size_t size); + asio::awaitable> co_write_impl (nano::shared_buffer, size_t size); + +public: // TODO: Remove these nano::transport::socket_type type () const { return type_m; @@ -120,93 +68,51 @@ public: { type_m = type; } - nano::transport::socket_endpoint endpoint_type () const - { - return endpoint_type_m; - } - bool is_realtime_connection () const - { - return type () == socket_type::realtime; - } - bool is_bootstrap_connection () const - { - return type () == socket_type::bootstrap; - } - bool is_closed () const - { - return closed; - } - bool alive () const - { - return !is_closed (); - } - std::chrono::steady_clock::time_point get_time_connected () const - { - return time_connected.load (); - } private: - size_t const queue_size; - socket_queue send_queue; + void start (); + void stop (); -protected: - std::weak_ptr node_w; + void close_impl (); - boost::asio::strand strand; - boost::asio::ip::tcp::socket raw_socket; + asio::awaitable ongoing_checkup (); + bool checkup (); - /** The other end of the connection */ - boost::asio::ip::tcp::endpoint remote; - boost::asio::ip::tcp::endpoint local; +private: + nano::node & node; - /** number of seconds of inactivity that causes a socket timeout - * activity is any successful connect, send or receive event - */ - std::atomic timeout; + nano::async::strand strand; + nano::async::task task; + asio::ip::tcp::socket raw_socket; - /** the timestamp (in seconds since epoch) of the last time there was successful activity on the socket - * activity is any successful connect, send or receive event - */ - std::atomic last_completion_time_or_init; + nano::endpoint remote_endpoint; + nano::endpoint local_endpoint; + nano::transport::socket_endpoint const endpoint_type; - /** the timestamp (in seconds since epoch) of the last time there was successful receive on the socket - * successful receive includes graceful closing of the socket by the peer (the read succeeds but returns 0 bytes) - */ - std::atomic last_receive_time_or_init; - - /** Flag that is set when cleanup decides to close the socket due to timeout. - * NOTE: Currently used by tcp_server::timeout() but I suspect that this and tcp_server::timeout() are not needed. - */ + std::atomic connected{ false }; + std::atomic closed{ false }; + std::atomic error{ false }; std::atomic timed_out{ false }; - /** the timeout value to use when calling set_default_timeout() */ - std::atomic default_timeout; + std::chrono::steady_clock::time_point const time_created{ std::chrono::steady_clock::now () }; + std::atomic time_connected{}; - /** used in real time server sockets, number of seconds of no receive traffic that will cause the socket to timeout */ - std::chrono::seconds silent_connection_tolerance_time; - - /** Set by close() - completion handlers must check this. This is more reliable than checking - error codes as the OS may have already completed the async operation. */ - std::atomic closed{ false }; - - /** Updated only from strand, but stored as atomic so it can be read from outside */ + // Guard against conflicting concurrent async operations + std::atomic connect_in_progress{ false }; + std::atomic read_in_progress{ false }; std::atomic write_in_progress{ false }; - std::atomic time_connected; + std::atomic type_m{ nano::transport::socket_type::undefined }; - void close_internal (); - void write_queued_messages (); - void set_default_timeout (); - void set_last_completion (); - void set_last_receive_time (); - void ongoing_checkup (); - void read_impl (std::shared_ptr> const & data_a, std::size_t size_a, std::function callback_a); - -private: - socket_endpoint const endpoint_type_m; - std::atomic type_m{ socket_type::undefined }; +private: // Accessed only from strand + // Using a low-resolution clock to track timeouts to avoid system clock overhead + uint64_t timestamp{ 1 }; + uint64_t read_timestamp{ 0 }; + uint64_t write_timestamp{ 0 }; + uint64_t last_send{ 0 }; + uint64_t last_receive{ 0 }; public: // Logging - virtual void operator() (nano::object_stream &) const; + void operator() (nano::object_stream &) const; }; } diff --git a/nano/node/transport/transport.cpp b/nano/node/transport/transport.cpp index 2b7d8363..d3ec24be 100644 --- a/nano/node/transport/transport.cpp +++ b/nano/node/transport/transport.cpp @@ -166,6 +166,14 @@ bool nano::transport::reserved_address (nano::endpoint const & endpoint_a, bool return result; } +void nano::transport::throw_if_error (boost::system::error_code const & ec) +{ + if (ec) + { + throw boost::system::system_error (ec); + } +} + nano::stat::detail nano::to_stat_detail (boost::system::error_code const & ec) { switch (ec.value ()) diff --git a/nano/node/transport/transport.hpp b/nano/node/transport/transport.hpp index 0d87402b..50c173d6 100644 --- a/nano/node/transport/transport.hpp +++ b/nano/node/transport/transport.hpp @@ -34,6 +34,8 @@ namespace socket_functions boost::asio::ip::address last_ipv6_subnet_address (boost::asio::ip::address_v6 const &, std::size_t); std::size_t count_subnetwork_connections (nano::transport::address_socket_mmap const &, boost::asio::ip::address_v6 const &, std::size_t); } + +void throw_if_error (boost::system::error_code const & ec); } namespace nano From e104e5995b6567967bcc027e2a9e75109fe07e5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 22 May 2025 19:11:56 +0200 Subject: [PATCH 2/4] Fix tests --- nano/core_test/socket.cpp | 135 ++++++++++++++++------------------- nano/core_test/telemetry.cpp | 11 +-- 2 files changed, 65 insertions(+), 81 deletions(-) diff --git a/nano/core_test/socket.cpp b/nano/core_test/socket.cpp index 339e6ab9..60d3cfc9 100644 --- a/nano/core_test/socket.cpp +++ b/nano/core_test/socket.cpp @@ -114,10 +114,11 @@ TEST (socket, disconnection_of_silent_connections) // Just to ensure the disconnection wasn't due to the timer timeout. ASSERT_EQ (0, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::in)); // Asserts the silent checker worked. - ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_silent_connection_drop, nano::stat::dir::in)); + ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_silent_connection_drop, nano::stat::dir::in)); } -TEST (socket, drop_policy) +// FIXME: Socket no longer queues writes, so this test is no longer valid +TEST (socket, DISABLED_drop_policy) { nano::test::system system; @@ -180,9 +181,8 @@ TEST (socket, drop_policy) ASSERT_EQ (0, failed_writes); } -// This is abusing the socket class, it's interfering with the normal node lifetimes and as a result deadlocks -// TEST (socket, DISABLED_concurrent_writes) -TEST (socket, concurrent_writes) +// FIXME: Socket no longer queues writes, so this test is no longer valid +TEST (socket, DISABLED_concurrent_writes) { nano::test::system system; @@ -369,16 +369,14 @@ TEST (socket_timeout, read) // check that the callback was called and we got an error ASSERT_TIMELY_EQ (10s, done, true); ASSERT_TRUE (ec); - ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_read_error, nano::stat::dir::in)); + ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_read_error)); // check that the socket was closed due to tcp_io_timeout timeout - ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::out)); + ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::in)); } TEST (socket_timeout, write) { - std::atomic done = false; - // create one node and set timeout to 1 second nano::test::system system (1); std::shared_ptr node = system.nodes[0]; @@ -386,13 +384,13 @@ TEST (socket_timeout, write) // create a server socket boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ()); - boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx); + boost::asio::ip::tcp::acceptor acceptor (node->io_ctx); acceptor.open (endpoint.protocol ()); acceptor.bind (endpoint); acceptor.listen (boost::asio::socket_base::max_listen_connections); // asynchronously accept an incoming connection and create a newsock and do not receive any data - boost::asio::ip::tcp::socket newsock (*system.io_ctx); + boost::asio::ip::tcp::socket newsock (node->io_ctx); acceptor.async_accept (newsock, [] (boost::system::error_code const & ec_a) { EXPECT_FALSE (ec_a); }); @@ -402,24 +400,26 @@ TEST (socket_timeout, write) // and the timeout should kick in and close the socket, which will cause the async_write to return an error auto socket = std::make_shared (*node, nano::transport::socket_endpoint::client); // socket with a max queue size much larger than OS buffers - socket->async_connect (acceptor.local_endpoint (), [&socket, &done] (boost::system::error_code const & ec_a) { - EXPECT_FALSE (ec_a); + auto ec_0 = socket->blocking_connect (acceptor.local_endpoint ()); + ASSERT_FALSE (ec_0); - auto buffer = std::make_shared> (128 * 1024); - for (auto i = 0; i < 1024; ++i) + auto buffer = std::make_shared> (128 * 1024); + + // At some point the write should fail with a timeout + boost::system::error_code ec; + for (auto i = 0; i < 1024; ++i) + { + auto [ec_1, size_1] = socket->blocking_write (buffer, buffer->size ()); + if (ec_1) { - socket->async_write (nano::shared_const_buffer{ buffer }, [&done] (boost::system::error_code const & ec_a, size_t size_a) { - if (ec_a) - { - done = true; - } - }); + ec = ec_1; + break; } - }); + } // check that the callback was called and we got an error - ASSERT_TIMELY (10s, done); - ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in)); + ASSERT_TRUE (ec); + ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_error)); // check that the socket was closed due to tcp_io_timeout timeout ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::out)); @@ -434,60 +434,48 @@ TEST (socket_timeout, read_overlapped) // create a server socket boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ()); - boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx); + boost::asio::ip::tcp::acceptor acceptor (node->io_ctx); acceptor.open (endpoint.protocol ()); acceptor.bind (endpoint); acceptor.listen (boost::asio::socket_base::max_listen_connections); // asynchronously accept an incoming connection and send one byte only - boost::asio::ip::tcp::socket newsock (*system.io_ctx); + boost::asio::ip::tcp::socket newsock (node->io_ctx); acceptor.async_accept (newsock, [&newsock] (boost::system::error_code const & ec_a) { EXPECT_FALSE (ec_a); auto buffer = std::make_shared> (1); nano::async_write (newsock, nano::shared_const_buffer (buffer), [] (boost::system::error_code const & ec_a, size_t size_a) { EXPECT_TRUE (!ec_a); - EXPECT_TRUE (size_a == 1); + EXPECT_EQ (size_a, 1); }); }); // create a client socket to connect and call async_read twice, the second call should time out auto socket = std::make_shared (*node); - std::atomic done = false; - boost::system::error_code ec; - socket->async_connect (acceptor.local_endpoint (), [&socket, &ec, &done] (boost::system::error_code const & ec_a) { - EXPECT_FALSE (ec_a); - auto buffer = std::make_shared> (1); + auto ec_0 = socket->blocking_connect (acceptor.local_endpoint ()); + ASSERT_FALSE (ec_0); - socket->async_read (buffer, 1, [] (boost::system::error_code const & ec_a, size_t size_a) { - EXPECT_FALSE (ec_a); - EXPECT_TRUE (size_a == 1); - }); + auto buffer = std::make_shared> (1); - socket->async_read (buffer, 1, [&ec, &done] (boost::system::error_code const & ec_a, size_t size_a) { - EXPECT_EQ (size_a, 0); - if (ec_a) - { - ec = ec_a; - done = true; - } - }); - }); + auto [ec_1, size_1] = socket->blocking_read (buffer, 1); + ASSERT_FALSE (ec_1); + ASSERT_TRUE (size_1 == 1); + + auto [ec_2, size_2] = socket->blocking_read (buffer, 1); + ASSERT_EQ (size_2, 0); + ASSERT_TRUE (ec_2); // check that the callback was called and we got an error - ASSERT_TIMELY_EQ (10s, done, true); - ASSERT_TRUE (ec); - ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_read_error, nano::stat::dir::in)); + ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_read_error)); // check that the socket was closed due to tcp_io_timeout timeout - ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::out)); + ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::in)); } TEST (socket_timeout, write_overlapped) { - std::atomic done = false; - // create one node and set timeout to 1 second nano::test::system system (1); std::shared_ptr node = system.nodes[0]; @@ -495,13 +483,13 @@ TEST (socket_timeout, write_overlapped) // create a server socket boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v6::loopback (), system.get_available_port ()); - boost::asio::ip::tcp::acceptor acceptor (*system.io_ctx); + boost::asio::ip::tcp::acceptor acceptor (node->io_ctx); acceptor.open (endpoint.protocol ()); acceptor.bind (endpoint); acceptor.listen (boost::asio::socket_base::max_listen_connections); // asynchronously accept an incoming connection and read 2 bytes only - boost::asio::ip::tcp::socket newsock (*system.io_ctx); + boost::asio::ip::tcp::socket newsock (node->io_ctx); auto buffer = std::make_shared> (1); acceptor.async_accept (newsock, [&newsock, &buffer] (boost::system::error_code const & ec_a) { EXPECT_FALSE (ec_a); @@ -515,30 +503,33 @@ TEST (socket_timeout, write_overlapped) // create a client socket and send lots of data to fill the socket queue on the local and remote side // eventually, the all tcp queues should fill up and async_write will not be able to progress // and the timeout should kick in and close the socket, which will cause the async_write to return an error - auto socket = std::make_shared (*node, nano::transport::socket_endpoint::client); // socket with a max queue size much larger than OS buffers - socket->async_connect (acceptor.local_endpoint (), [&socket, &done] (boost::system::error_code const & ec_a) { - EXPECT_FALSE (ec_a); + auto socket = std::make_shared (*node, nano::transport::socket_endpoint::client); - auto buffer1 = std::make_shared> (1); - auto buffer2 = std::make_shared> (128 * 1024); - socket->async_write (nano::shared_const_buffer{ buffer1 }, [] (boost::system::error_code const & ec_a, size_t size_a) { - EXPECT_FALSE (ec_a); - EXPECT_EQ (size_a, 1); - }); - for (auto i = 0; i < 1024; ++i) + auto ec_0 = socket->blocking_connect (acceptor.local_endpoint ()); + ASSERT_FALSE (ec_0); + + auto buffer1 = std::make_shared> (1); + auto buffer2 = std::make_shared> (128 * 1024); + + auto [ec_1, size_1] = socket->blocking_write (nano::shared_const_buffer{ buffer1 }, 1); + ASSERT_FALSE (ec_1); + ASSERT_EQ (size_1, 1); + + // At some point the write should fail with a timeout + boost::system::error_code ec; + for (auto i = 0; i < 1000; ++i) + { + auto [ec_2, size_2] = socket->blocking_write (nano::shared_const_buffer{ buffer2 }, 128 * 1024); + if (ec_2) { - socket->async_write (nano::shared_const_buffer{ buffer2 }, [&done] (boost::system::error_code const & ec_a, size_t size_a) { - if (ec_a) - { - done = true; - } - }); + ec = ec_2; + break; } - }); + } // check that the callback was called and we got an error - ASSERT_TIMELY_EQ (10s, done, true); - ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_error, nano::stat::dir::in)); + ASSERT_TRUE (ec); + ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_error)); // check that the socket was closed due to tcp_io_timeout timeout ASSERT_LE (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::out)); diff --git a/nano/core_test/telemetry.cpp b/nano/core_test/telemetry.cpp index aa213e77..9c311879 100644 --- a/nano/core_test/telemetry.cpp +++ b/nano/core_test/telemetry.cpp @@ -159,15 +159,8 @@ TEST (telemetry, dos_tcp) ASSERT_TIMELY (5s, (nano::dev::network_params.network.telemetry_request_cooldown + orig) <= std::chrono::steady_clock::now ()); - // Should process no more telemetry_req messages - ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); - - // Now spam messages waiting for it to be processed - while (node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in) == 1) - { - channel->send (message, nano::transport::traffic_type::test); - ASSERT_NO_ERROR (system.poll ()); - } + // But not respond to all of them (by default there are 2 broadcasts per second in dev mode) + ASSERT_ALWAYS (1s, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::out) < 7); } TEST (telemetry, disable_metrics) From 6108e4a917f4e55b213bc022bf905b30db4e9e7b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Fri, 22 Nov 2024 19:40:39 +0100 Subject: [PATCH 3/4] Coroutine server --- nano/core_test/network.cpp | 82 +--- nano/core_test/telemetry.cpp | 22 +- nano/core_test/websocket.cpp | 2 - nano/lib/asio.hpp | 4 +- nano/lib/stats_enums.hpp | 15 +- nano/node/message_processor.cpp | 1 + nano/node/messages.cpp | 189 ++++++++ nano/node/messages.hpp | 37 ++ nano/node/transport/channel.cpp | 14 + nano/node/transport/channel.hpp | 10 +- nano/node/transport/fwd.hpp | 1 + nano/node/transport/tcp_channel.cpp | 9 +- nano/node/transport/tcp_channel.hpp | 3 +- nano/node/transport/tcp_channels.cpp | 6 +- nano/node/transport/tcp_listener.cpp | 12 +- nano/node/transport/tcp_listener.hpp | 3 - nano/node/transport/tcp_server.cpp | 677 +++++++++++---------------- nano/node/transport/tcp_server.hpp | 111 ++--- nano/node/transport/tcp_socket.cpp | 7 +- nano/node/transport/tcp_socket.hpp | 2 +- nano/slow_test/node.cpp | 6 - 21 files changed, 635 insertions(+), 578 deletions(-) diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index e584c326..1788cd80 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -96,8 +96,8 @@ TEST (network, send_node_id_handshake_tcp) node0->network.tcp_channels.start_tcp (node1->network.endpoint ()); ASSERT_EQ (0, node0->network.size ()); ASSERT_EQ (0, node1->network.size ()); - ASSERT_TIMELY (10s, node0->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in) >= initial + 2); - ASSERT_TIMELY (5s, node1->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in) >= initial_node1 + 2); + ASSERT_TIMELY (10s, node0->stats.count (nano::stat::type::tcp_server_message, nano::stat::detail::node_id_handshake, nano::stat::dir::in) >= initial + 2); + ASSERT_TIMELY (5s, node1->stats.count (nano::stat::type::tcp_server_message, nano::stat::detail::node_id_handshake, nano::stat::dir::in) >= initial_node1 + 1); ASSERT_TIMELY (5s, node0->stats.count (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in) >= initial_keepalive + 2); ASSERT_TIMELY (5s, node1->stats.count (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in) >= initial_keepalive + 2); ASSERT_EQ (1, node0->network.size ()); @@ -293,66 +293,24 @@ TEST (network, send_insufficient_work) nano::test::system system (2); auto & node1 = *system.nodes[0]; auto & node2 = *system.nodes[1]; - // Block zero work + nano::block_builder builder; - auto block1 = builder - .send () - .previous (0) - .destination (1) - .balance (20) - .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) - .work (0) - .build (); - nano::publish publish1{ nano::dev::network_params.network, block1 }; - auto tcp_channel (node1.network.tcp_channels.find_node_id (node2.get_node_id ())); + auto block = builder + .send () + .previous (0) + .destination (1) + .balance (20) + .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) + .work (0) + .build (); + + auto tcp_channel = node1.network.tcp_channels.find_node_id (node2.get_node_id ()); ASSERT_NE (nullptr, tcp_channel); - tcp_channel->send (publish1, nano::transport::traffic_type::test); - ASSERT_EQ (0, node1.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work)); - ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work) != 0); - ASSERT_EQ (1, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work)); - // Legacy block work between epoch_2_recieve & epoch_1 - auto block2 = builder - .send () - .previous (block1->hash ()) - .destination (1) - .balance (20) - .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) - .work (system.work_generate_limited (block1->hash (), node1.network_params.work.epoch_2_receive, node1.network_params.work.epoch_1 - 1)) - .build (); - nano::publish publish2{ nano::dev::network_params.network, block2 }; - tcp_channel->send (publish2, nano::transport::traffic_type::test); - ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work) != 1); - ASSERT_EQ (2, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work)); - // Legacy block work epoch_1 - auto block3 = builder - .send () - .previous (block2->hash ()) - .destination (1) - .balance (20) - .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) - .work (*system.work.generate (block2->hash (), node1.network_params.work.epoch_2)) - .build (); - nano::publish publish3{ nano::dev::network_params.network, block3 }; - tcp_channel->send (publish3, nano::transport::traffic_type::test); - ASSERT_EQ (0, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in)); - ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) != 0); - ASSERT_EQ (1, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in)); - // State block work epoch_2_recieve - auto block4 = builder - .state () - .account (nano::dev::genesis_key.pub) - .previous (block1->hash ()) - .representative (nano::dev::genesis_key.pub) - .balance (20) - .link (1) - .sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub) - .work (system.work_generate_limited (block1->hash (), node1.network_params.work.epoch_2_receive, node1.network_params.work.epoch_1 - 1)) - .build (); - nano::publish publish4{ nano::dev::network_params.network, block4 }; - tcp_channel->send (publish4, nano::transport::traffic_type::test); - ASSERT_TIMELY (10s, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) != 0); - ASSERT_EQ (1, node2.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in)); - ASSERT_EQ (2, node2.stats.count (nano::stat::type::error, nano::stat::detail::insufficient_work)); + + nano::publish publish{ nano::dev::network_params.network, block }; + tcp_channel->send (publish, nano::transport::traffic_type::test); + + ASSERT_TIMELY_EQ (5s, 1, node2.stats.count (nano::stat::type::tcp_server_error, nano::stat::detail::insufficient_work)); } TEST (receivable_processor, confirm_insufficient_pos) @@ -964,7 +922,7 @@ TEST (network, filter_invalid_network_bytes) const_cast (keepalive.header.network) = nano::networks::invalid; channel->send (keepalive, nano::transport::traffic_type::test); - ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::error, nano::stat::detail::invalid_network)); + ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::tcp_server_error, nano::stat::detail::invalid_network)); } // Ensure the network filters messages with the incorrect minimum version @@ -983,7 +941,7 @@ TEST (network, filter_invalid_version_using) const_cast (keepalive.header.version_using) = nano::dev::network_params.network.protocol_version_min - 1; channel->send (keepalive, nano::transport::traffic_type::test); - ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::error, nano::stat::detail::outdated_version)); + ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::tcp_server_error, nano::stat::detail::outdated_version)); } TEST (network, fill_keepalive_self) diff --git a/nano/core_test/telemetry.cpp b/nano/core_test/telemetry.cpp index 9c311879..65f1314b 100644 --- a/nano/core_test/telemetry.cpp +++ b/nano/core_test/telemetry.cpp @@ -62,8 +62,6 @@ TEST (telemetry, basic) auto node_client = system.add_node (node_flags); auto node_server = system.add_node (node_flags); - nano::test::wait_peer_connections (system); - // Request telemetry metrics auto channel = node_client->network.find_node_id (node_server->get_node_id ()); ASSERT_NE (nullptr, channel); @@ -116,7 +114,7 @@ TEST (telemetry, disconnected) nano::node_flags node_flags; auto node_client = system.add_node (node_flags); auto node_server = system.add_node (node_flags); - nano::test::wait_peer_connections (system); + auto channel = node_client->network.find_node_id (node_server->get_node_id ()); ASSERT_NE (nullptr, channel); @@ -138,18 +136,10 @@ TEST (telemetry, dos_tcp) auto node_client = system.add_node (node_flags); auto node_server = system.add_node (node_flags); - nano::test::wait_peer_connections (system); - - nano::telemetry_req message{ nano::dev::network_params.network }; auto channel = node_client->network.tcp_channels.find_node_id (node_server->get_node_id ()); ASSERT_NE (nullptr, channel); - channel->send (message, nano::transport::traffic_type::test, [] (boost::system::error_code const & ec, size_t size_a) { - ASSERT_FALSE (ec); - }); - ASSERT_TIMELY_EQ (5s, 1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); - - auto orig = std::chrono::steady_clock::now (); + nano::telemetry_req message{ nano::dev::network_params.network }; for (int i = 0; i < 10; ++i) { channel->send (message, nano::transport::traffic_type::test, [] (boost::system::error_code const & ec, size_t size_a) { @@ -157,7 +147,8 @@ TEST (telemetry, dos_tcp) }); } - ASSERT_TIMELY (5s, (nano::dev::network_params.network.telemetry_request_cooldown + orig) <= std::chrono::steady_clock::now ()); + // Should process telemetry_req messages + ASSERT_TIMELY (5s, 1 < node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); // But not respond to all of them (by default there are 2 broadcasts per second in dev mode) ASSERT_ALWAYS (1s, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::out) < 7); @@ -171,8 +162,6 @@ TEST (telemetry, disable_metrics) node_flags.disable_providing_telemetry_metrics = true; auto node_server = system.add_node (node_flags); - nano::test::wait_peer_connections (system); - // Try and request metrics from a node which is turned off but a channel is not closed yet auto channel = node_client->network.find_node_id (node_server->get_node_id ()); ASSERT_NE (nullptr, channel); @@ -203,7 +192,6 @@ TEST (telemetry, max_possible_size) data.unknown_data.resize (nano::message_header::telemetry_size_mask.to_ulong () - nano::telemetry_data::latest_size); nano::telemetry_ack message{ nano::dev::network_params.network, data }; - nano::test::wait_peer_connections (system); auto channel = node_client->network.tcp_channels.find_node_id (node_server->get_node_id ()); ASSERT_NE (nullptr, channel); @@ -224,8 +212,6 @@ TEST (telemetry, maker_pruning) config.enable_voting = false; auto node_server = system.add_node (config, node_flags); - nano::test::wait_peer_connections (system); - // Request telemetry metrics auto channel = node_client->network.find_node_id (node_server->get_node_id ()); ASSERT_NE (nullptr, channel); diff --git a/nano/core_test/websocket.cpp b/nano/core_test/websocket.cpp index 415d4147..35c2689f 100644 --- a/nano/core_test/websocket.cpp +++ b/nano/core_test/websocket.cpp @@ -1047,8 +1047,6 @@ TEST (websocket, telemetry) config.websocket_config.port = system.get_available_port (); auto node2 (system.add_node (config, node_flags)); - nano::test::wait_peer_connections (system); - std::atomic done{ false }; auto task = ([config = node1->config, &node1, &done] () { fake_websocket_client client (node1->websocket.server->listening_port ()); diff --git a/nano/lib/asio.hpp b/nano/lib/asio.hpp index 946a853d..234f3be6 100644 --- a/nano/lib/asio.hpp +++ b/nano/lib/asio.hpp @@ -4,11 +4,13 @@ #include #include +#include + namespace nano { +using buffer_view = std::span; using shared_buffer = std::shared_ptr>; -// TODO: Replace with just shared_buffer class shared_const_buffer { public: diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index cbb1b003..0148dce2 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -35,9 +35,12 @@ enum class type http_callbacks_ec, ipc, tcp, - tcp_server, tcp_socket, tcp_socket_timeout, + tcp_server, + tcp_server_message, + tcp_server_read, + tcp_server_error, tcp_channel, tcp_channel_queued, tcp_channel_send, @@ -183,6 +186,7 @@ enum class detail sent, reset, close, + read, // processing queue queue, @@ -239,9 +243,10 @@ enum class detail forced, election, - // message specific + // message types not_a_type, invalid, + header, keepalive, publish, confirm_req, @@ -410,6 +415,8 @@ enum class detail outdated, // tcp_server + read_header, + read_payload, handshake, handshake_abort, handshake_error, @@ -417,6 +424,10 @@ enum class detail handshake_initiate, handshake_response, handshake_response_invalid, + handshake_failed, + message_queued, + message_dropped, + message_ignored, // ipc invocations, diff --git a/nano/node/message_processor.cpp b/nano/node/message_processor.cpp index 309806c6..64fc39ce 100644 --- a/nano/node/message_processor.cpp +++ b/nano/node/message_processor.cpp @@ -185,6 +185,7 @@ public: nano::endpoint peering_endpoint{ channel->get_remote_endpoint ().address (), self_report.port () }; channel->set_peering_endpoint (peering_endpoint); } + channel->set_last_keepalive (message); } void publish (nano::publish const & message) override diff --git a/nano/node/messages.cpp b/nano/node/messages.cpp index d665ee3f..b41d10f5 100644 --- a/nano/node/messages.cpp +++ b/nano/node/messages.cpp @@ -1978,6 +1978,185 @@ void nano::asc_pull_ack::frontiers_payload::operator() (nano::object_stream & ob }); } +/* + * + */ + +auto nano::deserialize_message ( +nano::buffer_view buffer, +nano::message_header const & header, +nano::network_constants const & network_constants, +nano::network_filter * network_filter, +nano::block_uniquer * block_uniquer, +nano::vote_uniquer * vote_uniquer) +-> deserialize_message_result +{ + nano::bufferstream stream{ buffer.data (), buffer.size () }; + + switch (header.type) + { + case nano::message_type::keepalive: + { + bool error = false; + auto message = std::make_unique (error, stream, header); + if (!error && at_end (stream)) + { + return { std::move (message), deserialize_message_status::success }; + } + return { nullptr, deserialize_message_status::invalid_keepalive_message }; + } + break; + case nano::message_type::publish: + { + nano::uint128_t digest{ 0 }; + if (network_filter) + { + if (network_filter->apply (buffer.data (), buffer.size (), &digest)) + { + return { nullptr, deserialize_message_status::duplicate_publish_message }; + } + } + + bool error = false; + auto message = std::make_unique (error, stream, header, digest, block_uniquer); + if (!error && at_end (stream) || !message->block) + { + if (!network_constants.work.validate_entry (*message->block)) + { + return { std::move (message), deserialize_message_status::success }; + } + else + { + return { nullptr, deserialize_message_status::insufficient_work }; + } + } + return { nullptr, deserialize_message_status::invalid_publish_message }; + } + break; + case nano::message_type::confirm_req: + { + bool error = false; + auto message = std::make_unique (error, stream, header); + if (!error && at_end (stream)) + { + return { std::move (message), deserialize_message_status::success }; + } + return { nullptr, deserialize_message_status::invalid_confirm_req_message }; + } + break; + case nano::message_type::confirm_ack: + { + nano::uint128_t digest{ 0 }; + if (network_filter) + { + if (network_filter->apply (buffer.data (), buffer.size (), &digest)) + { + return { nullptr, deserialize_message_status::duplicate_confirm_ack_message }; + } + } + + bool error = false; + auto message = std::make_unique (error, stream, header, digest, vote_uniquer); + if (!error && at_end (stream)) + { + return { std::move (message), deserialize_message_status::success }; + } + return { nullptr, deserialize_message_status::invalid_confirm_ack_message }; + } + break; + case nano::message_type::node_id_handshake: + { + bool error = false; + auto message = std::make_unique (error, stream, header); + if (!error && at_end (stream)) + { + return { std::move (message), deserialize_message_status::success }; + } + return { nullptr, deserialize_message_status::invalid_node_id_handshake_message }; + } + break; + case nano::message_type::telemetry_req: + { + return { std::make_unique (header), deserialize_message_status::success }; + } + break; + case nano::message_type::telemetry_ack: + { + bool error = false; + auto message = std::make_unique (error, stream, header); + if (!error) // Intentionally not checking at_end here for forward compatibility + { + return { std::move (message), deserialize_message_status::success }; + } + return { nullptr, deserialize_message_status::invalid_telemetry_ack_message }; + } + break; + case nano::message_type::bulk_pull: + { + bool error = false; + auto message = std::make_unique (error, stream, header); + if (!error && at_end (stream)) + { + return { std::move (message), deserialize_message_status::success }; + } + return { nullptr, deserialize_message_status::invalid_bulk_pull_message }; + } + break; + case nano::message_type::bulk_pull_account: + { + bool error = false; + auto message = std::make_unique (error, stream, header); + if (!error && at_end (stream)) + { + return { std::move (message), deserialize_message_status::success }; + } + return { nullptr, deserialize_message_status::invalid_bulk_pull_account_message }; + } + break; + case nano::message_type::bulk_push: + { + return { std::make_unique (header), deserialize_message_status::success }; + } + break; + case nano::message_type::frontier_req: + { + bool error = false; + auto message = std::make_unique (error, stream, header); + if (!error && at_end (stream)) + { + return { std::move (message), deserialize_message_status::success }; + } + return { nullptr, deserialize_message_status::invalid_frontier_req_message }; + } + break; + case nano::message_type::asc_pull_req: + { + bool error = false; + auto message = std::make_unique (error, stream, header); + if (!error) + { + return { std::move (message), deserialize_message_status::success }; + } + return { nullptr, deserialize_message_status::invalid_asc_pull_req_message }; + } + break; + case nano::message_type::asc_pull_ack: + { + bool error = false; + auto message = std::make_unique (error, stream, header); + if (!error) + { + return { std::move (message), deserialize_message_status::success }; + } + return { nullptr, deserialize_message_status::invalid_asc_pull_ack_message }; + } + break; + default: + return { nullptr, deserialize_message_status::invalid_message_type }; + } + release_assert (false, "invalid message type"); +} + /* * */ @@ -1996,3 +2175,13 @@ nano::log::detail nano::to_log_detail (nano::message_type type) { return nano::enum_util::cast (type); } + +nano::stat::detail nano::to_stat_detail (nano::deserialize_message_status status) +{ + return nano::enum_util::cast (status); +} + +std::string_view nano::to_string (nano::deserialize_message_status status) +{ + return nano::enum_util::name (status); +} \ No newline at end of file diff --git a/nano/node/messages.hpp b/nano/node/messages.hpp index 2a0e4d0a..fe616f12 100644 --- a/nano/node/messages.hpp +++ b/nano/node/messages.hpp @@ -810,4 +810,41 @@ public: } virtual void default_handler (nano::message const &){}; }; + +enum class deserialize_message_status +{ + success, + insufficient_work, + invalid_header, + invalid_message_type, + invalid_keepalive_message, + invalid_publish_message, + invalid_confirm_req_message, + invalid_confirm_ack_message, + invalid_node_id_handshake_message, + invalid_telemetry_req_message, + invalid_telemetry_ack_message, + invalid_bulk_pull_message, + invalid_bulk_pull_account_message, + invalid_frontier_req_message, + invalid_asc_pull_req_message, + invalid_asc_pull_ack_message, + invalid_network, + outdated_version, + duplicate_publish_message, + duplicate_confirm_ack_message, +}; + +nano::stat::detail to_stat_detail (deserialize_message_status); +std::string_view to_string (deserialize_message_status); + +using deserialize_message_result = std::tuple, nano::deserialize_message_status>; + +deserialize_message_result deserialize_message ( +nano::buffer_view buffer, +nano::message_header const & header, +nano::network_constants const &, +nano::network_filter * = nullptr, +nano::block_uniquer * = nullptr, +nano::vote_uniquer * = nullptr); } diff --git a/nano/node/transport/channel.cpp b/nano/node/transport/channel.cpp index ff80fdb7..3c286b09 100644 --- a/nano/node/transport/channel.cpp +++ b/nano/node/transport/channel.cpp @@ -39,6 +39,20 @@ nano::endpoint nano::transport::channel::get_peering_endpoint () const return get_remote_endpoint (); } +void nano::transport::channel::set_last_keepalive (nano::keepalive const & message) +{ + nano::lock_guard lock{ mutex }; + last_keepalive = message; +} + +std::optional nano::transport::channel::pop_last_keepalive () +{ + nano::lock_guard lock{ mutex }; + auto result = last_keepalive; + last_keepalive.reset (); + return result; +} + std::shared_ptr nano::transport::channel::owner () const { return node.shared (); diff --git a/nano/node/transport/channel.hpp b/nano/node/transport/channel.hpp index cafb830b..1bea5a8f 100644 --- a/nano/node/transport/channel.hpp +++ b/nano/node/transport/channel.hpp @@ -55,7 +55,6 @@ public: nano::lock_guard lock{ mutex }; return last_bootstrap_attempt; } - void set_last_bootstrap_attempt (std::chrono::steady_clock::time_point const time_a) { nano::lock_guard lock{ mutex }; @@ -67,7 +66,6 @@ public: nano::lock_guard lock{ mutex }; return last_packet_received; } - void set_last_packet_received (std::chrono::steady_clock::time_point const time_a) { nano::lock_guard lock{ mutex }; @@ -79,7 +77,6 @@ public: nano::lock_guard lock{ mutex }; return last_packet_sent; } - void set_last_packet_sent (std::chrono::steady_clock::time_point const time_a) { nano::lock_guard lock{ mutex }; @@ -91,13 +88,11 @@ public: nano::lock_guard lock{ mutex }; return node_id; } - nano::account get_node_id () const { nano::lock_guard lock{ mutex }; return node_id.value_or (0); } - void set_node_id (nano::account node_id_a) { nano::lock_guard lock{ mutex }; @@ -108,7 +103,6 @@ public: { return network_version; } - void set_network_version (uint8_t network_version_a) { network_version = network_version_a; @@ -117,6 +111,9 @@ public: nano::endpoint get_peering_endpoint () const; void set_peering_endpoint (nano::endpoint endpoint); + void set_last_keepalive (nano::keepalive const & message); + std::optional pop_last_keepalive (); + std::shared_ptr owner () const; protected: @@ -133,6 +130,7 @@ private: std::optional node_id{}; std::atomic network_version{ 0 }; std::optional peering_endpoint{}; + std::optional last_keepalive{}; public: // Logging virtual void operator() (nano::object_stream &) const; diff --git a/nano/node/transport/fwd.hpp b/nano/node/transport/fwd.hpp index b344faa5..be77d206 100644 --- a/nano/node/transport/fwd.hpp +++ b/nano/node/transport/fwd.hpp @@ -4,6 +4,7 @@ namespace nano::transport { class channel; class loopback_channel; +class message_deserializer; class tcp_channel; class tcp_channels; class tcp_server; diff --git a/nano/node/transport/tcp_channel.cpp b/nano/node/transport/tcp_channel.cpp index bf5db21f..4c9834ac 100644 --- a/nano/node/transport/tcp_channel.cpp +++ b/nano/node/transport/tcp_channel.cpp @@ -30,7 +30,11 @@ void nano::transport::tcp_channel::close () { stop (); socket->close (); - closed = true; +} + +void nano::transport::tcp_channel::close_async () +{ + socket->close_async (); } void nano::transport::tcp_channel::start () @@ -57,6 +61,9 @@ asio::awaitable nano::transport::tcp_channel::start_sending (nano::async:: release_assert (false, "unexpected exception"); } debug_assert (strand.running_in_this_thread ()); + + // Ensure socket gets closed if task is stopped + close_async (); } void nano::transport::tcp_channel::stop () diff --git a/nano/node/transport/tcp_channel.hpp b/nano/node/transport/tcp_channel.hpp index 78a75202..acc30ee9 100644 --- a/nano/node/transport/tcp_channel.hpp +++ b/nano/node/transport/tcp_channel.hpp @@ -49,6 +49,7 @@ public: ~tcp_channel () override; void close () override; + void close_async (); // Safe to call from io context bool max (nano::transport::traffic_type traffic_type) override; bool alive () const override; @@ -88,8 +89,6 @@ private: tcp_channel_queue queue; std::atomic allocated_bandwidth{ 0 }; - std::atomic closed{ false }; - public: // Logging void operator() (nano::object_stream &) const override; }; diff --git a/nano/node/transport/tcp_channels.cpp b/nano/node/transport/tcp_channels.cpp index 3af0ba26..6c680c81 100644 --- a/nano/node/transport/tcp_channels.cpp +++ b/nano/node/transport/tcp_channels.cpp @@ -39,7 +39,7 @@ void nano::transport::tcp_channels::close () for (auto const & entry : channels) { entry.socket->close (); - entry.server->stop (); + entry.server->close (); entry.channel->close (); } @@ -400,9 +400,9 @@ std::optional nano::transport::tcp_channels::sample_keepalive ( while (counter++ < channels.size ()) { auto index = rng.random (channels.size ()); - if (auto server = channels.get ()[index].server) + if (auto channel = channels.get ()[index].channel) { - if (auto keepalive = server->pop_last_keepalive ()) + if (auto keepalive = channel->pop_last_keepalive ()) { return keepalive; } diff --git a/nano/node/transport/tcp_listener.cpp b/nano/node/transport/tcp_listener.cpp index 12994eb7..a2379ef5 100644 --- a/nano/node/transport/tcp_listener.cpp +++ b/nano/node/transport/tcp_listener.cpp @@ -154,7 +154,7 @@ void nano::transport::tcp_listener::stop () for (auto & connection : connections_l) { connection.socket->close (); - connection.server->stop (); + connection.server->close (); } logger.debug (nano::log::type::tcp_listener, "Stopped"); @@ -199,7 +199,7 @@ void nano::transport::tcp_listener::purge (nano::unique_lock & lock logger.debug (nano::log::type::tcp_listener, "Evicting dead connection: {}", connection.endpoint); connection.socket->close (); - connection.server->stop (); + connection.server->close (); } } @@ -306,9 +306,6 @@ auto nano::transport::tcp_listener::connect_impl (asio::ip::tcp::endpoint endpoi { stats.inc (nano::stat::type::tcp_listener, nano::stat::detail::connect_success, nano::stat::dir::out); logger.debug (nano::log::type::tcp_listener, "Successfully connected to: {}", endpoint); - - release_assert (result.server); - result.server->initiate_handshake (); } else { @@ -432,14 +429,13 @@ auto nano::transport::tcp_listener::accept_one (asio::ip::tcp::socket raw_socket logger.debug (nano::log::type::tcp_listener, "Accepted connection: {} ({})", remote_endpoint, to_string (type)); auto socket = std::make_shared (node, std::move (raw_socket), to_socket_endpoint (type)); - auto server = std::make_shared (socket, node.shared (), true); + auto server = std::make_shared (node, socket); + server->start (); connections.emplace_back (connection{ type, remote_endpoint, socket, server }); lock.unlock (); - server->start (); - connection_accepted.notify (socket, server); return { accept_result::accepted, socket, server }; diff --git a/nano/node/transport/tcp_listener.hpp b/nano/node/transport/tcp_listener.hpp index 85854a80..79ffe3a2 100644 --- a/nano/node/transport/tcp_listener.hpp +++ b/nano/node/transport/tcp_listener.hpp @@ -25,9 +25,6 @@ namespace asio = boost::asio; namespace nano::transport { -/** - * Server side portion of tcp sessions. Listens for new socket connections and spawns tcp_server objects when connected. - */ class tcp_listener final { public: diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index 127f358a..ee93fb96 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -6,433 +6,406 @@ #include -/* - * tcp_server - */ - -nano::transport::tcp_server::tcp_server (std::shared_ptr socket_a, std::shared_ptr node_a, bool allow_bootstrap_a) : - socket{ socket_a }, +nano::transport::tcp_server::tcp_server (nano::node & node_a, std::shared_ptr socket_a) : node{ node_a }, - allow_bootstrap{ allow_bootstrap_a }, - message_deserializer{ - std::make_shared (node_a->network_params.network, node_a->network.filter, node_a->block_uniquer, node_a->vote_uniquer, - [socket_l = socket] (std::shared_ptr> const & data_a, size_t size_a, std::function callback_a) { - debug_assert (socket_l != nullptr); - socket_l->async_read (data_a, size_a, callback_a); - }) - } + socket{ socket_a }, + strand{ node_a.io_ctx.get_executor () }, + task{ strand }, + buffer{ std::make_shared (max_buffer_size) } { - debug_assert (socket != nullptr); } nano::transport::tcp_server::~tcp_server () { - auto node = this->node.lock (); - if (!node) - { - return; - } - - node->logger.debug (nano::log::type::tcp_server, "Exiting server: {}", remote_endpoint); - - stop (); + close (); } +void nano::transport::tcp_server::close () +{ + stop (); + socket->close (); +} + +void nano::transport::tcp_server::close_async () +{ + socket->close_async (); +} + +// Starting the server must be separate from the constructor to allow the socket to access shared_from_this void nano::transport::tcp_server::start () { - // Set remote_endpoint - if (remote_endpoint.port () == 0) - { - remote_endpoint = socket->get_remote_endpoint (); - debug_assert (remote_endpoint.port () != 0); - } - - auto node = this->node.lock (); - if (!node) - { - return; - } - - node->logger.debug (nano::log::type::tcp_server, "Starting server: {}", remote_endpoint); - - receive_message (); + task = nano::async::task (strand, start_impl ()); } void nano::transport::tcp_server::stop () { - if (!stopped.exchange (true)) + if (task.running ()) { - socket->close_async (); + // Node context must be running to gracefully stop async tasks + debug_assert (!node.io_ctx.stopped ()); + // Ensure that we are not trying to await the task while running on the same thread / io_context + debug_assert (!node.io_ctx.get_executor ().running_in_this_thread ()); + + task.cancel (); + task.join (); } } -void nano::transport::tcp_server::receive_message () +auto nano::transport::tcp_server::start_impl () -> asio::awaitable { - if (stopped) + debug_assert (strand.running_in_this_thread ()); + try { - return; - } - - message_deserializer->read ([this_l = shared_from_this ()] (boost::system::error_code ec, std::unique_ptr message) { - auto node = this_l->node.lock (); - if (!node) + auto handshake_result = co_await perform_handshake (); + if (handshake_result != process_result::progress) { - return; - } - if (ec) - { - // IO error or critical error when deserializing message - node->stats.inc (nano::stat::type::error, to_stat_detail (this_l->message_deserializer->status)); - node->logger.debug (nano::log::type::tcp_server, "Error reading message: {}, status: {} ({})", - ec.message (), - to_string (this_l->message_deserializer->status), - this_l->remote_endpoint); - - this_l->stop (); + node.logger.debug (nano::log::type::tcp_server, "Handshake aborted: {}", get_remote_endpoint ()); } else { - this_l->received_message (std::move (message)); + co_await run_realtime (); } - }); + } + catch (boost::system::system_error const & ex) + { + node.stats.inc (nano::stat::type::tcp_server_error, nano::to_stat_detail (ex.code ()), nano::stat::dir::in); + node.logger.debug (nano::log::type::tcp_server, "Server stopped due to error: {} ({})", ex.code (), get_remote_endpoint ()); + } + catch (...) + { + release_assert (false, "unexpected exception"); + } + debug_assert (strand.running_in_this_thread ()); + + // Ensure socket gets closed if task is stopped + close_async (); } -void nano::transport::tcp_server::received_message (std::unique_ptr message) +bool nano::transport::tcp_server::alive () const { - auto node = this->node.lock (); - if (!node) - { - return; - } - - process_result result = process_result::progress; - if (message) - { - result = process_message (std::move (message)); - } - else - { - // Error while deserializing message - debug_assert (message_deserializer->status != transport::parse_status::success); - - node->stats.inc (nano::stat::type::error, to_stat_detail (message_deserializer->status)); - - switch (message_deserializer->status) - { - // Avoid too much noise about `duplicate_publish_message` errors - case nano::transport::parse_status::duplicate_publish_message: - { - node->stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message); - } - break; - case nano::transport::parse_status::duplicate_confirm_ack_message: - { - node->stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message); - } - break; - default: - { - node->logger.debug (nano::log::type::tcp_server, "Error deserializing message: {} ({})", - to_string (message_deserializer->status), - remote_endpoint); - } - break; - } - } - - switch (result) - { - case process_result::progress: - { - receive_message (); - } - break; - case process_result::abort: - { - stop (); - } - break; - case process_result::pause: - { - // Do nothing - } - break; - } + return socket->alive (); } -auto nano::transport::tcp_server::process_message (std::unique_ptr message) -> process_result +auto nano::transport::tcp_server::perform_handshake () -> asio::awaitable { - auto node = this->node.lock (); - if (!node) + debug_assert (strand.running_in_this_thread ()); + debug_assert (get_type () == nano::transport::socket_type::undefined); + + // Initiate handshake if we are the ones initiating the connection + if (socket->get_endpoint_type () == nano::transport::socket_endpoint::client) { - return process_result::abort; + co_await send_handshake_request (); } - node->stats.inc (nano::stat::type::tcp_server, to_stat_detail (message->type ()), nano::stat::dir::in); - - debug_assert (is_undefined_connection () || is_realtime_connection () || is_bootstrap_connection ()); - - /* - * Server initially starts in undefined state, where it waits for either a handshake or booststrap request message - * If the server receives a handshake (and it is successfully validated) it will switch to a realtime mode. - * In realtime mode messages are deserialized and queued to `tcp_message_manager` for further processing. - * In realtime mode any bootstrap requests are ignored. - * - * If the server receives a bootstrap request before receiving a handshake, it will switch to a bootstrap mode. - * In bootstrap mode once a valid bootstrap request message is received, the server will start a corresponding bootstrap server and pass control to that server. - * Once that server finishes its task, control is passed back to this server to read and process any subsequent messages. - * In bootstrap mode any realtime messages are ignored - */ - if (is_undefined_connection ()) + struct handshake_message_visitor : public nano::message_visitor { - handshake_message_visitor handshake_visitor{ *this }; + bool process{ false }; + std::optional handshake; + + void node_id_handshake (nano::node_id_handshake const & msg) override + { + process = true; + handshake = msg; + } + }; + + // Two-step handshake + for (int i = 0; i < 2; ++i) + { + auto [message, message_status] = co_await receive_message (); + if (!message) + { + node.logger.debug (nano::log::type::tcp_server, "Error deserializing handshake message: {} ({})", + to_string (message_status), + get_remote_endpoint ()); + } + + handshake_message_visitor handshake_visitor{}; message->visit (handshake_visitor); - switch (handshake_visitor.result) + handshake_status status = handshake_status::abort; + if (handshake_visitor.process) + { + release_assert (handshake_visitor.handshake.has_value ()); + status = co_await process_handshake (*handshake_visitor.handshake); + } + switch (status) { case handshake_status::abort: + case handshake_status::bootstrap: // Legacy bootstrap is no longer supported { - node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_abort); - node->logger.debug (nano::log::type::tcp_server, "Aborting handshake: {} ({})", to_string (message->type ()), remote_endpoint); + node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_abort); + node.logger.debug (nano::log::type::tcp_server, "Aborting handshake: {} ({})", + to_string (message->type ()), + get_remote_endpoint ()); - return process_result::abort; - } - case handshake_status::handshake: - { - return process_result::progress; // Continue handshake + co_return process_result::abort; } case handshake_status::realtime: { - queue_realtime (std::move (message)); - return process_result::progress; // Continue receiving new messages + co_return process_result::progress; // Continue receiving new messages } - case handshake_status::bootstrap: + case handshake_status::handshake: { - bool success = to_bootstrap_connection (); - if (!success) - { - node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); - node->logger.debug (nano::log::type::tcp_server, "Error switching to bootstrap mode: {} ({})", to_string (message->type ()), remote_endpoint); - - return process_result::abort; // Switch failed, abort - } - else - { - // Fall through to process the bootstrap message - } + // Continue handshake } } } - else if (is_realtime_connection ()) - { - realtime_message_visitor realtime_visitor{ *this }; - message->visit (realtime_visitor); - if (realtime_visitor.process) + // Failed to complete handshake, abort + node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_failed); + node.logger.debug (nano::log::type::tcp_server, "Failed to complete handshake ({})", get_remote_endpoint ()); + co_return process_result::abort; +} + +auto nano::transport::tcp_server::run_realtime () -> asio::awaitable +{ + debug_assert (strand.running_in_this_thread ()); + debug_assert (get_type () == nano::transport::socket_type::realtime); + + node.logger.debug (nano::log::type::tcp_server, "Running realtime connection: {}", get_remote_endpoint ()); + + while (!co_await nano::async::cancelled ()) + { + debug_assert (strand.running_in_this_thread ()); + + auto [message, status] = co_await receive_message (); + if (message) { - queue_realtime (std::move (message)); + realtime_message_visitor realtime_visitor{}; + message->visit (realtime_visitor); + + if (realtime_visitor.process) + { + release_assert (channel != nullptr); + channel->set_last_packet_received (std::chrono::steady_clock::now ()); + + // TODO: Throttle if not added + bool added = node.message_processor.put (std::move (message), channel); + node.stats.inc (nano::stat::type::tcp_server, added ? nano::stat::detail::message_queued : nano::stat::detail::message_dropped); + } + else + { + node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::message_ignored); + } } + else // Error while deserializing message + { + debug_assert (status != nano::deserialize_message_status::success); - return process_result::progress; - } - // The server will switch to bootstrap mode immediately after processing the first bootstrap message, thus no `else if` - if (is_bootstrap_connection ()) - { - return process_result::abort; - } + node.stats.inc (nano::stat::type::tcp_server_error, to_stat_detail (status)); - debug_assert (false); - return process_result::abort; + switch (status) + { + // Avoid too much noise about `duplicate_publish_message` errors + case nano::deserialize_message_status::duplicate_publish_message: + { + node.stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_publish_message); + } + break; + case nano::deserialize_message_status::duplicate_confirm_ack_message: + { + node.stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_confirm_ack_message); + } + break; + default: + { + node.logger.debug (nano::log::type::tcp_server, "Error deserializing message: {} ({})", + to_string (status), + get_remote_endpoint ()); + + co_return; // Stop receiving further messages + } + break; + } + } + } } -void nano::transport::tcp_server::queue_realtime (std::unique_ptr message) +auto nano::transport::tcp_server::receive_message () -> asio::awaitable { - auto node = this->node.lock (); - if (!node) + debug_assert (strand.running_in_this_thread ()); + + node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::read_header, nano::stat::dir::in); + node.stats.inc (nano::stat::type::tcp_server_read, nano::stat::detail::header, nano::stat::dir::in); + + auto header_payload = co_await read_socket (nano::message_header::size); + auto header_stream = nano::bufferstream{ header_payload.data (), header_payload.size () }; + + bool error = false; + nano::message_header header{ error, header_stream }; + + if (error) { - return; + co_return nano::deserialize_message_result{ nullptr, nano::deserialize_message_status::invalid_header }; + } + if (!header.is_valid_message_type ()) + { + co_return nano::deserialize_message_result{ nullptr, nano::deserialize_message_status::invalid_message_type }; + } + if (header.network != node.config.network_params.network.current_network) + { + co_return nano::deserialize_message_result{ nullptr, nano::deserialize_message_status::invalid_network }; + } + if (header.version_using < node.config.network_params.network.protocol_version_min) + { + co_return nano::deserialize_message_result{ nullptr, nano::deserialize_message_status::outdated_version }; } - release_assert (channel != nullptr); + auto const payload_size = header.payload_length_bytes (); - channel->set_last_packet_received (std::chrono::steady_clock::now ()); + node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::read_payload, nano::stat::dir::in); + node.stats.inc (nano::stat::type::tcp_server_read, to_stat_detail (header.type), nano::stat::dir::in); - bool added = node->message_processor.put (std::move (message), channel); - // TODO: Throttle if not added + auto payload_buffer = payload_size > 0 ? co_await read_socket (payload_size) : nano::buffer_view{ buffer->data (), 0 }; + + auto result = nano::deserialize_message (payload_buffer, header, + node.network_params.network, + &node.network.filter, + &node.block_uniquer, + &node.vote_uniquer); + + auto const & [message, status] = result; + if (message) + { + node.stats.inc (nano::stat::type::tcp_server_message, to_stat_detail (message->type ()), nano::stat::dir::in); + } + + co_return result; } -auto nano::transport::tcp_server::process_handshake (nano::node_id_handshake const & message) -> handshake_status +auto nano::transport::tcp_server::read_socket (size_t size) const -> asio::awaitable { - auto node = this->node.lock (); - if (!node) + debug_assert (strand.running_in_this_thread ()); + + auto [ec, size_read] = co_await socket->co_read (buffer, size); + debug_assert (ec || size_read == size); + debug_assert (strand.running_in_this_thread ()); + + if (ec) { - return handshake_status::abort; + throw boost::system::system_error (ec); } - if (node->flags.disable_tcp_realtime) - { - node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); - node->logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime mode ({})", remote_endpoint); + release_assert (size_read == size); + co_return nano::buffer_view{ buffer->data (), size_read }; +} - return handshake_status::abort; +auto nano::transport::tcp_server::process_handshake (nano::node_id_handshake const & message) -> asio::awaitable +{ + if (node.flags.disable_tcp_realtime) + { + node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); + node.logger.debug (nano::log::type::tcp_server, "Handshake attempted with disabled realtime mode ({})", get_remote_endpoint ()); + + co_return handshake_status::abort; } if (!message.query && !message.response) { - node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); - node->logger.debug (nano::log::type::tcp_server, "Invalid handshake message received ({})", remote_endpoint); + node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); + node.logger.debug (nano::log::type::tcp_server, "Invalid handshake message received ({})", get_remote_endpoint ()); - return handshake_status::abort; + co_return handshake_status::abort; } if (message.query && handshake_received) // Second handshake message should be a response only { - node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); - node->logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", remote_endpoint); + node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); + node.logger.debug (nano::log::type::tcp_server, "Detected multiple handshake queries ({})", get_remote_endpoint ()); - return handshake_status::abort; + co_return handshake_status::abort; } handshake_received = true; - node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake, nano::stat::dir::in); - node->logger.debug (nano::log::type::tcp_server, "Handshake message received: {} ({})", + node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::node_id_handshake, nano::stat::dir::in); + node.logger.debug (nano::log::type::tcp_server, "Handshake message received: {} ({})", message.query ? (message.response ? "query + response" : "query") : (message.response ? "response" : "none"), - remote_endpoint); + get_remote_endpoint ()); if (message.query) { // Sends response + our own query - send_handshake_response (*message.query, message.is_v2 ()); + co_await send_handshake_response (*message.query, message.is_v2 ()); // Fall through and continue handshake } if (message.response) { - if (node->network.verify_handshake_response (*message.response, nano::transport::map_tcp_to_endpoint (remote_endpoint))) + if (node.network.verify_handshake_response (*message.response, get_remote_endpoint ())) { bool success = to_realtime_connection (message.response->node_id); if (success) { - return handshake_status::realtime; // Switched to realtime + co_return handshake_status::realtime; // Switched to realtime } else { - node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); - node->logger.debug (nano::log::type::tcp_server, "Error switching to realtime mode ({})", remote_endpoint); + node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_error); + node.logger.debug (nano::log::type::tcp_server, "Error switching to realtime mode ({})", get_remote_endpoint ()); - return handshake_status::abort; + co_return handshake_status::abort; } } else { - node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response_invalid); - node->logger.debug (nano::log::type::tcp_server, "Invalid handshake response received ({})", remote_endpoint); + node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response_invalid); + node.logger.debug (nano::log::type::tcp_server, "Invalid handshake response received ({})", get_remote_endpoint ()); - return handshake_status::abort; + co_return handshake_status::abort; } } - return handshake_status::handshake; // Handshake is in progress + co_return handshake_status::handshake; // Handshake is in progress } -void nano::transport::tcp_server::initiate_handshake () +auto nano::transport::tcp_server::send_handshake_request () -> asio::awaitable { - auto node = this->node.lock (); - if (!node) - { - return; - } + auto query = node.network.prepare_handshake_query (get_remote_endpoint ()); + nano::node_id_handshake message{ node.network_params.network, query }; - auto query = node->network.prepare_handshake_query (nano::transport::map_tcp_to_endpoint (remote_endpoint)); - nano::node_id_handshake message{ node->network_params.network, query }; - - node->logger.debug (nano::log::type::tcp_server, "Initiating handshake query ({})", remote_endpoint); + node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_initiate, nano::stat::dir::out); + node.logger.debug (nano::log::type::tcp_server, "Initiating handshake query ({})", get_remote_endpoint ()); auto shared_const_buffer = message.to_shared_const_buffer (); - socket->async_write (shared_const_buffer, [this_l = shared_from_this ()] (boost::system::error_code const & ec, std::size_t size_a) { - auto node = this_l->node.lock (); - if (!node) - { - return; - } - if (ec) - { - node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_network_error); - node->logger.debug (nano::log::type::tcp_server, "Error sending handshake query: {} ({})", ec.message (), this_l->remote_endpoint); - // Stop invalid handshake - this_l->stop (); - } - else - { - node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake, nano::stat::dir::out); - node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_initiate, nano::stat::dir::out); - } - }); + auto [ec, size] = co_await socket->co_write (shared_const_buffer, shared_const_buffer.size ()); + debug_assert (ec || size == shared_const_buffer.size ()); + if (ec) + { + node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_network_error); + node.logger.debug (nano::log::type::tcp_server, "Error sending handshake query: {} ({})", ec.message (), get_remote_endpoint ()); + + throw boost::system::system_error (ec); // Abort further processing + } + else + { + node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake, nano::stat::dir::out); + } } -void nano::transport::tcp_server::send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2) +auto nano::transport::tcp_server::send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2) -> asio::awaitable { - auto node = this->node.lock (); - if (!node) - { - return; - } + auto response = node.network.prepare_handshake_response (query, v2); + auto own_query = node.network.prepare_handshake_query (get_remote_endpoint ()); + nano::node_id_handshake handshake_response{ node.network_params.network, own_query, response }; - auto response = node->network.prepare_handshake_response (query, v2); - auto own_query = node->network.prepare_handshake_query (nano::transport::map_tcp_to_endpoint (remote_endpoint)); - nano::node_id_handshake handshake_response{ node->network_params.network, own_query, response }; - - node->logger.debug (nano::log::type::tcp_server, "Responding to handshake ({})", remote_endpoint); + node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response, nano::stat::dir::out); + node.logger.debug (nano::log::type::tcp_server, "Responding to handshake ({})", get_remote_endpoint ()); auto shared_const_buffer = handshake_response.to_shared_const_buffer (); - socket->async_write (shared_const_buffer, [this_l = shared_from_this ()] (boost::system::error_code const & ec, std::size_t size_a) { - auto node = this_l->node.lock (); - if (!node) - { - return; - } - if (ec) - { - node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_network_error); - node->logger.debug (nano::log::type::tcp_server, "Error sending handshake response: {} ({})", ec.message (), this_l->remote_endpoint); - // Stop invalid handshake - this_l->stop (); - } - else - { - node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake, nano::stat::dir::out); - node->stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response, nano::stat::dir::out); - } - }); -} + auto [ec, size] = co_await socket->co_write (shared_const_buffer, shared_const_buffer.size ()); + debug_assert (ec || size == shared_const_buffer.size ()); + if (ec) + { + node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_network_error); + node.logger.debug (nano::log::type::tcp_server, "Error sending handshake response: {} ({})", ec.message (), get_remote_endpoint ()); -/* - * handshake_message_visitor - */ - -void nano::transport::tcp_server::handshake_message_visitor::node_id_handshake (const nano::node_id_handshake & message) -{ - result = server.process_handshake (message); -} - -void nano::transport::tcp_server::handshake_message_visitor::bulk_pull (const nano::bulk_pull & message) -{ - result = handshake_status::bootstrap; -} - -void nano::transport::tcp_server::handshake_message_visitor::bulk_pull_account (const nano::bulk_pull_account & message) -{ - result = handshake_status::bootstrap; -} - -void nano::transport::tcp_server::handshake_message_visitor::bulk_push (const nano::bulk_push & message) -{ - result = handshake_status::bootstrap; -} - -void nano::transport::tcp_server::handshake_message_visitor::frontier_req (const nano::frontier_req & message) -{ - result = handshake_status::bootstrap; + throw boost::system::system_error (ec); // Abort further processing + } + else + { + node.stats.inc (nano::stat::type::tcp_server, nano::stat::detail::handshake_response, nano::stat::dir::out); + } } /* @@ -442,7 +415,6 @@ void nano::transport::tcp_server::handshake_message_visitor::frontier_req (const void nano::transport::tcp_server::realtime_message_visitor::keepalive (const nano::keepalive & message) { process = true; - server.set_last_keepalive (message); } void nano::transport::tcp_server::realtime_message_visitor::publish (const nano::publish & message) @@ -467,21 +439,7 @@ void nano::transport::tcp_server::realtime_message_visitor::frontier_req (const void nano::transport::tcp_server::realtime_message_visitor::telemetry_req (const nano::telemetry_req & message) { - auto node = server.node.lock (); - if (!node) - { - return; - } - // Only handle telemetry requests if they are outside the cooldown period - if (server.last_telemetry_req + node->network_params.network.telemetry_request_cooldown < std::chrono::steady_clock::now ()) - { - server.last_telemetry_req = std::chrono::steady_clock::now (); - process = true; - } - else - { - node->stats.inc (nano::stat::type::telemetry, nano::stat::detail::request_within_protection_cache_zone); - } + process = true; } void nano::transport::tcp_server::realtime_message_visitor::telemetry_ack (const nano::telemetry_ack & message) @@ -503,56 +461,13 @@ void nano::transport::tcp_server::realtime_message_visitor::asc_pull_ack (const * */ -// TODO: We could periodically call this (from a dedicated timeout thread for eg.) but socket already handles timeouts, -// and since we only ever store tcp_server as weak_ptr, socket timeout will automatically trigger tcp_server cleanup -void nano::transport::tcp_server::timeout () -{ - auto node = this->node.lock (); - if (!node) - { - return; - } - if (socket->has_timed_out ()) - { - node->logger.debug (nano::log::type::tcp_server, "Closing TCP server due to timeout ({})", remote_endpoint); - - socket->close_async (); - } -} - -void nano::transport::tcp_server::set_last_keepalive (nano::keepalive const & message) -{ - std::lock_guard lock{ mutex }; - if (!last_keepalive) - { - last_keepalive = message; - } -} - -std::optional nano::transport::tcp_server::pop_last_keepalive () -{ - std::lock_guard lock{ mutex }; - auto result = last_keepalive; - last_keepalive = std::nullopt; - return result; -} - bool nano::transport::tcp_server::to_bootstrap_connection () { - auto node = this->node.lock (); - if (!node) + if (node.flags.disable_bootstrap_listener) { return false; } - if (!allow_bootstrap) - { - return false; - } - if (node->flags.disable_bootstrap_listener) - { - return false; - } - if (node->tcp_listener.bootstrap_count () >= node->config.bootstrap_connections_max) + if (node.tcp_listener.bootstrap_count () >= node.config.bootstrap_connections_max) { return false; } @@ -563,19 +478,14 @@ bool nano::transport::tcp_server::to_bootstrap_connection () socket->type_set (nano::transport::socket_type::bootstrap); - node->logger.debug (nano::log::type::tcp_server, "Switched to bootstrap mode ({})", remote_endpoint); + node.logger.debug (nano::log::type::tcp_server, "Switched to bootstrap mode ({})", get_remote_endpoint ()); return true; } bool nano::transport::tcp_server::to_realtime_connection (nano::account const & node_id) { - auto node = this->node.lock (); - if (!node) - { - return false; - } - if (node->flags.disable_tcp_realtime) + if (node.flags.disable_tcp_realtime) { return false; } @@ -584,7 +494,7 @@ bool nano::transport::tcp_server::to_realtime_connection (nano::account const & return false; } - auto channel_l = node->network.tcp_channels.create (socket, shared_from_this (), node_id); + auto channel_l = node.network.tcp_channels.create (socket, shared_from_this (), node_id); if (!channel_l) { return false; @@ -593,22 +503,7 @@ bool nano::transport::tcp_server::to_realtime_connection (nano::account const & socket->type_set (nano::transport::socket_type::realtime); - node->logger.debug (nano::log::type::tcp_server, "Switched to realtime mode ({})", remote_endpoint); + node.logger.debug (nano::log::type::tcp_server, "Switched to realtime mode ({})", get_remote_endpoint ()); return true; } - -bool nano::transport::tcp_server::is_undefined_connection () const -{ - return socket->type () == nano::transport::socket_type::undefined; -} - -bool nano::transport::tcp_server::is_bootstrap_connection () const -{ - return socket->type () == nano::transport::socket_type::bootstrap; -} - -bool nano::transport::tcp_server::is_realtime_connection () const -{ - return socket->type () == nano::transport::socket_type::realtime; -} diff --git a/nano/node/transport/tcp_server.hpp b/nano/node/transport/tcp_server.hpp index 13f615e2..f21f3b9c 100644 --- a/nano/node/transport/tcp_server.hpp +++ b/nano/node/transport/tcp_server.hpp @@ -1,63 +1,57 @@ #pragma once +#include #include +#include #include #include #include #include -namespace nano -{ -class message; -} - namespace nano::transport { -class message_deserializer; -class tcp_server; - class tcp_server final : public std::enable_shared_from_this { public: - tcp_server (std::shared_ptr, std::shared_ptr, bool allow_bootstrap = true); + tcp_server (nano::node &, std::shared_ptr); ~tcp_server (); void start (); - void stop (); - void initiate_handshake (); - void timeout (); - void set_last_keepalive (nano::keepalive const & message); - std::optional pop_last_keepalive (); + void close (); + void close_async (); // Safe to call from io context - std::shared_ptr const socket; - std::weak_ptr const node; - nano::mutex mutex; - std::atomic stopped{ false }; - std::atomic handshake_received{ false }; - // Remote endpoint used to remove response channel even after socket closing - nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 }; - std::chrono::steady_clock::time_point last_telemetry_req{}; + bool alive () const; + +public: + nano::endpoint get_remote_endpoint () const + { + return socket->get_remote_endpoint (); + } + nano::endpoint get_local_endpoint () const + { + return socket->get_local_endpoint (); + } + nano::transport::socket_type get_type () const + { + return socket->type (); + } private: + void stop (); + enum class process_result { abort, progress, - pause, }; - void receive_message (); - void received_message (std::unique_ptr message); - process_result process_message (std::unique_ptr message); - void queue_realtime (std::unique_ptr message); - - bool to_bootstrap_connection (); - bool to_realtime_connection (nano::account const & node_id); - bool is_undefined_connection () const; - bool is_bootstrap_connection () const; - bool is_realtime_connection () const; + asio::awaitable start_impl (); + asio::awaitable perform_handshake (); + asio::awaitable run_realtime (); + asio::awaitable receive_message (); + asio::awaitable read_socket (size_t size) const; enum class handshake_status { @@ -67,44 +61,34 @@ private: bootstrap, }; - handshake_status process_handshake (nano::node_id_handshake const & message); - void send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2); + asio::awaitable process_handshake (nano::node_id_handshake const & message); + asio::awaitable send_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2); + asio::awaitable send_handshake_request (); private: - bool const allow_bootstrap; - std::shared_ptr message_deserializer; - std::optional last_keepalive; + nano::node & node; - // Every realtime connection must have an associated channel - std::shared_ptr channel; + std::shared_ptr socket; + std::shared_ptr channel; // Every realtime connection must have an associated channel + + nano::async::strand strand; + nano::async::task task; + + nano::shared_buffer buffer; + static size_t constexpr max_buffer_size = 64 * 1024; // 64 KB + + std::atomic handshake_received{ false }; + +private: + bool to_bootstrap_connection (); + bool to_realtime_connection (nano::account const & node_id); private: // Visitors - class handshake_message_visitor : public nano::message_visitor - { - public: - handshake_status result{ handshake_status::abort }; - - explicit handshake_message_visitor (tcp_server & server) : - server{ server } {}; - - void node_id_handshake (nano::node_id_handshake const &) override; - void bulk_pull (nano::bulk_pull const &) override; - void bulk_pull_account (nano::bulk_pull_account const &) override; - void bulk_push (nano::bulk_push const &) override; - void frontier_req (nano::frontier_req const &) override; - - private: - tcp_server & server; - }; - class realtime_message_visitor : public nano::message_visitor { public: bool process{ false }; - explicit realtime_message_visitor (tcp_server & server) : - server{ server } {}; - void keepalive (nano::keepalive const &) override; void publish (nano::publish const &) override; void confirm_req (nano::confirm_req const &) override; @@ -114,11 +98,6 @@ private: // Visitors void telemetry_ack (nano::telemetry_ack const &) override; void asc_pull_req (nano::asc_pull_req const &) override; void asc_pull_ack (nano::asc_pull_ack const &) override; - - private: - tcp_server & server; }; - - friend class handshake_message_visitor; }; } diff --git a/nano/node/transport/tcp_socket.cpp b/nano/node/transport/tcp_socket.cpp index 8041fb03..0b9a8c6f 100644 --- a/nano/node/transport/tcp_socket.cpp +++ b/nano/node/transport/tcp_socket.cpp @@ -64,15 +64,10 @@ void nano::transport::tcp_socket::close () void nano::transport::tcp_socket::close_async () { - if (closed) // Avoid closing the socket multiple times - { - return; - } - // Node context must be running to gracefully stop async tasks debug_assert (!node.io_ctx.stopped ()); - asio::post (strand, [this, /* lifetime guard */ this_s = shared_from_this ()] () { + asio::dispatch (strand, [this, /* lifetime guard */ this_s = shared_from_this ()] () { close_impl (); }); } diff --git a/nano/node/transport/tcp_socket.hpp b/nano/node/transport/tcp_socket.hpp index f87e6552..5d01840a 100644 --- a/nano/node/transport/tcp_socket.hpp +++ b/nano/node/transport/tcp_socket.hpp @@ -25,7 +25,7 @@ public: ~tcp_socket (); void close (); - void close_async (); + void close_async (); // Safe to call from io context nano::endpoint get_remote_endpoint () const; nano::endpoint get_local_endpoint () const; diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index 6ddd6005..c42a5142 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -1313,8 +1313,6 @@ TEST (telemetry, ongoing_requests) auto node_client = system.add_node (node_flags); auto node_server = system.add_node (node_flags); - nano::test::wait_peer_connections (system); - ASSERT_EQ (0, node_client->telemetry.size ()); ASSERT_EQ (0, node_server->telemetry.size ()); ASSERT_EQ (0, node_client->stats.count (nano::stat::type::bootstrap, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); @@ -1348,8 +1346,6 @@ namespace transport system.add_node (node_flags); } - nano::test::wait_peer_connections (system); - std::vector threads; auto const num_threads = 4; @@ -1500,8 +1496,6 @@ TEST (telemetry, cache_read_and_timeout) auto node_client = system.add_node (node_flags); auto node_server = system.add_node (node_flags); - nano::test::wait_peer_connections (system); - // Request telemetry metrics std::optional telemetry_data; auto channel = node_client->network.find_node_id (node_server->get_node_id ()); From 597f94670b228ccabe69e8f2d582661ca4888b0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Wed, 23 Jul 2025 11:19:48 +0200 Subject: [PATCH 4/4] Remove nano::async_write code inspection --- ci/impl/code-inspector.sh | 6 ------ 1 file changed, 6 deletions(-) diff --git a/ci/impl/code-inspector.sh b/ci/impl/code-inspector.sh index 0e71cf17..e44018d6 100644 --- a/ci/impl/code-inspector.sh +++ b/ci/impl/code-inspector.sh @@ -10,12 +10,6 @@ code_inspect() return 1 fi - # This is to prevent out of scope access in async_write from asio which is not picked up by static analysers - if [[ $(grep -rl --exclude="*asio.hpp" "asio::async_write" $SOURCE_ROOT_PATH/nano) ]]; then - echo "Using boost::asio::async_write directly is not permitted (except in nano/lib/asio.hpp). Use nano::async_write instead" >&2 - return 1 - fi - if [[ $(grep -rlP "^\s*assert \(" $SOURCE_ROOT_PATH/nano) ]]; then echo "Using assert is not permitted. Use debug_assert instead." >&2 return 1