diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index ac7e4c6ea..e281c724f 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -18,6 +18,7 @@ add_executable (core_test processor_service.cpp peer_container.cpp signing.cpp + socket.cpp timer.cpp uint256_union.cpp versioning.cpp diff --git a/nano/core_test/logger.cpp b/nano/core_test/logger.cpp index 4d8f09d75..5af98af30 100644 --- a/nano/core_test/logger.cpp +++ b/nano/core_test/logger.cpp @@ -15,6 +15,7 @@ TEST (logging, serialization) logging1.ledger_logging_value = !logging1.ledger_logging_value; logging1.ledger_duplicate_logging_value = !logging1.ledger_duplicate_logging_value; logging1.network_logging_value = !logging1.network_logging_value; + logging1.network_timeout_logging_value = !logging1.network_timeout_logging_value; logging1.network_message_logging_value = !logging1.network_message_logging_value; logging1.network_publish_logging_value = !logging1.network_publish_logging_value; logging1.network_packet_logging_value = !logging1.network_packet_logging_value; @@ -37,6 +38,7 @@ TEST (logging, serialization) ASSERT_EQ (logging1.ledger_logging_value, logging2.ledger_logging_value); ASSERT_EQ (logging1.ledger_duplicate_logging_value, logging2.ledger_duplicate_logging_value); ASSERT_EQ (logging1.network_logging_value, logging2.network_logging_value); + ASSERT_EQ (logging1.network_timeout_logging_value, logging2.network_timeout_logging_value); ASSERT_EQ (logging1.network_message_logging_value, logging2.network_message_logging_value); ASSERT_EQ (logging1.network_publish_logging_value, logging2.network_publish_logging_value); ASSERT_EQ (logging1.network_packet_logging_value, logging2.network_packet_logging_value); @@ -81,11 +83,13 @@ TEST (logging, upgrade_v6_v7) logging1.serialize_json (tree); tree.erase ("version"); tree.erase ("min_time_between_output"); + tree.erase ("network_timeout_logging_value"); bool upgraded (false); ASSERT_FALSE (logging2.deserialize_json (upgraded, tree)); ASSERT_TRUE (upgraded); ASSERT_LE (7, tree.get ("version")); ASSERT_EQ (5, tree.get ("min_time_between_output")); + ASSERT_EQ (false, tree.get ("network_timeout_logging_value")); } namespace diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 3aba6bf20..fa4088da4 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -1149,7 +1149,12 @@ TEST (network, endpoint_bad_fd) system.nodes[0]->stop (); auto endpoint (system.nodes[0]->network.endpoint ()); ASSERT_TRUE (endpoint.address ().is_loopback ()); - ASSERT_EQ (0, endpoint.port ()); + // The endpoint is invalidated asynchronously + system.deadline_set (10s); + while (system.nodes[0]->network.endpoint ().port () != 0) + { + ASSERT_NO_ERROR (system.poll ()); + } } TEST (network, reserved_address) @@ -1384,14 +1389,22 @@ TEST (bootstrap, keepalive) auto socket (std::make_shared (system.nodes[0])); nano::keepalive keepalive; auto input (keepalive.to_bytes ()); - socket->async_connect (system.nodes[0]->bootstrap.endpoint (), [&input, socket](boost::system::error_code const & ec) { + std::atomic write_done (false); + socket->async_connect (system.nodes[0]->bootstrap.endpoint (), [&input, socket, &write_done](boost::system::error_code const & ec) { ASSERT_FALSE (ec); - socket->async_write (input, [&input](boost::system::error_code const & ec, size_t size_a) { + socket->async_write (input, [&input, &write_done](boost::system::error_code const & ec, size_t size_a) { ASSERT_FALSE (ec); ASSERT_EQ (input->size (), size_a); + write_done = true; }); }); + system.deadline_set (std::chrono::seconds (5)); + while (!write_done) + { + ASSERT_NO_ERROR (system.poll ()); + } + auto output (keepalive.to_bytes ()); std::atomic done (false); socket->async_read (output, output->size (), [&output, &done](boost::system::error_code const & ec, size_t size_a) { @@ -1942,7 +1955,7 @@ TEST (bootstrap, tcp_listener_timeout_empty) { nano::system system (24000, 1); auto node0 (system.nodes[0]); - node0->config.tcp_server_timeout = std::chrono::seconds (1); + node0->config.tcp_idle_timeout = std::chrono::seconds (1); auto socket (std::make_shared (node0)); std::atomic connected (false); socket->async_connect (node0->bootstrap.endpoint (), [&connected](boost::system::error_code const & ec) { @@ -1955,7 +1968,7 @@ TEST (bootstrap, tcp_listener_timeout_empty) ASSERT_NO_ERROR (system.poll ()); } bool disconnected (false); - system.deadline_set (std::chrono::seconds (5)); + system.deadline_set (std::chrono::seconds (6)); while (!disconnected) { { @@ -1970,7 +1983,7 @@ TEST (bootstrap, tcp_listener_timeout_keepalive) { nano::system system (24000, 1); auto node0 (system.nodes[0]); - node0->config.tcp_server_timeout = std::chrono::seconds (1); + node0->config.tcp_idle_timeout = std::chrono::seconds (1); auto socket (std::make_shared (node0)); nano::keepalive keepalive; auto input (keepalive.to_bytes ()); @@ -1991,7 +2004,7 @@ TEST (bootstrap, tcp_listener_timeout_keepalive) ASSERT_EQ (node0->bootstrap.connections.size (), 1); } bool disconnected (false); - system.deadline_set (std::chrono::seconds (5)); + system.deadline_set (std::chrono::seconds (10)); while (!disconnected) { { diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 1a9a0ba6e..ea73c20fe 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -441,9 +441,10 @@ TEST (node, connect_after_junk) nano::system system (24000, 1); nano::node_init init1; auto node1 (std::make_shared (init1, system.io_ctx, 24001, nano::unique_path (), system.alarm, system.logging, system.work)); - uint64_t junk (0); + auto junk_buffer (std::make_shared> ()); + junk_buffer->push_back (0); nano::transport::channel_udp channel1 (node1->network.udp_channels, system.nodes[0]->network.endpoint ()); - channel1.send_buffer_raw (boost::asio::buffer (&junk, sizeof (junk)), [](boost::system::error_code const &, size_t) {}); + channel1.send_buffer (junk_buffer, nano::stat::detail::bulk_pull, [](boost::system::error_code const &, size_t) {}); system.deadline_set (10s); while (system.nodes[0]->stats.count (nano::stat::type::error) == 0) { @@ -697,8 +698,8 @@ TEST (node_config, v16_v17_upgrade) nano::node_config config; config.logging.init (path); // These config options should not be present - ASSERT_FALSE (tree.get_optional_child ("tcp_client_timeout")); - ASSERT_FALSE (tree.get_optional_child ("tcp_server_timeout")); + ASSERT_FALSE (tree.get_optional_child ("tcp_io_timeout")); + ASSERT_FALSE (tree.get_optional_child ("tcp_idle_timeout")); ASSERT_FALSE (tree.get_optional_child ("pow_sleep_interval")); ASSERT_FALSE (tree.get_optional_child ("external_address")); ASSERT_FALSE (tree.get_optional_child ("external_port")); @@ -706,8 +707,8 @@ TEST (node_config, v16_v17_upgrade) config.deserialize_json (upgraded, tree); // The config options should be added after the upgrade - ASSERT_TRUE (!!tree.get_optional_child ("tcp_client_timeout")); - ASSERT_TRUE (!!tree.get_optional_child ("tcp_server_timeout")); + ASSERT_TRUE (!!tree.get_optional_child ("tcp_io_timeout")); + ASSERT_TRUE (!!tree.get_optional_child ("tcp_idle_timeout")); ASSERT_TRUE (!!tree.get_optional_child ("pow_sleep_interval")); ASSERT_TRUE (!!tree.get_optional_child ("external_address")); ASSERT_TRUE (!!tree.get_optional_child ("external_port")); @@ -732,8 +733,8 @@ TEST (node_config, v17_values) // Check config is correct { - tree.put ("tcp_client_timeout", 1); - tree.put ("tcp_server_timeout", 0); + tree.put ("tcp_io_timeout", 1); + tree.put ("tcp_idle_timeout", 0); tree.put ("pow_sleep_interval", 0); tree.put ("external_address", "::1"); tree.put ("external_port", 0); @@ -749,8 +750,8 @@ TEST (node_config, v17_values) config.deserialize_json (upgraded, tree); ASSERT_FALSE (upgraded); - ASSERT_EQ (config.tcp_client_timeout.count (), 1); - ASSERT_EQ (config.tcp_server_timeout.count (), 0); + ASSERT_EQ (config.tcp_io_timeout.count (), 1); + ASSERT_EQ (config.tcp_idle_timeout.count (), 0); ASSERT_EQ (config.pow_sleep_interval.count (), 0); ASSERT_EQ (config.external_address, boost::asio::ip::address_v6::from_string ("::1")); ASSERT_EQ (config.external_port, 0); @@ -760,8 +761,8 @@ TEST (node_config, v17_values) ASSERT_TRUE (config.diagnostics_config.txn_tracking.ignore_writes_below_block_processor_max_time); // Check config is correct with other values - tree.put ("tcp_client_timeout", std::numeric_limits::max () - 100); - tree.put ("tcp_server_timeout", std::numeric_limits::max ()); + tree.put ("tcp_io_timeout", std::numeric_limits::max () - 100); + tree.put ("tcp_idle_timeout", std::numeric_limits::max ()); tree.put ("pow_sleep_interval", std::numeric_limits::max () - 100); tree.put ("external_address", "::ffff:192.168.1.1"); tree.put ("external_port", std::numeric_limits::max () - 1); @@ -777,8 +778,8 @@ TEST (node_config, v17_values) upgraded = false; config.deserialize_json (upgraded, tree); ASSERT_FALSE (upgraded); - ASSERT_EQ (config.tcp_client_timeout.count (), std::numeric_limits::max () - 100); - ASSERT_EQ (config.tcp_server_timeout.count (), std::numeric_limits::max ()); + ASSERT_EQ (config.tcp_io_timeout.count (), std::numeric_limits::max () - 100); + ASSERT_EQ (config.tcp_idle_timeout.count (), std::numeric_limits::max ()); ASSERT_EQ (config.pow_sleep_interval.count (), std::numeric_limits::max () - 100); ASSERT_EQ (config.external_address, boost::asio::ip::address_v6::from_string ("::ffff:192.168.1.1")); ASSERT_EQ (config.external_port, std::numeric_limits::max () - 1); diff --git a/nano/core_test/socket.cpp b/nano/core_test/socket.cpp new file mode 100644 index 000000000..19907f56b --- /dev/null +++ b/nano/core_test/socket.cpp @@ -0,0 +1,109 @@ +#include +#include +#include +#include +#include + +using namespace std::chrono_literals; + +TEST (socket, concurrent_writes) +{ + nano::inactive_node inactivenode; + auto node = inactivenode.node; + + // This gives more realistic execution than using system#poll, allowing writes to + // queue up and drain concurrently. + nano::thread_runner runner (node->io_ctx, 1); + + constexpr size_t max_connections = 4; + constexpr size_t client_count = max_connections; + constexpr size_t message_count = 4; + constexpr size_t total_message_count = client_count * message_count; + + // We're expecting client_count*4 messages + nano::util::counted_completion read_count_completion (total_message_count); + std::function)> reader = [&read_count_completion, &total_message_count, &reader](std::shared_ptr socket_a) { + auto buff (std::make_shared> ()); + buff->resize (1); + socket_a->async_read (buff, 1, [&read_count_completion, &reader, &total_message_count, socket_a, buff](boost::system::error_code const & ec, size_t size_a) { + if (!ec) + { + if (read_count_completion.increment () < total_message_count) + { + reader (socket_a); + } + } + else if (ec != boost::asio::error::eof) + { + std::cerr << "async_read: " << ec.message () << std::endl; + } + }); + }; + + boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v4::any (), 25000); + + auto server_socket (std::make_shared (node, endpoint, max_connections, nano::socket::concurrency::multi_writer)); + boost::system::error_code ec; + server_socket->start (ec); + ASSERT_FALSE (ec); + std::vector> connections; + + // On every new connection, start reading data + server_socket->on_connection ([&connections, &reader](std::shared_ptr new_connection, boost::system::error_code const & ec_a) { + if (ec_a) + { + std::cerr << "on_connection: " << ec_a.message () << std::endl; + } + else + { + connections.push_back (new_connection); + reader (new_connection); + } + // Keep accepting connections + return true; + }); + + nano::util::counted_completion connection_count_completion (client_count); + std::vector> clients; + for (unsigned i = 0; i < client_count; i++) + { + auto client (std::make_shared (node, boost::none, nano::socket::concurrency::multi_writer)); + clients.push_back (client); + client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), 25000), + [&connection_count_completion](boost::system::error_code const & ec_a) { + if (ec_a) + { + std::cerr << "async_connect: " << ec_a.message () << std::endl; + } + else + { + connection_count_completion.increment (); + } + }); + } + ASSERT_FALSE (connection_count_completion.await_count_for (10s)); + + // Execute overlapping writes from multiple threads + auto client (clients[0]); + for (int i = 0; i < client_count; i++) + { + std::thread runner ([&client]() { + for (int i = 0; i < message_count; i++) + { + auto buff (std::make_shared> ()); + buff->push_back ('A' + i); + client->async_write (buff); + } + }); + runner.detach (); + } + + ASSERT_FALSE (read_count_completion.await_count_for (10s)); + node->stop (); + runner.stop_event_processing (); + runner.join (); + + ASSERT_EQ (node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in), client_count); + // We may exhaust max connections and have some tcp accept failures, but no more than the client count + ASSERT_LT (node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in), client_count); +} diff --git a/nano/core_test/testutil.hpp b/nano/core_test/testutil.hpp index d1babc489..2fcf5ca37 100644 --- a/nano/core_test/testutil.hpp +++ b/nano/core_test/testutil.hpp @@ -1,6 +1,10 @@ #pragma once +#include #include +#include +#include +#include #include #define GTEST_TEST_ERROR_CODE(expression, text, actual, expected, fail) \ @@ -36,4 +40,83 @@ extern nano::uint256_union const & nano_test_account; extern nano::uint256_union const & genesis_account; extern nano::uint256_union const & burn_account; extern nano::uint128_t const & genesis_amount; + +namespace util +{ + /** + * Helper to signal completion of async handlers in tests. + * Subclasses implement specific conditions for completion. + */ + class completion_signal + { + public: + virtual ~completion_signal () + { + notify (); + } + + /** Explicitly notify the completion */ + void notify () + { + cv.notify_all (); + } + + protected: + std::condition_variable cv; + std::mutex mutex; + }; + + /** + * Signals completion when a count is reached. + */ + class counted_completion : public completion_signal + { + public: + /** + * Constructor + * @param required_count_a When increment() reaches this count within the deadline, await_count_for() will return false. + */ + counted_completion (unsigned required_count_a) : + required_count (required_count_a) + { + } + + /** + * Wait for increment() to signal completion, or reaching the deadline. + * @param deadline_duration_a Deadline as a std::chrono duration + * @return true if the count is reached within the deadline + */ + template + bool await_count_for (UNIT deadline_duration_a) + { + nano::timer timer (nano::timer_state::started); + bool error = true; + while (error && timer.before_deadline (deadline_duration_a)) + { + error = count < required_count; + if (error) + { + std::unique_lock lock (mutex); + cv.wait_for (lock, std::chrono::milliseconds (1)); + } + } + return error; + } + + /** Increments the current count. If the required count is reached, await_count_for() waiters are notified. */ + unsigned increment () + { + auto val (count.fetch_add (1)); + if (val >= required_count) + { + notify (); + } + return val; + } + + private: + std::atomic count{ 0 }; + unsigned required_count; + }; +} } diff --git a/nano/lib/utility.cpp b/nano/lib/utility.cpp index b4fd58238..36954ce2d 100644 --- a/nano/lib/utility.cpp +++ b/nano/lib/utility.cpp @@ -142,7 +142,8 @@ void nano::thread_attributes::set (boost::thread::attributes & attrs) attrs_l->set_stack_size (8000000); //8MB } -nano::thread_runner::thread_runner (boost::asio::io_context & io_ctx_a, unsigned service_threads_a) +nano::thread_runner::thread_runner (boost::asio::io_context & io_ctx_a, unsigned service_threads_a) : +io_guard (boost::asio::make_work_guard (io_ctx_a)) { boost::thread::attributes attrs; nano::thread_attributes::set (attrs); @@ -154,6 +155,13 @@ nano::thread_runner::thread_runner (boost::asio::io_context & io_ctx_a, unsigned { io_ctx_a.run (); } + catch (std::exception const & ex) + { + std::cerr << ex.what () << std::endl; +#ifndef NDEBUG + throw ex; +#endif + } catch (...) { #ifndef NDEBUG @@ -176,6 +184,7 @@ nano::thread_runner::~thread_runner () void nano::thread_runner::join () { + io_guard.reset (); for (auto & i : threads) { if (i.joinable ()) @@ -185,6 +194,11 @@ void nano::thread_runner::join () } } +void nano::thread_runner::stop_event_processing () +{ + io_guard.get_executor ().context ().stop (); +} + /* * Backing code for "release_assert", which is itself a macro */ diff --git a/nano/lib/utility.hpp b/nano/lib/utility.hpp index 504806ca4..d26042c5c 100644 --- a/nano/lib/utility.hpp +++ b/nano/lib/utility.hpp @@ -123,8 +123,12 @@ class thread_runner final public: thread_runner (boost::asio::io_context &, unsigned); ~thread_runner (); + /** Tells the IO context to stop processing events.*/ + void stop_event_processing (); + /** Wait for IO threads to complete */ void join (); std::vector threads; + boost::asio::executor_work_guard io_guard; }; template diff --git a/nano/nano_wallet/entry.cpp b/nano/nano_wallet/entry.cpp index 39b4a282a..042cb24ab 100644 --- a/nano/nano_wallet/entry.cpp +++ b/nano/nano_wallet/entry.cpp @@ -244,9 +244,12 @@ int run_wallet (QApplication & application, int argc, char * const * argv, boost nano::set_secure_perm_file (config_path, error_chmod); if (!error) { - boost::asio::io_context io_ctx; config.node.logging.init (data_path); nano::logger_mt logger{ config.node.logging.min_time_between_log_output }; + + boost::asio::io_context io_ctx; + nano::thread_runner runner (io_ctx, config.node.io_threads); + std::shared_ptr node; std::shared_ptr gui; nano::set_application_icon (application); @@ -258,6 +261,7 @@ int run_wallet (QApplication & application, int argc, char * const * argv, boost nano::alarm alarm (io_ctx); nano::node_init init; nano::node_flags flags; + node = std::make_shared (init, io_ctx, data_path, alarm, config.node, work, flags); if (!init.error ()) { @@ -330,8 +334,6 @@ int run_wallet (QApplication & application, int argc, char * const * argv, boost #endif } } - - nano::thread_runner runner (io_ctx, node->config.io_threads); QObject::connect (&application, &QApplication::aboutToQuit, [&]() { ipc.stop (); node->stop (); @@ -345,6 +347,7 @@ int run_wallet (QApplication & application, int argc, char * const * argv, boost rpc_process->terminate (); } #endif + runner.stop_event_processing (); }); application.postEvent (&processor, new nano_qt::eventloop_event ([&]() { gui = std::make_shared (application, processor, *node, wallet, config.account); diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 69d1bd0e6..986a70da3 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -71,6 +71,8 @@ add_library (node transport/udp.cpp signatures.hpp signatures.cpp + socket.hpp + socket.cpp stats.hpp stats.cpp voting.hpp diff --git a/nano/node/bootstrap.cpp b/nano/node/bootstrap.cpp index 59bcddfd6..4236f63ef 100644 --- a/nano/node/bootstrap.cpp +++ b/nano/node/bootstrap.cpp @@ -20,123 +20,6 @@ constexpr unsigned bulk_push_cost_limit = 200; size_t constexpr nano::frontier_req_client::size_frontier; -nano::socket::socket (std::shared_ptr node_a) : -socket_m (node_a->io_ctx), -last_action_time (0), -async_start_time (std::numeric_limits::max ()), -node (node_a) -{ -} - -void nano::socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::function callback_a) -{ - checkup (node->config.tcp_client_timeout.count ()); - auto this_l (shared_from_this ()); - start (); - socket_m.async_connect (endpoint_a, [this_l, callback_a](boost::system::error_code const & ec) { - this_l->stop (); - callback_a (ec); - }); -} - -void nano::socket::async_read (std::shared_ptr> buffer_a, size_t size_a, std::function callback_a) -{ - assert (size_a <= buffer_a->size ()); - auto this_l (shared_from_this ()); - if (socket_m.is_open ()) - { - start (); - boost::asio::async_read (socket_m, boost::asio::buffer (buffer_a->data (), size_a), [this_l, callback_a](boost::system::error_code const & ec, size_t size_a) { - this_l->node->stats.add (nano::stat::type::traffic_bootstrap, nano::stat::dir::in, size_a); - this_l->stop (); - callback_a (ec, size_a); - }); - } -} - -void nano::socket::async_write (std::shared_ptr> buffer_a, std::function callback_a) -{ - auto this_l (shared_from_this ()); - if (socket_m.is_open ()) - { - start (); - async_write (boost::asio::buffer (buffer_a->data (), buffer_a->size ()), [this_l, callback_a, buffer_a](boost::system::error_code const & ec, size_t size_a) { - this_l->node->stats.add (nano::stat::type::traffic_bootstrap, nano::stat::dir::out, size_a); - this_l->stop (); - callback_a (ec, size_a); - }); - } -} - -void nano::socket::async_write (boost::asio::const_buffer buffer_a, std::function callback_a) -{ - boost::asio::async_write (socket_m, buffer_a, callback_a); -} - -void nano::socket::start () -{ - auto now (std::chrono::steady_clock::now ().time_since_epoch ().count ()); - async_start_time = now; - last_action_time = now; -} - -void nano::socket::stop () -{ - async_start_time = std::numeric_limits::max (); - last_action_time = std::chrono::steady_clock::now ().time_since_epoch ().count (); -} - -void nano::socket::close () -{ - boost::system::error_code ec; - socket_m.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec); - /* Ignore error code for shutdown as it is a best effort anyway. */ - - socket_m.close (ec); - if (ec) - { - // The underlying file descriptor is closed anyway, so just log the error and increment socket failure stat. - node->logger.try_log ("Failed to close socket gracefully: ", ec.message ()); - node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::error_socket_close); - } -} - -void nano::socket::checkup (uint64_t timeout_a) -{ - std::weak_ptr this_w (shared_from_this ()); - node->alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (node->network_params.network.is_test_network () ? 1 : 10), [this_w, timeout_a]() { - if (auto this_l = this_w.lock ()) - { - if (this_l->async_start_time != std::numeric_limits::max () && this_l->async_start_time + timeout_a < static_cast (std::chrono::steady_clock::now ().time_since_epoch ().count ())) - { - if (this_l->node->config.logging.bulk_pull_logging ()) - { - this_l->node->logger.try_log (boost::str (boost::format ("Disconnecting from %1% due to timeout") % this_l->remote_endpoint ())); - } - this_l->close (); - } - else - { - this_l->checkup (timeout_a); - } - } - }); -} - -nano::tcp_endpoint nano::socket::remote_endpoint () -{ - nano::tcp_endpoint endpoint; - - if (socket_m.is_open ()) - { - boost::system::error_code remote_endpoint_error; - - endpoint = socket_m.remote_endpoint (remote_endpoint_error); - } - - return endpoint; -} - nano::bootstrap_client::bootstrap_client (std::shared_ptr node_a, std::shared_ptr attempt_a, std::shared_ptr channel_a) : node (node_a), attempt (attempt_a), @@ -1241,7 +1124,8 @@ void nano::bootstrap_attempt::connect_client (nano::tcp_endpoint const & endpoin ++connections; auto socket (std::make_shared (node)); auto this_l (shared_from_this ()); - socket->async_connect (endpoint_a, [this_l, socket, endpoint_a](boost::system::error_code const & ec) { + socket->async_connect (endpoint_a, + [this_l, socket, endpoint_a](boost::system::error_code const & ec) { if (!ec) { if (this_l->node->config.logging.bulk_pull_logging ()) @@ -1924,30 +1808,35 @@ std::unique_ptr collect_seq_con_info (bootstrap_initiato } } -nano::bootstrap_listener::bootstrap_listener (boost::asio::io_context & io_ctx_a, uint16_t port_a, nano::node & node_a) : -acceptor (io_ctx_a), -local (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::any (), port_a)), -io_ctx (io_ctx_a), +nano::bootstrap_listener::bootstrap_listener (uint16_t port_a, nano::node & node_a) : node (node_a), -defer_acceptor (io_ctx_a) +port (port_a) { } void nano::bootstrap_listener::start () { - acceptor.open (local.protocol ()); - acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true)); - + listening_socket = std::make_shared (node.shared (), boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::any (), port), node.config.bootstrap_connections_max); boost::system::error_code ec; - acceptor.bind (local, ec); + listening_socket->start (ec); if (ec) { - node.logger.try_log (boost::str (boost::format ("Error while binding for bootstrap on port %1%: %2%") % local.port () % ec.message ())); + node.logger.try_log (boost::str (boost::format ("Error while binding for bootstrap on port %1%: %2%") % listening_socket->listening_port () % ec.message ())); throw std::runtime_error (ec.message ()); } - - acceptor.listen (); - accept_connection (); + listening_socket->on_connection ([this](std::shared_ptr new_connection, boost::system::error_code const & ec_a) { + bool keep_accepting = true; + if (ec_a) + { + keep_accepting = false; + this->node.logger.try_log (boost::str (boost::format ("Error while accepting bootstrap connections: %1%") % ec_a.message ())); + } + else + { + accept_action (ec_a, new_connection); + } + return keep_accepting; + }); } void nano::bootstrap_listener::stop () @@ -1958,88 +1847,41 @@ void nano::bootstrap_listener::stop () on = false; connections_l.swap (connections); } - acceptor.close (); - for (auto & i : connections_l) + if (listening_socket) { - auto connection (i.second.lock ()); - if (connection) - { - connection->socket->close (); - } + listening_socket->close (); + listening_socket = nullptr; } } -void nano::bootstrap_listener::accept_connection () +size_t nano::bootstrap_listener::connection_count () { - if (acceptor.is_open ()) - { - if (connections.size () < node.config.bootstrap_connections_max) - { - auto socket (std::make_shared (node.shared ())); - socket->checkup (node.config.tcp_server_timeout.count ()); - acceptor.async_accept (socket->socket_m, [this, socket](boost::system::error_code const & ec) { - accept_action (ec, socket); - }); - } - else - { - node.logger.try_log (boost::str (boost::format ("Unable to accept new TCP network sockets (have %1% concurrent connections, limit of %2%), will try to accept again in 1s") % connections.size () % node.config.bootstrap_connections_max)); - defer_acceptor.expires_after (std::chrono::seconds (1)); - defer_acceptor.async_wait ([this](const boost::system::error_code & ec) { - /* - * There should be no other call points that can invoke - * accept_connect() after starting the listener, so if we - * get an error from the I/O context, something is probably - * wrong. - */ - if (!ec) - { - accept_connection (); - } - }); - } - } + std::lock_guard lock (mutex); + return connections.size (); } void nano::bootstrap_listener::accept_action (boost::system::error_code const & ec, std::shared_ptr socket_a) { - if (!ec) + auto connection (std::make_shared (socket_a, node.shared ())); { - auto connection (std::make_shared (socket_a, node.shared ())); - { - std::lock_guard lock (mutex); - if (acceptor.is_open ()) - { - connections[connection.get ()] = connection; - connection->receive (); - } - } - accept_connection (); - } - else - { - node.logger.try_log (boost::str (boost::format ("Error while accepting bootstrap connections: %1%") % ec.message ())); + std::lock_guard lock (mutex); + connections[connection.get ()] = connection; + connection->receive (); } } boost::asio::ip::tcp::endpoint nano::bootstrap_listener::endpoint () { - return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), local.port ()); + return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), listening_socket->listening_port ()); } namespace nano { std::unique_ptr collect_seq_con_info (bootstrap_listener & bootstrap_listener, const std::string & name) { - size_t count = 0; - { - std::lock_guard guard (bootstrap_listener.mutex); - count = bootstrap_listener.connections.size (); - } - auto sizeof_element = sizeof (decltype (bootstrap_listener.connections)::value_type); auto composite = std::make_unique (name); - composite->add_component (std::make_unique (seq_con_info{ "connections", count, sizeof_element })); + composite->add_component (std::make_unique (seq_con_info{ "connections", bootstrap_listener.connection_count (), sizeof_element })); return composite; } } @@ -2253,7 +2095,7 @@ void nano::bootstrap_server::finish_request () else { std::weak_ptr this_w (shared_from_this ()); - node->alarm.add (std::chrono::steady_clock::now () + node->config.tcp_server_timeout + std::chrono::seconds (1), [this_w]() { + node->alarm.add (std::chrono::steady_clock::now () + (node->config.tcp_io_timeout * 2) + std::chrono::seconds (1), [this_w]() { if (auto this_l = this_w.lock ()) { this_l->timeout (); @@ -2266,7 +2108,7 @@ void nano::bootstrap_server::timeout () { if (socket != nullptr) { - if (socket->last_action_time + node->config.tcp_server_timeout.count () < static_cast (std::chrono::steady_clock::now ().time_since_epoch ().count ())) + if (socket->has_timed_out ()) { if (node->config.logging.bulk_pull_logging ()) { @@ -2658,46 +2500,30 @@ void nano::bulk_pull_account_server::send_frontier () * so handle the invalid_request case by terminating the * request without any response */ - if (invalid_request) + if (!invalid_request) { - connection->finish_request (); + auto stream_transaction (connection->node->store.tx_begin_read ()); - return; + // Get account balance and frontier block hash + auto account_frontier_hash (connection->node->ledger.latest (stream_transaction, request->account)); + auto account_frontier_balance_int (connection->node->ledger.account_balance (stream_transaction, request->account)); + nano::uint128_union account_frontier_balance (account_frontier_balance_int); + + // Write the frontier block hash and balance into a buffer + send_buffer->clear (); + { + nano::vectorstream output_stream (*send_buffer); + + write (output_stream, account_frontier_hash.bytes); + write (output_stream, account_frontier_balance.bytes); + } + + // Send the buffer to the requestor + auto this_l (shared_from_this ()); + connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) { + this_l->sent_action (ec, size_a); + }); } - - /* - * Supply the account frontier - */ - /** - ** Establish a database transaction - **/ - auto stream_transaction (connection->node->store.tx_begin_read ()); - - /** - ** Get account balance and frontier block hash - **/ - auto account_frontier_hash (connection->node->ledger.latest (stream_transaction, request->account)); - auto account_frontier_balance_int (connection->node->ledger.account_balance (stream_transaction, request->account)); - nano::uint128_union account_frontier_balance (account_frontier_balance_int); - - /** - ** Write the frontier block hash and balance into a buffer - **/ - send_buffer->clear (); - { - nano::vectorstream output_stream (*send_buffer); - - write (output_stream, account_frontier_hash.bytes); - write (output_stream, account_frontier_balance.bytes); - } - - /** - ** Send the buffer to the requestor - **/ - auto this_l (shared_from_this ()); - connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) { - this_l->sent_action (ec, size_a); - }); } void nano::bulk_pull_account_server::send_next_block () diff --git a/nano/node/bootstrap.hpp b/nano/node/bootstrap.hpp index 441fa2fd9..3bf809f64 100644 --- a/nano/node/bootstrap.hpp +++ b/nano/node/bootstrap.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -33,26 +34,6 @@ enum class sync_result error, fork }; -class socket final : public std::enable_shared_from_this -{ -public: - explicit socket (std::shared_ptr); - void async_connect (nano::tcp_endpoint const &, std::function); - void async_read (std::shared_ptr>, size_t, std::function); - void async_write (std::shared_ptr>, std::function); - void async_write (boost::asio::const_buffer, std::function); - void start (); - void stop (); - void close (); - void checkup (uint64_t); - nano::tcp_endpoint remote_endpoint (); - boost::asio::ip::tcp::socket socket_m; - std::atomic last_action_time; - -private: - std::atomic async_start_time; - std::shared_ptr node; -}; class bootstrap_client; class pull_info @@ -285,22 +266,21 @@ class bootstrap_server; class bootstrap_listener final { public: - bootstrap_listener (boost::asio::io_context &, uint16_t, nano::node &); + bootstrap_listener (uint16_t, nano::node &); void start (); void stop (); - void accept_connection (); void accept_action (boost::system::error_code const &, std::shared_ptr); + size_t connection_count (); + std::mutex mutex; std::unordered_map> connections; nano::tcp_endpoint endpoint (); - boost::asio::ip::tcp::acceptor acceptor; - nano::tcp_endpoint local; - boost::asio::io_context & io_ctx; nano::node & node; + std::shared_ptr listening_socket; bool on; private: - boost::asio::steady_timer defer_acceptor; + uint16_t port; }; std::unique_ptr collect_seq_con_info (bootstrap_listener & bootstrap_listener, const std::string & name); diff --git a/nano/node/ipc.cpp b/nano/node/ipc.cpp index 58f8cdfa5..22bd3f445 100644 --- a/nano/node/ipc.cpp +++ b/nano/node/ipc.cpp @@ -256,7 +256,7 @@ public: server.node.logger.always_log ("IPC: acceptor error: ", ec.message ()); } - if (acceptor->is_open () && ec != boost::asio::error::operation_aborted) + if (ec != boost::asio::error::operation_aborted && acceptor->is_open ()) { this->accept (); } diff --git a/nano/node/logging.cpp b/nano/node/logging.cpp index 880bc404d..8cf0f33a0 100644 --- a/nano/node/logging.cpp +++ b/nano/node/logging.cpp @@ -38,6 +38,7 @@ nano::error nano::logging::serialize_json (nano::jsonconfig & json) const json.put ("ledger_duplicate", ledger_duplicate_logging_value); json.put ("vote", vote_logging_value); json.put ("network", network_logging_value); + json.put ("network_timeout", network_timeout_logging_value); json.put ("network_message", network_message_logging_value); json.put ("network_publish", network_publish_logging_value); json.put ("network_packet", network_packet_logging_value); @@ -87,6 +88,7 @@ bool nano::logging::upgrade_json (unsigned version_a, nano::jsonconfig & json) upgraded_l = true; case 6: json.put ("min_time_between_output", min_time_between_log_output.count ()); + json.put ("network_timeout", network_timeout_logging_value); json.erase ("log_rpc"); json.put ("long_database_txns", false); upgraded_l = true; @@ -126,6 +128,7 @@ nano::error nano::logging::deserialize_json (bool & upgraded_a, nano::jsonconfig json.get ("ledger_duplicate", ledger_duplicate_logging_value); json.get ("vote", vote_logging_value); json.get ("network", network_logging_value); + json.get ("network_timeout", network_timeout_logging_value); json.get ("network_message", network_message_logging_value); json.get ("network_publish", network_publish_logging_value); json.get ("network_packet", network_packet_logging_value); @@ -168,6 +171,11 @@ bool nano::logging::network_logging () const return network_logging_value; } +bool nano::logging::network_timeout_logging () const +{ + return network_logging () && network_timeout_logging_value; +} + bool nano::logging::network_message_logging () const { return network_logging () && network_message_logging_value; diff --git a/nano/node/logging.hpp b/nano/node/logging.hpp index dcc09f8d3..397430aee 100644 --- a/nano/node/logging.hpp +++ b/nano/node/logging.hpp @@ -25,6 +25,7 @@ public: bool ledger_duplicate_logging () const; bool vote_logging () const; bool network_logging () const; + bool network_timeout_logging () const; bool network_message_logging () const; bool network_publish_logging () const; bool network_packet_logging () const; @@ -45,6 +46,7 @@ public: bool ledger_duplicate_logging_value{ false }; bool vote_logging_value{ false }; bool network_logging_value{ true }; + bool network_timeout_logging_value{ false }; bool network_message_logging_value{ false }; bool network_publish_logging_value{ false }; bool network_packet_logging_value{ false }; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index edb464184..a86e1805f 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -1035,7 +1035,7 @@ ledger (store, stats, config.epoch_block_link, config.epoch_block_signer), checker (config.signature_checker_threads), network (*this, config.peering_port), bootstrap_initiator (*this), -bootstrap (io_ctx_a, config.peering_port, *this), +bootstrap (config.peering_port, *this), application_path (application_path_a), port_mapping (*this), vote_processor (*this), diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index ac5e56e89..eafd4e765 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -114,8 +114,8 @@ nano::error nano::node_config::serialize_json (nano::jsonconfig & json) const json.put ("allow_local_peers", allow_local_peers); json.put ("vote_minimum", vote_minimum.to_string_dec ()); json.put ("unchecked_cutoff_time", unchecked_cutoff_time.count ()); - json.put ("tcp_client_timeout", tcp_client_timeout.count ()); - json.put ("tcp_server_timeout", tcp_server_timeout.count ()); + json.put ("tcp_io_timeout", tcp_io_timeout.count ()); + json.put ("tcp_idle_timeout", tcp_idle_timeout.count ()); json.put ("pow_sleep_interval", pow_sleep_interval.count ()); json.put ("external_address", external_address.to_string ()); json.put ("external_port", external_port); @@ -255,8 +255,8 @@ bool nano::node_config::upgrade_json (unsigned version_a, nano::jsonconfig & jso nano::jsonconfig diagnostics_l; diagnostics_config.serialize_json (diagnostics_l); json.put_child ("diagnostics", diagnostics_l); - json.put ("tcp_client_timeout", tcp_client_timeout.count ()); - json.put ("tcp_server_timeout", tcp_server_timeout.count ()); + json.put ("tcp_io_timeout", tcp_io_timeout.count ()); + json.put ("tcp_idle_timeout", tcp_idle_timeout.count ()); json.put (pow_sleep_interval_key, pow_sleep_interval.count ()); json.put ("external_address", external_address.to_string ()); json.put ("external_port", external_port); @@ -361,12 +361,13 @@ nano::error nano::node_config::deserialize_json (bool & upgraded_a, nano::jsonco auto unchecked_cutoff_time_l = static_cast (unchecked_cutoff_time.count ()); json.get ("unchecked_cutoff_time", unchecked_cutoff_time_l); unchecked_cutoff_time = std::chrono::seconds (unchecked_cutoff_time_l); - auto tcp_client_timeout_l = static_cast (tcp_client_timeout.count ()); - json.get ("tcp_client_timeout", tcp_client_timeout_l); - tcp_client_timeout = std::chrono::seconds (tcp_client_timeout_l); - auto tcp_server_timeout_l = static_cast (tcp_server_timeout.count ()); - json.get ("tcp_server_timeout", tcp_server_timeout_l); - tcp_server_timeout = std::chrono::seconds (tcp_server_timeout_l); + + auto tcp_io_timeout_l = static_cast (tcp_io_timeout.count ()); + json.get ("tcp_io_timeout", tcp_io_timeout_l); + tcp_io_timeout = std::chrono::seconds (tcp_io_timeout_l); + auto tcp_idle_timeout_l = static_cast (tcp_idle_timeout.count ()); + json.get ("tcp_idle_timeout", tcp_idle_timeout_l); + tcp_idle_timeout = std::chrono::seconds (tcp_idle_timeout_l); auto ipc_config_l (json.get_optional_child ("ipc")); if (ipc_config_l) diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index a9f1bbc65..99878d985 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -61,8 +61,10 @@ public: uint16_t external_port{ 0 }; std::chrono::milliseconds block_processor_batch_max_time{ std::chrono::milliseconds (5000) }; std::chrono::seconds unchecked_cutoff_time{ std::chrono::seconds (4 * 60 * 60) }; // 4 hours - std::chrono::seconds tcp_client_timeout{ std::chrono::seconds (5) }; - std::chrono::seconds tcp_server_timeout{ std::chrono::seconds (30) }; + /** Timeout for initiated async operations */ + std::chrono::seconds tcp_io_timeout{ network_params.network.is_test_network () ? std::chrono::seconds (5) : std::chrono::seconds (15) }; + /** Default maximum idle time for a socket before it's automatically closed */ + std::chrono::seconds tcp_idle_timeout{ std::chrono::minutes (2) }; std::chrono::nanoseconds pow_sleep_interval{ 0 }; static std::chrono::seconds constexpr keepalive_period = std::chrono::seconds (60); static std::chrono::seconds constexpr keepalive_cutoff = keepalive_period * 5; diff --git a/nano/node/socket.cpp b/nano/node/socket.cpp new file mode 100644 index 000000000..f6acbbef9 --- /dev/null +++ b/nano/node/socket.cpp @@ -0,0 +1,347 @@ +#include +#include +#include + +nano::socket::socket (std::shared_ptr node_a, boost::optional max_idle_time_a, nano::socket::concurrency concurrency_a) : +strand (node_a->io_ctx.get_executor ()), +tcp_socket (node_a->io_ctx), +node (node_a), +writer_concurrency (concurrency_a), +next_deadline (std::numeric_limits::max ()), +last_completion_time (0), +max_idle_time (max_idle_time_a) +{ + if (!max_idle_time) + { + max_idle_time = node_a->config.tcp_idle_timeout; + } +} + +nano::socket::~socket () +{ + close_internal (); +} + +void nano::socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::function callback_a) +{ + checkup (); + auto this_l (shared_from_this ()); + start_timer (); + this_l->tcp_socket.async_connect (endpoint_a, + boost::asio::bind_executor (this_l->strand, + [this_l, callback_a, endpoint_a](boost::system::error_code const & ec) { + this_l->stop_timer (); + this_l->remote = endpoint_a; + callback_a (ec); + })); +} + +void nano::socket::async_read (std::shared_ptr> buffer_a, size_t size_a, std::function callback_a) +{ + assert (size_a <= buffer_a->size ()); + auto this_l (shared_from_this ()); + if (!closed) + { + start_timer (); + boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback_a, size_a, this_l]() { + assert (!this_l->closed); + boost::asio::async_read (this_l->tcp_socket, boost::asio::buffer (buffer_a->data (), size_a), + boost::asio::bind_executor (this_l->strand, + [this_l, buffer_a, callback_a](boost::system::error_code const & ec, size_t size_a) { + if (auto node = this_l->node.lock ()) + { + node->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::in, size_a); + this_l->stop_timer (); + callback_a (ec, size_a); + } + })); + })); + } +} + +void nano::socket::async_write (std::shared_ptr> buffer_a, std::function callback_a) +{ + auto this_l (shared_from_this ()); + if (!closed) + { + if (writer_concurrency == nano::socket::concurrency::multi_writer) + { + boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback_a, this_l]() { + bool write_in_progress = !this_l->send_queue.empty (); + this_l->send_queue.emplace_back (nano::socket::queue_item{ buffer_a, callback_a }); + if (!write_in_progress) + { + this_l->write_queued_messages (); + } + })); + } + else + { + start_timer (); + boost::asio::async_write (tcp_socket, boost::asio::buffer (buffer_a->data (), buffer_a->size ()), + boost::asio::bind_executor (strand, + [this_l, buffer_a, callback_a](boost::system::error_code const & ec, size_t size_a) { + if (auto node = this_l->node.lock ()) + { + node->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a); + this_l->stop_timer (); + if (callback_a) + { + callback_a (ec, size_a); + } + } + })); + } + } +} + +void nano::socket::write_queued_messages () +{ + if (!closed) + { + std::weak_ptr this_w (shared_from_this ()); + auto msg (send_queue.front ()); + start_timer (); + boost::asio::async_write (tcp_socket, boost::asio::buffer (msg.buffer->data (), msg.buffer->size ()), + boost::asio::bind_executor (strand, + [msg, this_w](boost::system::error_code ec, std::size_t size_a) { + if (auto this_l = this_w.lock ()) + { + if (auto node = this_l->node.lock ()) + { + node->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a); + + this_l->stop_timer (); + + if (!this_l->closed) + { + if (msg.callback) + { + msg.callback (ec, size_a); + } + + this_l->send_queue.pop_front (); + if (!ec && !this_l->send_queue.empty ()) + { + this_l->write_queued_messages (); + } + } + } + } + })); + } +} + +void nano::socket::start_timer () +{ + if (auto node_l = node.lock ()) + { + start_timer (node_l->config.tcp_io_timeout); + } +} + +void nano::socket::start_timer (std::chrono::seconds deadline_a) +{ + next_deadline = deadline_a.count (); +} + +void nano::socket::stop_timer () +{ + last_completion_time = nano::seconds_since_epoch (); +} + +void nano::socket::checkup () +{ + std::weak_ptr this_w (shared_from_this ()); + if (auto node_l = node.lock ()) + { + node_l->alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (node_l->network_params.network.is_test_network () ? 1 : 2), [this_w, node_l]() { + if (auto this_l = this_w.lock ()) + { + uint64_t now (nano::seconds_since_epoch ()); + if (this_l->next_deadline != std::numeric_limits::max () && now - this_l->last_completion_time > this_l->next_deadline) + { + if (auto node_l = this_l->node.lock ()) + { + this_l->timed_out = true; + this_l->close (); + if (node_l->config.logging.network_timeout_logging ()) + { + node_l->logger.try_log (boost::str (boost::format ("Disconnecting from %1% due to timeout") % this_l->remote_endpoint ())); + } + } + } + else if (!this_l->closed) + { + this_l->checkup (); + } + } + }); + } +} + +bool nano::socket::has_timed_out () const +{ + return timed_out; +} + +void nano::socket::set_max_idle_timeout (std::chrono::seconds max_idle_time_a) +{ + auto this_l (shared_from_this ()); + boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l, max_idle_time_a]() { + this_l->max_idle_time = max_idle_time_a; + })); +} + +void nano::socket::close () +{ + auto this_l (shared_from_this ()); + boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l] { + this_l->close_internal (); + })); +} + +// This must be called from a strand or the destructor +void nano::socket::close_internal () +{ + if (!closed) + { + closed = true; + max_idle_time = boost::none; + boost::system::error_code ec; + + // Ignore error code for shutdown as it is best-effort + tcp_socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec); + tcp_socket.close (ec); + send_queue.clear (); + if (ec) + { + if (auto node_l = node.lock ()) + { + node_l->logger.try_log ("Failed to close socket gracefully: ", ec.message ()); + node_l->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::error_socket_close); + } + } + } +} + +nano::tcp_endpoint nano::socket::remote_endpoint () const +{ + return remote; +} + +void nano::socket::set_writer_concurrency (concurrency writer_concurrency_a) +{ + writer_concurrency = writer_concurrency_a; +} + +nano::server_socket::server_socket (std::shared_ptr node_a, boost::asio::ip::tcp::endpoint local_a, size_t max_connections_a, nano::socket::concurrency concurrency_a) : +socket (node_a, std::chrono::seconds::max (), concurrency_a), acceptor (node_a->io_ctx), local (local_a), deferred_accept_timer (node_a->io_ctx), max_inbound_connections (max_connections_a), concurrency_new_connections (concurrency_a) +{ +} + +void nano::server_socket::start (boost::system::error_code & ec_a) +{ + acceptor.open (local.protocol ()); + acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true)); + acceptor.bind (local, ec_a); + if (!ec_a) + { + acceptor.listen (boost::asio::socket_base::max_listen_connections, ec_a); + } +} + +void nano::server_socket::close () +{ + auto this_l (std::static_pointer_cast (shared_from_this ())); + + boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l]() { + this_l->close_internal (); + this_l->acceptor.close (); + for (auto & connection_w : this_l->connections) + { + if (auto connection_l = connection_w.lock ()) + { + connection_l->close (); + } + } + this_l->connections.clear (); + })); +} + +void nano::server_socket::on_connection (std::function, boost::system::error_code const &)> callback_a) +{ + auto this_l (std::static_pointer_cast (shared_from_this ())); + + boost::asio::post (strand, boost::asio::bind_executor (strand, [this_l, callback_a]() { + if (auto node_l = this_l->node.lock ()) + { + if (this_l->acceptor.is_open ()) + { + if (this_l->connections.size () < this_l->max_inbound_connections) + { + // Prepare new connection + auto new_connection (std::make_shared (node_l->shared (), boost::none, this_l->concurrency_new_connections)); + this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote, + boost::asio::bind_executor (this_l->strand, + [this_l, new_connection, callback_a](boost::system::error_code const & ec_a) { + if (auto node_l = this_l->node.lock ()) + { + if (!ec_a) + { + // Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start + // an IO operation immediately, which will start a timer. + new_connection->checkup (); + new_connection->start_timer (node_l->network_params.network.is_test_network () ? std::chrono::seconds (2) : node_l->config.tcp_idle_timeout); + node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in); + this_l->connections.push_back (new_connection); + this_l->evict_dead_connections (); + } + else + { + node_l->logger.try_log ("Unable to accept connection: ", ec_a.message ()); + } + + // If the callback returns true, keep accepting new connections + if (callback_a (new_connection, ec_a)) + { + this_l->on_connection (callback_a); + } + else + { + node_l->logger.try_log ("Stopping to accept connections"); + } + } + })); + } + else + { + this_l->evict_dead_connections (); + node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in); + this_l->deferred_accept_timer.expires_after (std::chrono::seconds (2)); + this_l->deferred_accept_timer.async_wait ([this_l, callback_a](const boost::system::error_code & ec_a) { + if (!ec_a) + { + // Try accepting again + std::static_pointer_cast (this_l)->on_connection (callback_a); + } + else + { + if (auto node_l = this_l->node.lock ()) + { + node_l->logger.try_log ("Unable to accept connection (deferred): ", ec_a.message ()); + } + } + }); + } + } + } + })); +} + +// This must be called from a strand +void nano::server_socket::evict_dead_connections () +{ + assert (strand.running_in_this_thread ()); + connections.erase (std::remove_if (connections.begin (), connections.end (), [](auto & connection) { return connection.expired (); }), connections.end ()); +} diff --git a/nano/node/socket.hpp b/nano/node/socket.hpp new file mode 100644 index 000000000..daddd1aba --- /dev/null +++ b/nano/node/socket.hpp @@ -0,0 +1,123 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace nano +{ +class node; +class server_socket; + +/** Socket class for tcp clients and newly accepted connections */ +class socket : public std::enable_shared_from_this +{ + friend class server_socket; + +public: + /** + * If multi_writer is used, overlapping writes are allowed, including from multiple threads. + * For bootstrapping, reading and writing alternates on a socket, thus single_writer + * should be used to avoid queueing overhead. For live messages, multiple threads may want + * to concurrenctly queue messages on the same socket, thus multi_writer should be used. + */ + enum class concurrency + { + single_writer, + multi_writer + }; + + /** + * Constructor + * @param node Owning node + * @param max_idle_time If no activity occurs within the idle time, the socket is closed. If not set, the tcp_idle_time config option is used. + * @param concurrency write concurrency + */ + explicit socket (std::shared_ptr node, boost::optional max_idle_time = boost::none, concurrency = concurrency::single_writer); + virtual ~socket (); + void async_connect (boost::asio::ip::tcp::endpoint const &, std::function); + void async_read (std::shared_ptr>, size_t, std::function); + void async_write (std::shared_ptr>, std::function = nullptr); + + void close (); + boost::asio::ip::tcp::endpoint remote_endpoint () const; + /** Returns true if the socket has timed out */ + bool has_timed_out () const; + /** This can be called to change the maximum idle time, e.g. based on the type of traffic detected. */ + void set_max_idle_timeout (std::chrono::seconds max_idle_time_a); + /** Change write concurrent */ + void set_writer_concurrency (concurrency writer_concurrency_a); + +protected: + /** Holds the buffer and callback for queued writes */ + class queue_item + { + public: + std::shared_ptr> buffer; + std::function callback; + }; + + boost::asio::strand strand; + boost::asio::ip::tcp::socket tcp_socket; + std::weak_ptr node; + + /** The other end of the connection */ + boost::asio::ip::tcp::endpoint remote; + /** Send queue, protected by always being accessed in the strand */ + std::deque send_queue; + std::atomic writer_concurrency; + + std::atomic next_deadline; + std::atomic last_completion_time; + std::atomic timed_out{ false }; + boost::optional max_idle_time; + + /** Set by close() - completion handlers must check this. This is more reliable than checking + error codes as the OS may have already completed the async operation. */ + std::atomic closed{ false }; + void close_internal (); + void write_queued_messages (); + void start_timer (std::chrono::seconds deadline_a); + void start_timer (); + void stop_timer (); + void checkup (); +}; + +/** Socket class for TCP servers */ +class server_socket final : public socket +{ +public: + /** + * Constructor + * @param node_a Owning node + * @param local_a Address and port to listen on + * @param max_connections_a Maximum number of concurrent connections + * @param concurrency_a Write concurrency for new connections + */ + explicit server_socket (std::shared_ptr node_a, boost::asio::ip::tcp::endpoint local_a, size_t max_connections_a, concurrency concurrency_a = concurrency::single_writer); + /**Start accepting new connections */ + void start (boost::system::error_code &); + /** Stop accepting new connections */ + void close (); + /** Register callback for new connections. The callback must return true to keep accepting new connections. */ + void on_connection (std::function new_connection, boost::system::error_code const &)>); + uint16_t listening_port () + { + return local.port (); + } + +private: + std::vector> connections; + boost::asio::ip::tcp::acceptor acceptor; + boost::asio::ip::tcp::endpoint local; + boost::asio::steady_timer deferred_accept_timer; + size_t max_inbound_connections; + /** Concurrency setting for new connections */ + concurrency concurrency_new_connections; + void evict_dead_connections (); +}; +} diff --git a/nano/node/stats.cpp b/nano/node/stats.cpp index ff3871638..9ce9fef9e 100644 --- a/nano/node/stats.cpp +++ b/nano/node/stats.cpp @@ -343,6 +343,9 @@ std::string nano::stat::type_to_string (uint32_t key) case nano::stat::type::ledger: res = "ledger"; break; + case nano::stat::type::tcp: + res = "tcp"; + break; case nano::stat::type::udp: res = "udp"; break; @@ -355,7 +358,7 @@ std::string nano::stat::type_to_string (uint32_t key) case nano::stat::type::traffic: res = "traffic"; break; - case nano::stat::type::traffic_bootstrap: + case nano::stat::type::traffic_tcp: res = "traffic_bootstrap"; break; case nano::stat::type::vote: @@ -490,6 +493,12 @@ std::string nano::stat::detail_to_string (uint32_t key) case nano::stat::detail::overflow: res = "overflow"; break; + case nano::stat::detail::tcp_accept_success: + res = "accept_success"; + break; + case nano::stat::detail::tcp_accept_failure: + res = "accept_failure"; + break; case nano::stat::detail::unreachable_host: res = "unreachable_host"; break; diff --git a/nano/node/stats.hpp b/nano/node/stats.hpp index 097b001fe..7e14ef88b 100644 --- a/nano/node/stats.hpp +++ b/nano/node/stats.hpp @@ -217,7 +217,7 @@ public: enum class type : uint8_t { traffic, - traffic_bootstrap, + traffic_tcp, error, message, block, @@ -228,6 +228,7 @@ public: http_callback, peering, ipc, + tcp, udp, confirmation_height }; @@ -297,6 +298,10 @@ public: invalid_node_id_handshake_message, outdated_version, + // tcp + tcp_accept_success, + tcp_accept_failure, + // ipc invocations, diff --git a/nano/node/testing.hpp b/nano/node/testing.hpp index 4d004ee10..91beda673 100644 --- a/nano/node/testing.hpp +++ b/nano/node/testing.hpp @@ -2,6 +2,7 @@ #include #include +#include #include namespace nano diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 232829ed3..2de4b31fb 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -2,7 +2,7 @@ #include nano::transport::channel_tcp::channel_tcp (nano::node & node_a, std::shared_ptr socket_a) : -node (node_a), +channel (node_a), socket (socket_a) { } @@ -24,15 +24,15 @@ bool nano::transport::channel_tcp::operator== (nano::transport::channel const & return result; } -void nano::transport::channel_tcp::send_buffer_raw (boost::asio::const_buffer buffer_a, std::function const & callback_a) const +void nano::transport::channel_tcp::send_buffer (std::shared_ptr> buffer_a, nano::stat::detail detail_a, std::function const & callback_a) const { - socket->async_write (buffer_a, callback_a); + socket->async_write (buffer_a, callback (buffer_a, detail_a, callback_a)); } std::function nano::transport::channel_tcp::callback (std::shared_ptr> buffer_a, nano::stat::detail detail_a, std::function const & callback_a) const { // clang-format off - return [ buffer_a, node = std::weak_ptr (node.shared ()), detail_a, callback_a ](boost::system::error_code const & ec, size_t size_a) + return [ buffer_a, node = std::weak_ptr (node.shared ()), callback_a ](boost::system::error_code const & ec, size_t size_a) { if (auto node_l = node.lock ()) { @@ -40,14 +40,9 @@ std::function nano::transport:: { node_l->stats.inc (nano::stat::type::error, nano::stat::detail::unreachable_host, nano::stat::dir::out); } - if (!ec) + if (callback_a) { - node_l->stats.add (nano::stat::type::traffic, nano::stat::dir::out, size_a); - node_l->stats.inc (nano::stat::type::message, detail_a, nano::stat::dir::out); - if (callback_a) - { - callback_a (ec, size_a); - } + callback_a (ec, size_a); } } }; diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index 345ad9499..675323c9b 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -14,14 +14,13 @@ namespace transport channel_tcp (nano::node &, std::shared_ptr); size_t hash_code () const override; bool operator== (nano::transport::channel const &) const override; - void send_buffer_raw (boost::asio::const_buffer, std::function const &) const override; + void send_buffer (std::shared_ptr>, nano::stat::detail, std::function const & = nullptr) const override; std::function callback (std::shared_ptr>, nano::stat::detail, std::function const & = nullptr) const override; std::string to_string () const override; bool operator== (nano::transport::channel_tcp const & other_a) const { return &node == &other_a.node && socket == other_a.socket; } - nano::node & node; std::shared_ptr socket; }; } // namespace transport diff --git a/nano/node/transport/transport.cpp b/nano/node/transport/transport.cpp index 64f79c4b4..9cde3486a 100644 --- a/nano/node/transport/transport.cpp +++ b/nano/node/transport/transport.cpp @@ -1,4 +1,5 @@ #include +#include #include namespace @@ -56,9 +57,9 @@ nano::endpoint nano::transport::map_endpoint_to_v6 (nano::endpoint const & endpo return endpoint_l; } -void nano::transport::channel::send_buffer (std::shared_ptr> buffer_a, nano::stat::detail detail_a, std::function const & callback_a) const +nano::transport::channel::channel (nano::node & node_a) : +node (node_a) { - send_buffer_raw (boost::asio::buffer (buffer_a->data (), buffer_a->size ()), callback (buffer_a, detail_a, callback_a)); } void nano::transport::channel::send (nano::message const & message_a, std::function const & callback_a) const @@ -66,5 +67,7 @@ void nano::transport::channel::send (nano::message const & message_a, std::funct callback_visitor visitor; message_a.visit (visitor); auto buffer (message_a.to_bytes ()); - send_buffer (buffer, visitor.result, callback_a); + auto detail (visitor.result); + send_buffer (buffer, detail, callback_a); + node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out); } diff --git a/nano/node/transport/transport.hpp b/nano/node/transport/transport.hpp index 6a812d000..64a56c76a 100644 --- a/nano/node/transport/transport.hpp +++ b/nano/node/transport/transport.hpp @@ -14,14 +14,17 @@ namespace transport class channel { public: + channel (nano::node &); virtual ~channel () = default; virtual size_t hash_code () const = 0; virtual bool operator== (nano::transport::channel const &) const = 0; void send (nano::message const &, std::function const & = nullptr) const; - void send_buffer (std::shared_ptr>, nano::stat::detail, std::function const & = nullptr) const; - virtual void send_buffer_raw (boost::asio::const_buffer, std::function const &) const = 0; + virtual void send_buffer (std::shared_ptr>, nano::stat::detail, std::function const & = nullptr) const = 0; virtual std::function callback (std::shared_ptr>, nano::stat::detail, std::function const & = nullptr) const = 0; virtual std::string to_string () const = 0; + + protected: + nano::node & node; }; } // namespace transport } // namespace nano diff --git a/nano/node/transport/udp.cpp b/nano/node/transport/udp.cpp index 2b7c5f95d..2a9c70ad4 100644 --- a/nano/node/transport/udp.cpp +++ b/nano/node/transport/udp.cpp @@ -5,6 +5,7 @@ std::chrono::seconds constexpr nano::transport::udp_channels::syn_cookie_cutoff; nano::transport::channel_udp::channel_udp (nano::transport::udp_channels & channels_a, nano::endpoint const & endpoint_a, unsigned network_version_a) : +channel (channels_a.node), network_version (network_version_a), endpoint (endpoint_a), channels (channels_a) @@ -29,15 +30,15 @@ bool nano::transport::channel_udp::operator== (nano::transport::channel const & return result; } -void nano::transport::channel_udp::send_buffer_raw (boost::asio::const_buffer buffer_a, std::function const & callback_a) const +void nano::transport::channel_udp::send_buffer (std::shared_ptr> buffer_a, nano::stat::detail detail_a, std::function const & callback_a) const { - channels.send (buffer_a, endpoint, callback_a); + channels.send (boost::asio::const_buffer (buffer_a->data (), buffer_a->size ()), endpoint, callback (buffer_a, detail_a, callback_a)); } std::function nano::transport::channel_udp::callback (std::shared_ptr> buffer_a, nano::stat::detail detail_a, std::function const & callback_a) const { // clang-format off - return [ buffer_a, node = std::weak_ptr (channels.node.shared ()), detail_a, callback_a ](boost::system::error_code const & ec, size_t size_a) + return [ buffer_a, node = std::weak_ptr (channels.node.shared ()), callback_a ](boost::system::error_code const & ec, size_t size_a) { if (auto node_l = node.lock ()) { @@ -45,14 +46,14 @@ std::function nano::transport:: { node_l->stats.inc (nano::stat::type::error, nano::stat::detail::unreachable_host, nano::stat::dir::out); } - if (!ec) + if (size_a > 0) { node_l->stats.add (nano::stat::type::traffic, nano::stat::dir::out, size_a); - node_l->stats.inc (nano::stat::type::message, detail_a, nano::stat::dir::out); - if (callback_a) - { - callback_a (ec, size_a); - } + } + + if (callback_a) + { + callback_a (ec, size_a); } } }; @@ -75,6 +76,7 @@ socket (node_a.io_ctx, nano::endpoint (boost::asio::ip::address_v6::any (), port { node.logger.try_log ("Unable to retrieve port: ", ec.message ()); } + local_endpoint = nano::endpoint (boost::asio::ip::address_v6::loopback (), port); } @@ -390,14 +392,29 @@ void nano::transport::udp_channels::stop () std::lock_guard lock (mutex); local_endpoint = nano::endpoint (boost::asio::ip::address_v6::loopback (), 0); + // On test-net, close directly to avoid address-reuse issues. On livenet, close + // through the strand as multiple IO threads may access the socket. // clang-format off - boost::asio::post (strand, [this] { - boost::system::error_code ignored; - this->socket.close (ignored); - }); + if (node.network_params.network.is_test_network ()) + { + this->close_socket (); + } + else + { + boost::asio::dispatch (strand, [this] { + this->close_socket (); + }); + } // clang-format on } +void nano::transport::udp_channels::close_socket () +{ + boost::system::error_code ignored; + this->socket.close (ignored); + this->local_endpoint = nano::endpoint (boost::asio::ip::address_v6::loopback (), 0); +} + nano::endpoint nano::transport::udp_channels::get_local_endpoint () const { std::lock_guard lock (mutex); diff --git a/nano/node/transport/udp.hpp b/nano/node/transport/udp.hpp index 2ab956b87..aef2312f0 100644 --- a/nano/node/transport/udp.hpp +++ b/nano/node/transport/udp.hpp @@ -19,7 +19,7 @@ namespace transport channel_udp (nano::transport::udp_channels &, nano::endpoint const &, unsigned = nano::protocol_version); size_t hash_code () const override; bool operator== (nano::transport::channel const &) const override; - void send_buffer_raw (boost::asio::const_buffer, std::function const &) const override; + void send_buffer (std::shared_ptr>, nano::stat::detail, std::function const & = nullptr) const override; std::function callback (std::shared_ptr>, nano::stat::detail, std::function const & = nullptr) const override; std::string to_string () const override; bool operator== (nano::transport::channel_udp const & other_a) const @@ -130,6 +130,7 @@ namespace transport static std::chrono::seconds constexpr syn_cookie_cutoff = std::chrono::seconds (5); private: + void close_socket (); void ongoing_syn_cookie_cleanup (); class endpoint_tag { diff --git a/nano/qt_system/entry.cpp b/nano/qt_system/entry.cpp index 703e40820..a2463cd2e 100644 --- a/nano/qt_system/entry.cpp +++ b/nano/qt_system/entry.cpp @@ -15,6 +15,7 @@ int main (int argc, char ** argv) nano_qt::eventloop_processor processor; static int count (16); nano::system system (24000, count); + nano::thread_runner runner (system.io_ctx, system.nodes[0]->config.io_threads); std::unique_ptr client_tabs (new QTabWidget); std::vector> guis; for (auto i (0); i < count; ++i) @@ -28,7 +29,6 @@ int main (int argc, char ** argv) client_tabs->addTab (guis.back ()->client_window, boost::str (boost::format ("Wallet %1%") % i).c_str ()); } client_tabs->show (); - nano::thread_runner runner (system.io_ctx, system.nodes[0]->config.io_threads); QObject::connect (&application, &QApplication::aboutToQuit, [&]() { system.stop (); });