From f7b21c2e8aa276c696b0ab24717ca491ad204cf7 Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Thu, 9 Apr 2020 20:59:27 +0100 Subject: [PATCH] Improve telemetry request/response under load (#2669) * Improve telemetry request/response under load * Fix broken merge * Extend response time to 10s on beta/live * Some minor cleanup and extra comments * Formatting * Add missed break statement in stats --- nano/core_test/socket.cpp | 74 ++++++++++++++---------- nano/lib/stats.cpp | 16 +++++ nano/lib/stats.hpp | 7 ++- nano/node/bootstrap/bootstrap_server.cpp | 4 ++ nano/node/socket.cpp | 14 ++++- nano/node/telemetry.cpp | 71 +++++++++++++---------- nano/node/telemetry.hpp | 2 +- nano/node/testing.hpp | 2 +- nano/node/transport/transport.cpp | 5 ++ nano/slow_test/node.cpp | 51 ++++++++++++++++ 10 files changed, 183 insertions(+), 63 deletions(-) diff --git a/nano/core_test/socket.cpp b/nano/core_test/socket.cpp index cc54d11c..bf50fa40 100644 --- a/nano/core_test/socket.cpp +++ b/nano/core_test/socket.cpp @@ -15,41 +15,57 @@ TEST (socket, drop_policy) auto node = inactivenode.node; nano::thread_runner runner (node->io_ctx, 1); - auto server_port (nano::get_available_port ()); - boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v4::any (), server_port); - auto server_socket (std::make_shared (node, endpoint, 1, nano::socket::concurrency::multi_writer)); - boost::system::error_code ec; - server_socket->start (ec); - ASSERT_FALSE (ec); std::vector> connections; - // Accept connection, but don't read so the writer will drop. - server_socket->on_connection ([&connections](std::shared_ptr new_connection, boost::system::error_code const & ec_a) { - connections.push_back (new_connection); - return true; - }); - - auto client (std::make_shared (node, boost::none, nano::socket::concurrency::multi_writer)); - // We're going to write twice the queue size + 1, and the server isn't reading // The total number of drops should thus be 1 (the socket allows doubling the queue size for no_socket_drop) - auto total_message_count (client->get_max_write_queue_size () * 2 + 1); - nano::util::counted_completion write_completion (total_message_count - 1); + size_t max_write_queue_size = 0; + { + auto client_dummy (std::make_shared (node, boost::none, nano::socket::concurrency::multi_writer)); + max_write_queue_size = client_dummy->get_max_write_queue_size (); + } - client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), server_port), - [client, total_message_count, node, &write_completion](boost::system::error_code const & ec_a) { - for (int i = 0; i < total_message_count; i++) - { - std::vector buff (1); - client->async_write ( - nano::shared_const_buffer (std::move (buff)), [&write_completion](boost::system::error_code const & ec, size_t size_a) { - write_completion.increment (); - }, - nano::buffer_drop_policy::no_socket_drop); - } - }); - write_completion.await_count_for (std::chrono::seconds (5)); + auto func = [&](size_t total_message_count, nano::buffer_drop_policy drop_policy) { + auto server_port (nano::get_available_port ()); + boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v4::any (), server_port); + + auto server_socket (std::make_shared (node, endpoint, 1, nano::socket::concurrency::multi_writer)); + boost::system::error_code ec; + server_socket->start (ec); + ASSERT_FALSE (ec); + + // Accept connection, but don't read so the writer will drop. + server_socket->on_connection ([&connections](std::shared_ptr new_connection, boost::system::error_code const & ec_a) { + connections.push_back (new_connection); + return true; + }); + + auto client (std::make_shared (node, boost::none, nano::socket::concurrency::multi_writer)); + nano::util::counted_completion write_completion (total_message_count - 1); + + client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), server_port), + [client, total_message_count, node, &write_completion, &drop_policy](boost::system::error_code const & ec_a) { + for (int i = 0; i < total_message_count; i++) + { + std::vector buff (1); + client->async_write ( + nano::shared_const_buffer (std::move (buff)), [&write_completion](boost::system::error_code const & ec, size_t size_a) { + write_completion.increment (); + }, + drop_policy); + } + }); + write_completion.await_count_for (std::chrono::seconds (5)); + }; + + func (max_write_queue_size * 2 + 1, nano::buffer_drop_policy::no_socket_drop); + ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out)); + ASSERT_EQ (0, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out)); + + func (max_write_queue_size + 1, nano::buffer_drop_policy::limiter); + // The stats are accumulated from before + ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out)); ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out)); node->stop (); diff --git a/nano/lib/stats.cpp b/nano/lib/stats.cpp index c51d5977..8053feac 100644 --- a/nano/lib/stats.cpp +++ b/nano/lib/stats.cpp @@ -628,8 +628,12 @@ std::string nano::stat::detail_to_string (uint32_t key) case nano::stat::detail::tcp_write_drop: res = "tcp_write_drop"; break; + case nano::stat::detail::tcp_write_no_socket_drop: + res = "tcp_write_no_socket_drop"; + break; case nano::stat::detail::tcp_excluded: res = "tcp_excluded"; + break; case nano::stat::detail::unreachable_host: res = "unreachable_host"; break; @@ -714,6 +718,18 @@ std::string nano::stat::detail_to_string (uint32_t key) case nano::stat::detail::node_id_mismatch: res = "node_id_mismatch"; break; + case nano::stat::detail::request_within_protection_cache_zone: + res = "request_within_protection_cache_zone"; + break; + case nano::stat::detail::no_response_received: + res = "no_response_received"; + break; + case nano::stat::detail::unsolicited_telemetry_ack: + res = "unsolicited_telemetry_ack"; + break; + case nano::stat::detail::failed_send_telemetry_req: + res = "failed_send_telemetry_req"; + break; } return res; } diff --git a/nano/lib/stats.hpp b/nano/lib/stats.hpp index 2dff45ae..94e634ee 100644 --- a/nano/lib/stats.hpp +++ b/nano/lib/stats.hpp @@ -294,6 +294,7 @@ public: tcp_accept_success, tcp_accept_failure, tcp_write_drop, + tcp_write_no_socket_drop, tcp_excluded, // ipc @@ -325,7 +326,11 @@ public: // telemetry invalid_signature, different_genesis_hash, - node_id_mismatch + node_id_mismatch, + request_within_protection_cache_zone, + no_response_received, + unsolicited_telemetry_ack, + failed_send_telemetry_req }; /** Direction of the stat. If the direction is irrelevant, use in */ diff --git a/nano/node/bootstrap/bootstrap_server.cpp b/nano/node/bootstrap/bootstrap_server.cpp index 2da045e5..3e1906b7 100644 --- a/nano/node/bootstrap/bootstrap_server.cpp +++ b/nano/node/bootstrap/bootstrap_server.cpp @@ -258,6 +258,10 @@ void nano::bootstrap_server::receive_header_action (boost::system::error_code co last_telemetry_req = std::chrono::steady_clock::now (); add_request (std::make_unique (header)); } + else + { + node->stats.inc (nano::stat::type::telemetry, nano::stat::detail::request_within_protection_cache_zone); + } } receive (); break; diff --git a/nano/node/socket.cpp b/nano/node/socket.cpp index 4e52265b..2dd958c1 100644 --- a/nano/node/socket.cpp +++ b/nano/node/socket.cpp @@ -88,7 +88,19 @@ void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std: } else if (auto node_l = this_l->node.lock ()) { - node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out); + if (drop_policy_a == nano::buffer_drop_policy::no_socket_drop) + { + node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out); + } + else + { + node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out); + } + + if (callback_a) + { + callback_a (boost::system::errc::make_error_code (boost::system::errc::no_buffer_space), 0); + } } if (!write_in_progress) { diff --git a/nano/node/telemetry.cpp b/nano/node/telemetry.cpp index 1015557a..82e2024c 100644 --- a/nano/node/telemetry.cpp +++ b/nano/node/telemetry.cpp @@ -52,12 +52,12 @@ void nano::telemetry::set (nano::telemetry_ack const & message_a, nano::transpor if (it == recent_or_initial_request_telemetry_data.cend () || !it->undergoing_request) { // Not requesting telemetry data from this peer so ignore it + stats.inc (nano::stat::type::telemetry, nano::stat::detail::unsolicited_telemetry_ack); return; } recent_or_initial_request_telemetry_data.modify (it, [&message_a](nano::telemetry_info & telemetry_info_a) { telemetry_info_a.data = message_a.data; - telemetry_info_a.undergoing_request = false; }); // This can also remove the peer @@ -65,7 +65,7 @@ void nano::telemetry::set (nano::telemetry_ack const & message_a, nano::transpor if (!error) { - // Received telemetry data from a peer which hasn't disabled providing telemetry metrics and there no errors with the data + // Received telemetry data from a peer which hasn't disabled providing telemetry metrics and there's no errors with the data lk.unlock (); observers.notify (message_a.data, endpoint); lk.lock (); @@ -163,16 +163,18 @@ void nano::telemetry::ongoing_req_all_peers (std::chrono::milliseconds next_requ } }; + // clang-format off namespace mi = boost::multi_index; boost::multi_index_container, - mi::const_mem_fun>, - mi::hashed_unique, - mi::member, &channel_wrapper::channel>>>> - peers; + mi::hashed_unique, + mi::const_mem_fun>, + mi::hashed_unique, + mi::member, &channel_wrapper::channel>>>> peers; + // clang-format on { + // Copy peers to the multi index container so can get better asymptotic complexity in future operations auto temp_peers = this_l->network.list (std::numeric_limits::max (), this_l->network_params.protocol.telemetry_protocol_version_min); peers.insert (temp_peers.begin (), temp_peers.end ()); } @@ -215,23 +217,23 @@ void nano::telemetry::ongoing_req_all_peers (std::chrono::milliseconds next_requ }); } + // Schedule the next request; Use the default request time unless a telemetry request cache expires sooner nano::lock_guard guard (this_l->mutex); long long next_round = std::chrono::duration_cast (this_l->cache_cutoff + this_l->response_time_cutoff).count (); if (!this_l->recent_or_initial_request_telemetry_data.empty ()) { - // Use the default request time unless a telemetry request cache expires sooner - // Find the closest time with doesn't auto range = boost::make_iterator_range (this_l->recent_or_initial_request_telemetry_data.get ()); - for (auto i : range) + for (auto telemetry_info : range) { - if (peers.count (i.endpoint) == 0) + if (!telemetry_info.undergoing_request && peers.count (telemetry_info.endpoint) == 0) { - auto const last_response = i.last_response; + auto const last_response = telemetry_info.last_response; auto now = std::chrono::steady_clock::now (); if (now > last_response + this_l->cache_cutoff) { next_round = std::min (next_round, std::chrono::duration_cast (now - (last_response + this_l->cache_cutoff)).count ()); } + // We are iterating in sorted order from last_updated, so can break once we have found the first valid one. break; } } @@ -302,6 +304,7 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptrget_endpoint (), nano::telemetry_data (), std::chrono::steady_clock::now (), true); it = recent_or_initial_request_telemetry_data.find (channel_a->get_endpoint ()); } @@ -354,32 +357,36 @@ void nano::telemetry::fire_request_message (std::shared_ptr this_w (shared_from_this ()); nano::telemetry_req message; // clang-format off - channel_a->send (message, [this_w, endpoint = channel_a->get_endpoint ()](boost::system::error_code const & ec, size_t size_a) { + channel_a->send (message, [this_w, endpoint = channel_a->get_endpoint (), round_l](boost::system::error_code const & ec, size_t size_a) { if (auto this_l = this_w.lock ()) { if (ec) { // Error sending the telemetry_req message + this_l->stats.inc (nano::stat::type::telemetry, nano::stat::detail::failed_send_telemetry_req); nano::lock_guard guard (this_l->mutex); this_l->channel_processed (endpoint, true); } + else + { + // If no response is seen after a certain period of time remove it + this_l->alarm.add (std::chrono::steady_clock::now () + this_l->response_time_cutoff, [round_l, this_w, endpoint]() { + if (auto this_l = this_w.lock ()) + { + nano::lock_guard guard (this_l->mutex); + auto it = this_l->recent_or_initial_request_telemetry_data.find (endpoint); + if (it != this_l->recent_or_initial_request_telemetry_data.cend () && it->undergoing_request && round_l == it->round) + { + this_l->stats.inc (nano::stat::type::telemetry, nano::stat::detail::no_response_received); + this_l->channel_processed (endpoint, true); + } + } + }); + } } }, nano::buffer_drop_policy::no_socket_drop); // clang-format on - - // If no response is seen after a certain period of time remove it - alarm.add (std::chrono::steady_clock::now () + response_time_cutoff, [round_l, this_w, endpoint = channel_a->get_endpoint ()]() { - if (auto this_l = this_w.lock ()) - { - nano::lock_guard guard (this_l->mutex); - auto it = this_l->recent_or_initial_request_telemetry_data.find (endpoint); - if (it != this_l->recent_or_initial_request_telemetry_data.cend () && it->undergoing_request && round_l == it->round) - { - this_l->channel_processed (endpoint, true); - } - } - }); } void nano::telemetry::channel_processed (nano::endpoint const & endpoint_a, bool error_a) @@ -387,10 +394,14 @@ void nano::telemetry::channel_processed (nano::endpoint const & endpoint_a, bool auto it = recent_or_initial_request_telemetry_data.find (endpoint_a); if (it != recent_or_initial_request_telemetry_data.end ()) { - recent_or_initial_request_telemetry_data.modify (it, [](nano::telemetry_info & telemetry_info_a) { - telemetry_info_a.last_response = std::chrono::steady_clock::now (); - }); - if (error_a) + if (!error_a) + { + recent_or_initial_request_telemetry_data.modify (it, [](nano::telemetry_info & telemetry_info_a) { + telemetry_info_a.last_response = std::chrono::steady_clock::now (); + telemetry_info_a.undergoing_request = false; + }); + } + else { recent_or_initial_request_telemetry_data.erase (endpoint_a); } diff --git a/nano/node/telemetry.hpp b/nano/node/telemetry.hpp index 19ad1174..a7c1d7a2 100644 --- a/nano/node/telemetry.hpp +++ b/nano/node/telemetry.hpp @@ -129,7 +129,7 @@ private: std::chrono::seconds const cache_cutoff{ nano::telemetry_cache_cutoffs::network_to_time (network_params.network) }; // The maximum time spent waiting for a response to a telemetry request - std::chrono::seconds const response_time_cutoff{ is_sanitizer_build || nano::running_within_valgrind () ? 6 : 3 }; + std::chrono::seconds const response_time_cutoff{ network_params.network.is_test_network () ? (is_sanitizer_build || nano::running_within_valgrind () ? 6 : 3) : 10 }; std::unordered_map>> callbacks; diff --git a/nano/node/testing.hpp b/nano/node/testing.hpp index 787dd586..ed170cd2 100644 --- a/nano/node/testing.hpp +++ b/nano/node/testing.hpp @@ -50,7 +50,7 @@ public: nano::alarm alarm{ io_ctx }; std::vector> nodes; nano::logging logging; - nano::work_pool work{ 1 }; + nano::work_pool work{ std::max (std::thread::hardware_concurrency (), 1u) }; std::chrono::time_point> deadline{ std::chrono::steady_clock::time_point::max () }; double deadline_scaling_factor{ 1.0 }; unsigned node_sequence{ 0 }; diff --git a/nano/node/transport/transport.cpp b/nano/node/transport/transport.cpp index 733c1077..56e46e70 100644 --- a/nano/node/transport/transport.cpp +++ b/nano/node/transport/transport.cpp @@ -100,6 +100,11 @@ void nano::transport::channel::send (nano::message const & message_a, std::funct } else { + if (callback_a) + { + callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0); + } + node.stats.inc (nano::stat::type::drop, detail, nano::stat::dir::out); if (node.config.logging.network_packet_logging ()) { diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index face20e4..644cb7b4 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -1071,6 +1071,57 @@ namespace transport } } +TEST (node_telemetry, under_load) +{ + nano::system system; + nano::node_config node_config (nano::get_available_port (), system.logging); + node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; + auto node = system.add_node (node_config); + node_config.peering_port = nano::get_available_port (); + nano::node_flags node_flags; + node_flags.disable_ongoing_telemetry_requests = true; + auto node1 = system.add_node (node_config, node_flags); + nano::genesis genesis; + nano::keypair key; + nano::keypair key1; + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); + system.wallet (0)->insert_adhoc (key.prv); + auto latest_genesis = node->latest (nano::test_genesis_key.pub); + auto num_blocks = 150000; + auto send (std::make_shared (nano::test_genesis_key.pub, latest_genesis, nano::test_genesis_key.pub, nano::genesis_amount - num_blocks, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (latest_genesis))); + node->process_active (send); + latest_genesis = send->hash (); + auto open (std::make_shared (key.pub, 0, key.pub, num_blocks, send->hash (), key.prv, key.pub, *system.work.generate (key.pub))); + node->process_active (open); + auto latest_key = open->hash (); + + auto thread_func = [key1, &system, node, num_blocks](nano::keypair const & keypair, nano::block_hash const & latest, nano::uint128_t const initial_amount) { + auto latest_l = latest; + for (int i = 0; i < num_blocks; ++i) + { + auto send (std::make_shared (keypair.pub, latest_l, keypair.pub, initial_amount - i - 1, key1.pub, keypair.prv, keypair.pub, *system.work.generate (latest_l))); + latest_l = send->hash (); + node->process_active (send); + } + }; + + std::thread thread1 (thread_func, nano::test_genesis_key, latest_genesis, nano::genesis_amount - num_blocks); + std::thread thread2 (thread_func, key, latest_key, num_blocks); + + ASSERT_TIMELY (200s, node1->ledger.cache.block_count == num_blocks * 2 + 3); + + thread1.join (); + thread2.join (); + + for (auto const & node : system.nodes) + { + ASSERT_EQ (0, node->stats.count (nano::stat::type::telemetry, nano::stat::detail::failed_send_telemetry_req)); + ASSERT_EQ (0, node->stats.count (nano::stat::type::telemetry, nano::stat::detail::request_within_protection_cache_zone)); + ASSERT_EQ (0, node->stats.count (nano::stat::type::telemetry, nano::stat::detail::unsolicited_telemetry_ack)); + ASSERT_EQ (0, node->stats.count (nano::stat::type::telemetry, nano::stat::detail::no_response_received)); + } +} + // Similar to signature_checker.boundary_checks but more exhaustive. Can take up to 1 minute TEST (signature_checker, mass_boundary_checks) {