From c1eb776bc70346090b4efcdb3654a3cbbc9737e3 Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Tue, 10 Mar 2020 19:47:58 +0000 Subject: [PATCH] Telemetry results not correctly using cache timeout (#2650) --- nano/core_test/node_telemetry.cpp | 2 +- nano/node/telemetry.cpp | 109 ++++++++++++++++++++++-------- nano/node/telemetry.hpp | 14 +++- nano/slow_test/node.cpp | 2 +- 4 files changed, 95 insertions(+), 32 deletions(-) diff --git a/nano/core_test/node_telemetry.cpp b/nano/core_test/node_telemetry.cpp index 7e4152eb2..bff5609f4 100644 --- a/nano/core_test/node_telemetry.cpp +++ b/nano/core_test/node_telemetry.cpp @@ -598,7 +598,7 @@ TEST (node_telemetry, all_peers_use_single_request_cache) ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); - std::this_thread::sleep_for (nano::telemetry_cache_cutoffs::test); + std::this_thread::sleep_for (node_server->telemetry->cache_plus_buffer_cutoff_time ()); // Should be empty responses = node_client->telemetry->get_metrics (); diff --git a/nano/node/telemetry.cpp b/nano/node/telemetry.cpp index efbd39a41..bb30a9a22 100644 --- a/nano/node/telemetry.cpp +++ b/nano/node/telemetry.cpp @@ -13,6 +13,8 @@ #include #include +using namespace std::chrono_literals; + nano::telemetry::telemetry (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a, bool disable_ongoing_requests_a) : network (network_a), alarm (alarm_a), @@ -56,34 +58,69 @@ void nano::telemetry::set (nano::telemetry_data const & telemetry_data_a, nano:: } } +std::chrono::milliseconds nano::telemetry::cache_plus_buffer_cutoff_time () const +{ + // This includes the waiting time for the response as well as a buffer (1 second) waiting for the alarm operation to be scheduled and completed + return cache_cutoff + response_time_cutoff + 1s; +} + +bool 
nano::telemetry::within_cache_plus_buffer_cutoff (telemetry_info const & telemetry_info) const +{ + auto is_within = (telemetry_info.last_response + cache_plus_buffer_cutoff_time ()) >= std::chrono::steady_clock::now (); + return !telemetry_info.awaiting_first_response () && is_within; +} + bool nano::telemetry::within_cache_cutoff (telemetry_info const & telemetry_info) const { - auto is_within = (telemetry_info.last_request + nano::telemetry_cache_cutoffs::network_to_time (network_params.network)) >= std::chrono::steady_clock::now (); + auto is_within = (telemetry_info.last_response + cache_cutoff) >= std::chrono::steady_clock::now (); return !telemetry_info.awaiting_first_response () && is_within; } void nano::telemetry::ongoing_req_all_peers (std::chrono::milliseconds next_request_interval) { - // Check if any peers actually need requesting alarm.add (std::chrono::steady_clock::now () + next_request_interval, [this_w = std::weak_ptr (shared_from_this ())]() { if (auto this_l = this_w.lock ()) { // Check if there are any peers which are in the peers list which haven't been request, or any which are below or equal to the cache cutoff time if (!this_l->stopped) { - auto peers = this_l->network.list (std::numeric_limits::max (), this_l->network_params.protocol.telemetry_protocol_version_min, false); + class tag_channel + { + }; + + struct channel_wrapper + { + std::shared_ptr channel; + channel_wrapper (std::shared_ptr const & channel_a) : + channel (channel_a) + { + } + nano::endpoint endpoint () const + { + return channel->get_endpoint (); + } + }; + + namespace mi = boost::multi_index; + boost::multi_index_container, + mi::const_mem_fun>, + mi::hashed_unique, + mi::member, &channel_wrapper::channel>>>> + peers; { - std::unordered_set temp_peers; - std::transform (peers.begin (), peers.end (), std::inserter (temp_peers, temp_peers.end ()), [](auto const & channel_a) { - return channel_a->get_endpoint (); - }); + auto temp_peers = this_l->network.list 
(std::numeric_limits::max (), this_l->network_params.protocol.telemetry_protocol_version_min, false); + peers.insert (temp_peers.begin (), temp_peers.end ()); + } + { // Cleanup any stale saved telemetry data for non-existent peers nano::lock_guard guard (this_l->mutex); for (auto it = this_l->recent_or_initial_request_telemetry_data.begin (); it != this_l->recent_or_initial_request_telemetry_data.end ();) { - if (!it->undergoing_request && !this_l->within_cache_cutoff (*it) && temp_peers.count (it->endpoint) == 0) + if (!it->undergoing_request && !this_l->within_cache_cutoff (*it) && peers.count (it->endpoint) == 0) { it = this_l->recent_or_initial_request_telemetry_data.erase (it); } @@ -93,32 +130,48 @@ void nano::telemetry::ongoing_req_all_peers (std::chrono::milliseconds next_requ } } - peers.erase (std::remove_if (peers.begin (), peers.end (), [&this_l](auto const & channel_a) { - // Remove from peers list if it exists and is within the cache cutoff - auto it = this_l->recent_or_initial_request_telemetry_data.find (channel_a->get_endpoint ()); - return it != this_l->recent_or_initial_request_telemetry_data.end () && this_l->within_cache_cutoff (*it); - }), - peers.end ()); + // Remove from peers list if it exists and is within the cache cutoff + for (auto peers_it = peers.begin (); peers_it != peers.end ();) + { + auto it = this_l->recent_or_initial_request_telemetry_data.find (peers_it->endpoint ()); + if (it != this_l->recent_or_initial_request_telemetry_data.cend () && this_l->within_cache_cutoff (*it)) + { + peers_it = peers.erase (peers_it); + } + else + { + ++peers_it; + } + } } // Request data from new peers, or ones which are out of date - for (auto const & peer : peers) + for (auto const & peer : boost::make_iterator_range (peers)) { - this_l->get_metrics_single_peer_async (peer, [](auto const &) { + this_l->get_metrics_single_peer_async (peer.channel, [](auto const &) { // Intentionally empty, just using to refresh the cache }); } nano::lock_guard 
guard (this_l->mutex); - long long next_round = std::chrono::duration_cast (nano::telemetry_cache_cutoffs::network_to_time (this_l->network_params.network)).count (); + long long next_round = std::chrono::duration_cast (this_l->cache_cutoff + this_l->response_time_cutoff).count (); if (!this_l->recent_or_initial_request_telemetry_data.empty ()) { // Use the default request time unless a telemetry request cache expires sooner - auto const cache_cutoff = nano::telemetry_cache_cutoffs::network_to_time (this_l->network_params.network); - auto const last_request = this_l->recent_or_initial_request_telemetry_data.get ().begin ()->last_request; - if (std::chrono::steady_clock::now () > last_request + cache_cutoff) + // Find the first cached entry (in index order) whose endpoint is not being requested in this round + auto range = boost::make_iterator_range (this_l->recent_or_initial_request_telemetry_data.get ()); + for (auto i : range) { - next_round = std::min (next_round, std::chrono::duration_cast (std::chrono::steady_clock::now () - (last_request + cache_cutoff)).count ()); + if (peers.count (i.endpoint) == 0) + { + auto const last_response = i.last_response; + auto now = std::chrono::steady_clock::now (); + if (now > last_response + this_l->cache_cutoff) + { + next_round = std::min (next_round, std::chrono::duration_cast (now - (last_response + this_l->cache_cutoff)).count ()); + } + break; + } } } @@ -134,10 +187,9 @@ std::unordered_map nano::telemetry::get_me nano::lock_guard guard (mutex); auto range = boost::make_iterator_range (recent_or_initial_request_telemetry_data); - // clang-format off nano::transform_if (range.begin (), range.end (), std::inserter (telemetry_data, telemetry_data.end ()), - [this](auto const & telemetry_info) { return this->within_cache_cutoff (telemetry_info); }, + [this](auto const & telemetry_info) { return this->within_cache_plus_buffer_cutoff (telemetry_info); }, [](auto const & telemetry_info) { return std::pair{ telemetry_info.endpoint, telemetry_info.data }; }); // clang-format on @@ -195,7 
+247,6 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptrendpoint].push_back (callback_a); @@ -271,8 +322,12 @@ void nano::telemetry::fire_request_message (std::shared_ptr 0) + auto it = recent_or_initial_request_telemetry_data.find (endpoint_a); + if (it != recent_or_initial_request_telemetry_data.end ()) { + recent_or_initial_request_telemetry_data.modify (it, [](nano::telemetry_info & telemetry_info_a) { + telemetry_info_a.last_response = std::chrono::steady_clock::now (); + }); if (error_a) { recent_or_initial_request_telemetry_data.erase (endpoint_a); @@ -328,10 +383,10 @@ size_t nano::telemetry::telemetry_data_size () return recent_or_initial_request_telemetry_data.size (); } -nano::telemetry_info::telemetry_info (nano::endpoint const & endpoint_a, nano::telemetry_data const & data_a, std::chrono::steady_clock::time_point last_request_a, bool undergoing_request_a) : +nano::telemetry_info::telemetry_info (nano::endpoint const & endpoint_a, nano::telemetry_data const & data_a, std::chrono::steady_clock::time_point last_response_a, bool undergoing_request_a) : endpoint (endpoint_a), data (data_a), -last_request (last_request_a), +last_response (last_response_a), undergoing_request (undergoing_request_a) { } diff --git a/nano/node/telemetry.hpp b/nano/node/telemetry.hpp index 1f704af9b..ebf9d4336 100644 --- a/nano/node/telemetry.hpp +++ b/nano/node/telemetry.hpp @@ -38,12 +38,12 @@ class telemetry_info final { public: telemetry_info () = default; - telemetry_info (nano::endpoint const & endpoint, nano::telemetry_data const & data, std::chrono::steady_clock::time_point last_request, bool undergoing_request); + telemetry_info (nano::endpoint const & endpoint, nano::telemetry_data const & data, std::chrono::steady_clock::time_point last_response, bool undergoing_request); bool awaiting_first_response () const; nano::endpoint endpoint; nano::telemetry_data data; - std::chrono::steady_clock::time_point last_request; + 
std::chrono::steady_clock::time_point last_response; bool undergoing_request{ false }; uint64_t round{ 0 }; }; @@ -87,6 +87,11 @@ public: */ size_t telemetry_data_size (); + /* + * Returns the time for the cache, response and a small buffer for alarm operations to be scheduled and completed + */ + std::chrono::milliseconds cache_plus_buffer_cutoff_time () const; + private: class tag_endpoint { @@ -111,11 +116,13 @@ private: mi::hashed_unique, mi::member>, mi::ordered_non_unique, - mi::member>>> recent_or_initial_request_telemetry_data; + mi::member>>> recent_or_initial_request_telemetry_data; // clang-format on // Anything older than this requires requesting metrics from other nodes. std::chrono::seconds const cache_cutoff{ nano::telemetry_cache_cutoffs::network_to_time (network_params.network) }; + + // The maximum time spent waiting for a response to a telemetry request std::chrono::seconds const response_time_cutoff{ is_sanitizer_build || nano::running_within_valgrind () ? 6 : 3 }; std::unordered_map>> callbacks; @@ -128,6 +135,7 @@ private: void invoke_callbacks (nano::endpoint const &, bool); bool within_cache_cutoff (nano::telemetry_info const &) const; + bool within_cache_plus_buffer_cutoff (telemetry_info const & telemetry_info) const; friend std::unique_ptr collect_container_info (telemetry & telemetry, const std::string & name); }; diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index 9ee55a950..d794c1261 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -906,7 +906,7 @@ TEST (node_telemetry, ongoing_requests) // Wait till the next ongoing will be called, and add a 1s buffer for the actual processing auto time = std::chrono::steady_clock::now (); - while (std::chrono::steady_clock::now () < (time + nano::telemetry_cache_cutoffs::test + 1s)) + while (std::chrono::steady_clock::now () < (time + node_client->telemetry->cache_plus_buffer_cutoff_time () + 1s)) { ASSERT_NO_ERROR (system.poll ()); }