Telemetry results not correctly using cache timeout (#2650)

commit c1eb776bc7
Author: Wesley Shillingford, 2020-03-10 19:47:58 +00:00 (committed by GitHub)
GPG key ID: 4AEE18F83AFDEB23 (no known key found for this signature in database)
4 changed files with 95 additions and 32 deletions


@@ -598,7 +598,7 @@ TEST (node_telemetry, all_peers_use_single_request_cache)
ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out));
std::this_thread::sleep_for (nano::telemetry_cache_cutoffs::test);
std::this_thread::sleep_for (node_server->telemetry->cache_plus_buffer_cutoff_time ());
// Should be empty
responses = node_client->telemetry->get_metrics ();


@@ -13,6 +13,8 @@
#include <numeric>
#include <set>
using namespace std::chrono_literals;
nano::telemetry::telemetry (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a, bool disable_ongoing_requests_a) :
network (network_a),
alarm (alarm_a),
@@ -56,34 +58,69 @@ void nano::telemetry::set (nano::telemetry_data const & telemetry_data_a, nano::
}
}
std::chrono::milliseconds nano::telemetry::cache_plus_buffer_cutoff_time () const
{
// This includes the waiting time for the response, as well as a buffer (1 second) for the alarm operation to be scheduled and completed
return cache_cutoff + response_time_cutoff + 1s;
}
bool nano::telemetry::within_cache_plus_buffer_cutoff (telemetry_info const & telemetry_info) const
{
auto is_within = (telemetry_info.last_response + cache_plus_buffer_cutoff_time ()) >= std::chrono::steady_clock::now ();
return !telemetry_info.awaiting_first_response () && is_within;
}
bool nano::telemetry::within_cache_cutoff (telemetry_info const & telemetry_info) const
{
auto is_within = (telemetry_info.last_request + nano::telemetry_cache_cutoffs::network_to_time (network_params.network)) >= std::chrono::steady_clock::now ();
auto is_within = (telemetry_info.last_response + cache_cutoff) >= std::chrono::steady_clock::now ();
return !telemetry_info.awaiting_first_response () && is_within;
}
void nano::telemetry::ongoing_req_all_peers (std::chrono::milliseconds next_request_interval)
{
// Check if any peers actually need requesting
alarm.add (std::chrono::steady_clock::now () + next_request_interval, [this_w = std::weak_ptr<telemetry> (shared_from_this ())]() {
if (auto this_l = this_w.lock ())
{
// Check if there are any peers in the peers list which haven't been requested yet, or any which are below or equal to the cache cutoff time
if (!this_l->stopped)
{
auto peers = this_l->network.list (std::numeric_limits<size_t>::max (), this_l->network_params.protocol.telemetry_protocol_version_min, false);
class tag_channel
{
};
struct channel_wrapper
{
std::shared_ptr<nano::transport::channel> channel;
channel_wrapper (std::shared_ptr<nano::transport::channel> const & channel_a) :
channel (channel_a)
{
}
nano::endpoint endpoint () const
{
return channel->get_endpoint ();
}
};
namespace mi = boost::multi_index;
boost::multi_index_container<channel_wrapper,
mi::indexed_by<
mi::hashed_unique<mi::tag<tag_endpoint>,
mi::const_mem_fun<channel_wrapper, nano::endpoint, &channel_wrapper::endpoint>>,
mi::hashed_unique<mi::tag<tag_channel>,
mi::member<channel_wrapper, std::shared_ptr<nano::transport::channel>, &channel_wrapper::channel>>>>
peers;
{
std::unordered_set<nano::endpoint> temp_peers;
std::transform (peers.begin (), peers.end (), std::inserter (temp_peers, temp_peers.end ()), [](auto const & channel_a) {
return channel_a->get_endpoint ();
});
auto temp_peers = this_l->network.list (std::numeric_limits<size_t>::max (), this_l->network_params.protocol.telemetry_protocol_version_min, false);
peers.insert (temp_peers.begin (), temp_peers.end ());
}
{
// Clean up any stale saved telemetry data for non-existent peers
nano::lock_guard<std::mutex> guard (this_l->mutex);
for (auto it = this_l->recent_or_initial_request_telemetry_data.begin (); it != this_l->recent_or_initial_request_telemetry_data.end ();)
{
if (!it->undergoing_request && !this_l->within_cache_cutoff (*it) && temp_peers.count (it->endpoint) == 0)
if (!it->undergoing_request && !this_l->within_cache_cutoff (*it) && peers.count (it->endpoint) == 0)
{
it = this_l->recent_or_initial_request_telemetry_data.erase (it);
}
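The hunk above switches the peers list from a flat container to a boost::multi_index_container of channel wrappers, uniquely indexed both by endpoint and by the channel pointer, so that the cleanup and scheduling code can do endpoint lookups with peers.count (...). A minimal self-contained sketch of that pattern follows; the stand-in types endpoint (a std::string) and peer_channel are assumptions for illustration only, not nano's real nano::endpoint or nano::transport::channel.

#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/mem_fun.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index_container.hpp>

#include <memory>
#include <string>

// Simplified stand-ins for nano::endpoint and nano::transport::channel (illustration only)
using endpoint = std::string;
struct peer_channel
{
	endpoint remote;
};

// Same shape as the channel_wrapper in the hunk above
struct channel_wrapper
{
	std::shared_ptr<peer_channel> channel;
	explicit channel_wrapper (std::shared_ptr<peer_channel> const & channel_a) :
	channel (channel_a)
	{
	}
	endpoint get_endpoint () const
	{
		return channel->remote;
	}
};

class tag_endpoint
{
};
class tag_channel
{
};

namespace mi = boost::multi_index;
// Unique lookup both by endpoint and by the channel shared_ptr itself
using channel_set = boost::multi_index_container<channel_wrapper,
mi::indexed_by<
mi::hashed_unique<mi::tag<tag_endpoint>,
mi::const_mem_fun<channel_wrapper, endpoint, &channel_wrapper::get_endpoint>>,
mi::hashed_unique<mi::tag<tag_channel>,
mi::member<channel_wrapper, std::shared_ptr<peer_channel>, &channel_wrapper::channel>>>>;

int main ()
{
	channel_set peers;
	auto channel = std::make_shared<peer_channel> ();
	channel->remote = "192.168.1.1:7075";
	peers.insert (channel_wrapper (channel));
	// Membership check by endpoint, as the stale-data cleanup above does with peers.count (it->endpoint)
	return peers.get<tag_endpoint> ().count ("192.168.1.1:7075") == 1 ? 0 : 1;
}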
@@ -93,32 +130,48 @@ void nano::telemetry::ongoing_req_all_peers (std::chrono::milliseconds next_requ
}
}
peers.erase (std::remove_if (peers.begin (), peers.end (), [&this_l](auto const & channel_a) {
// Remove from peers list if it exists and is within the cache cutoff
auto it = this_l->recent_or_initial_request_telemetry_data.find (channel_a->get_endpoint ());
return it != this_l->recent_or_initial_request_telemetry_data.end () && this_l->within_cache_cutoff (*it);
}),
peers.end ());
// Remove from peers list if it exists and is within the cache cutoff
for (auto peers_it = peers.begin (); peers_it != peers.end ();)
{
auto it = this_l->recent_or_initial_request_telemetry_data.find (peers_it->endpoint ());
if (it != this_l->recent_or_initial_request_telemetry_data.cend () && this_l->within_cache_cutoff (*it))
{
peers_it = peers.erase (peers_it);
}
else
{
++peers_it;
}
}
}
// Request data from new peers, or ones which are out of date
for (auto const & peer : peers)
for (auto const & peer : boost::make_iterator_range (peers))
{
this_l->get_metrics_single_peer_async (peer, [](auto const &) {
this_l->get_metrics_single_peer_async (peer.channel, [](auto const &) {
// Intentionally empty, just used to refresh the cache
});
}
nano::lock_guard<std::mutex> guard (this_l->mutex);
long long next_round = std::chrono::duration_cast<std::chrono::milliseconds> (nano::telemetry_cache_cutoffs::network_to_time (this_l->network_params.network)).count ();
long long next_round = std::chrono::duration_cast<std::chrono::milliseconds> (this_l->cache_cutoff + this_l->response_time_cutoff).count ();
if (!this_l->recent_or_initial_request_telemetry_data.empty ())
{
// Use the default request time unless a telemetry request cache expires sooner
auto const cache_cutoff = nano::telemetry_cache_cutoffs::network_to_time (this_l->network_params.network);
auto const last_request = this_l->recent_or_initial_request_telemetry_data.get<tag_last_updated> ().begin ()->last_request;
if (std::chrono::steady_clock::now () > last_request + cache_cutoff)
// Find the closest expiring cache entry which isn't currently being requested
auto range = boost::make_iterator_range (this_l->recent_or_initial_request_telemetry_data.get<tag_last_updated> ());
for (auto i : range)
{
next_round = std::min<long long> (next_round, std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::steady_clock::now () - (last_request + cache_cutoff)).count ());
if (peers.count (i.endpoint) == 0)
{
auto const last_response = i.last_response;
auto now = std::chrono::steady_clock::now ();
if (now > last_response + this_l->cache_cutoff)
{
next_round = std::min<long long> (next_round, std::chrono::duration_cast<std::chrono::milliseconds> (now - (last_response + this_l->cache_cutoff)).count ());
}
break;
}
}
}
@@ -134,10 +187,9 @@ std::unordered_map<nano::endpoint, nano::telemetry_data> nano::telemetry::get_me
nano::lock_guard<std::mutex> guard (mutex);
auto range = boost::make_iterator_range (recent_or_initial_request_telemetry_data);
// clang-format off
nano::transform_if (range.begin (), range.end (), std::inserter (telemetry_data, telemetry_data.end ()),
[this](auto const & telemetry_info) { return this->within_cache_cutoff (telemetry_info); },
[this](auto const & telemetry_info) { return this->within_cache_plus_buffer_cutoff (telemetry_info); },
[](auto const & telemetry_info) { return std::pair<const nano::endpoint, nano::telemetry_data>{ telemetry_info.endpoint, telemetry_info.data }; });
// clang-format on
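get_metrics above copies only the cache-fresh entries into the result map with nano::transform_if. As a rough sketch of the shape that call implies (an assumption based on the call site, not the project's actual implementation), the helper behaves like a filtered std::transform:

#include <iostream>
#include <iterator>
#include <vector>

// Copies transform_a (x) to output for every x in [first, last) that satisfies predicate_a (x)
template <typename InputIt, typename OutputIt, typename Pred, typename Transform>
OutputIt transform_if (InputIt first, InputIt last, OutputIt output, Pred predicate_a, Transform transform_a)
{
	for (; first != last; ++first)
	{
		if (predicate_a (*first))
		{
			*output++ = transform_a (*first);
		}
	}
	return output;
}

int main ()
{
	std::vector<int> in{ 1, 2, 3, 4 };
	std::vector<int> out;
	// Keep the even values, doubled: out becomes { 4, 8 }
	transform_if (in.begin (), in.end (), std::back_inserter (out), [](int i) { return i % 2 == 0; }, [](int i) { return i * 2; });
	for (auto v : out)
	{
		std::cout << v << ' ';
	}
	std::cout << std::endl;
}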
@@ -195,7 +247,6 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr<nano::trans
{
recent_or_initial_request_telemetry_data.modify (it, [](nano::telemetry_info & telemetry_info_a) {
telemetry_info_a.undergoing_request = true;
telemetry_info_a.last_request = std::chrono::steady_clock::now ();
});
}
callbacks[it->endpoint].push_back (callback_a);
@@ -271,8 +322,12 @@ void nano::telemetry::fire_request_message (std::shared_ptr<nano::transport::cha
void nano::telemetry::channel_processed (nano::endpoint const & endpoint_a, bool error_a)
{
if (recent_or_initial_request_telemetry_data.count (endpoint_a) > 0)
auto it = recent_or_initial_request_telemetry_data.find (endpoint_a);
if (it != recent_or_initial_request_telemetry_data.end ())
{
recent_or_initial_request_telemetry_data.modify (it, [](nano::telemetry_info & telemetry_info_a) {
telemetry_info_a.last_response = std::chrono::steady_clock::now ();
});
if (error_a)
{
recent_or_initial_request_telemetry_data.erase (endpoint_a);
@@ -328,10 +383,10 @@ size_t nano::telemetry::telemetry_data_size ()
return recent_or_initial_request_telemetry_data.size ();
}
nano::telemetry_info::telemetry_info (nano::endpoint const & endpoint_a, nano::telemetry_data const & data_a, std::chrono::steady_clock::time_point last_request_a, bool undergoing_request_a) :
nano::telemetry_info::telemetry_info (nano::endpoint const & endpoint_a, nano::telemetry_data const & data_a, std::chrono::steady_clock::time_point last_response_a, bool undergoing_request_a) :
endpoint (endpoint_a),
data (data_a),
last_request (last_request_a),
last_response (last_response_a),
undergoing_request (undergoing_request_a)
{
}


@@ -38,12 +38,12 @@ class telemetry_info final
{
public:
telemetry_info () = default;
telemetry_info (nano::endpoint const & endpoint, nano::telemetry_data const & data, std::chrono::steady_clock::time_point last_request, bool undergoing_request);
telemetry_info (nano::endpoint const & endpoint, nano::telemetry_data const & data, std::chrono::steady_clock::time_point last_response, bool undergoing_request);
bool awaiting_first_response () const;
nano::endpoint endpoint;
nano::telemetry_data data;
std::chrono::steady_clock::time_point last_request;
std::chrono::steady_clock::time_point last_response;
bool undergoing_request{ false };
uint64_t round{ 0 };
};
@@ -87,6 +87,11 @@ public:
*/
size_t telemetry_data_size ();
/*
* Returns the cache cutoff time plus the response time, plus a small buffer for alarm operations to be scheduled and completed
*/
std::chrono::milliseconds cache_plus_buffer_cutoff_time () const;
private:
class tag_endpoint
{
@@ -111,11 +116,13 @@ private:
mi::hashed_unique<mi::tag<tag_endpoint>,
mi::member<nano::telemetry_info, nano::endpoint, &nano::telemetry_info::endpoint>>,
mi::ordered_non_unique<mi::tag<tag_last_updated>,
mi::member<nano::telemetry_info, std::chrono::steady_clock::time_point, &nano::telemetry_info::last_request>>>> recent_or_initial_request_telemetry_data;
mi::member<nano::telemetry_info, std::chrono::steady_clock::time_point, &nano::telemetry_info::last_response>>>> recent_or_initial_request_telemetry_data;
// clang-format on
// Anything older than this requires requesting metrics from other nodes.
std::chrono::seconds const cache_cutoff{ nano::telemetry_cache_cutoffs::network_to_time (network_params.network) };
// The maximum time spent waiting for a response to a telemetry request
std::chrono::seconds const response_time_cutoff{ is_sanitizer_build || nano::running_within_valgrind () ? 6 : 3 };
std::unordered_map<nano::endpoint, std::vector<std::function<void(telemetry_data_response const &)>>> callbacks;
@@ -128,6 +135,7 @@ private:
void invoke_callbacks (nano::endpoint const &, bool);
bool within_cache_cutoff (nano::telemetry_info const &) const;
bool within_cache_plus_buffer_cutoff (telemetry_info const & telemetry_info) const;
friend std::unique_ptr<nano::container_info_component> collect_container_info (telemetry & telemetry, const std::string & name);
};


@@ -906,7 +906,7 @@ TEST (node_telemetry, ongoing_requests)
// Wait until the next ongoing request is made, and add a 1s buffer for the actual processing
auto time = std::chrono::steady_clock::now ();
while (std::chrono::steady_clock::now () < (time + nano::telemetry_cache_cutoffs::test + 1s))
while (std::chrono::steady_clock::now () < (time + node_client->telemetry->cache_plus_buffer_cutoff_time () + 1s))
{
ASSERT_NO_ERROR (system.poll ());
}