Simplify telemetry data processing (#2598)

* Simplify telemetry data processing

* Stein review comments

* Fix network.replace_port
This commit is contained in:
Wesley Shillingford 2020-02-28 12:50:53 +00:00 committed by GitHub
commit 79fceac1af
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 443 additions and 977 deletions

View file

@ -879,6 +879,7 @@ TEST (network, replace_port)
nano::system system;
nano::node_flags node_flags;
node_flags.disable_udp = false;
node_flags.disable_ongoing_telemetry_requests = true;
auto node0 = system.add_node (node_flags);
ASSERT_EQ (0, node0->network.size ());
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work, node_flags));

View file

@ -263,38 +263,28 @@ TEST (node_telemetry, no_peers)
nano::system system (1);
std::atomic<bool> done{ false };
system.nodes[0]->telemetry.get_metrics_peers_async ([&done](nano::telemetry_data_responses const & responses_a) {
ASSERT_TRUE (responses_a.telemetry_datas.empty ());
ASSERT_FALSE (responses_a.all_received);
done = true;
});
system.deadline_set (10s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
auto responses = system.nodes[0]->telemetry->get_metrics ();
ASSERT_TRUE (responses.empty ());
}
namespace nano
{
TEST (node_telemetry, basic)
{
nano::system system;
nano::node_flags node_flags;
node_flags.disable_ongoing_telemetry_requests = true;
auto node_client = system.add_node (node_flags);
auto node_server = system.add_node (node_flags);
auto node_client = system.add_node ();
auto node_server = system.add_node ();
wait_peer_connections (system);
// Request telemetry metrics
std::unordered_map<nano::endpoint, nano::telemetry_data> all_telemetry_datas;
nano::telemetry_data telemetry_data;
auto server_endpoint = node_server->network.endpoint ();
auto channel = node_client->network.find_channel (node_server->network.endpoint ());
{
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) {
ASSERT_TRUE (responses_a.all_received);
all_telemetry_datas = responses_a.telemetry_datas;
node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &server_endpoint, &telemetry_data](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
ASSERT_EQ (server_endpoint, response_a.endpoint);
telemetry_data = response_a.telemetry_data;
done = true;
});
@ -306,15 +296,14 @@ TEST (node_telemetry, basic)
}
// Check the metrics are correct
ASSERT_EQ (all_telemetry_datas.size (), 1);
compare_default_test_result_data (all_telemetry_datas.begin ()->second, *node_server);
compare_default_test_result_data (telemetry_data, *node_server);
// Call again straight away. It should use the cache
{
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) {
ASSERT_EQ (all_telemetry_datas, responses_a.telemetry_datas);
ASSERT_TRUE (responses_a.all_received);
node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) {
ASSERT_EQ (telemetry_data, response_a.telemetry_data);
ASSERT_FALSE (response_a.error);
done = true;
});
@ -329,9 +318,9 @@ TEST (node_telemetry, basic)
std::this_thread::sleep_for (nano::telemetry_cache_cutoffs::test);
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) {
ASSERT_NE (all_telemetry_datas, responses_a.telemetry_datas);
ASSERT_TRUE (responses_a.all_received);
node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) {
ASSERT_NE (telemetry_data, response_a.telemetry_data);
ASSERT_FALSE (response_a.error);
done = true;
});
@ -341,19 +330,20 @@ TEST (node_telemetry, basic)
ASSERT_NO_ERROR (system.poll ());
}
}
}
TEST (node_telemetry, many_nodes)
{
nano::system system;
// The telemetry responses can timeout if using a large number of nodes under sanitizers, so lower the number.
const auto num_nodes = (is_sanitizer_build || nano::running_within_valgrind ()) ? 4 : 10;
const auto num_nodes = (is_sanitizer_build || nano::running_within_valgrind ()) ? 4 : 10; //3; // 10;
nano::node_flags node_flags;
node_flags.disable_ongoing_telemetry_requests = true;
for (auto i = 0; i < num_nodes; ++i)
{
nano::node_config node_config (nano::get_available_port (), system.logging);
// Make a metric completely different for each node so we can check afterwards that there are no duplicates
node_config.bandwidth_limit = 100000 + i;
system.add_node (node_config);
system.add_node (node_config, node_flags);
}
wait_peer_connections (system);
@ -371,25 +361,32 @@ TEST (node_telemetry, many_nodes)
// This is the node which will request metrics from all other nodes
auto node_client = system.nodes.front ();
std::atomic<bool> done{ false };
std::unordered_map<nano::endpoint, nano::telemetry_data> all_telemetry_datas;
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) {
ASSERT_TRUE (responses_a.all_received);
all_telemetry_datas = responses_a.telemetry_datas;
done = true;
});
std::mutex mutex;
std::vector<nano::telemetry_data> telemetry_datas;
auto peers = node_client->network.list (num_nodes - 1);
ASSERT_EQ (peers.size (), num_nodes - 1);
for (auto const & peer : peers)
{
node_client->telemetry->get_metrics_single_peer_async (peer, [&telemetry_datas, &mutex](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
nano::lock_guard<std::mutex> guard (mutex);
telemetry_datas.push_back (response_a.telemetry_data);
});
}
system.deadline_set (20s);
while (!done)
nano::unique_lock<std::mutex> lk (mutex);
while (telemetry_datas.size () != num_nodes - 1)
{
lk.unlock ();
ASSERT_NO_ERROR (system.poll ());
lk.lock ();
}
// Check the metrics
nano::network_params params;
for (auto & telemetry_data : all_telemetry_datas)
for (auto & data : telemetry_datas)
{
auto & data = telemetry_data.second;
ASSERT_EQ (data.unchecked_count, 0);
ASSERT_EQ (data.cemented_count, 1);
ASSERT_LE (data.peer_count, 9);
@ -408,10 +405,10 @@ TEST (node_telemetry, many_nodes)
}
// We gave some nodes different bandwidth caps, confirm they are not all the same
auto bandwidth_cap = all_telemetry_datas.begin ()->second.bandwidth_cap;
all_telemetry_datas.erase (all_telemetry_datas.begin ());
auto all_bandwidth_limits_same = std::all_of (all_telemetry_datas.begin (), all_telemetry_datas.end (), [bandwidth_cap](auto & telemetry_data) {
return telemetry_data.second.bandwidth_cap == bandwidth_cap;
auto bandwidth_cap = telemetry_datas.front ().bandwidth_cap;
telemetry_datas.erase (telemetry_datas.begin ());
auto all_bandwidth_limits_same = std::all_of (telemetry_datas.begin (), telemetry_datas.end (), [bandwidth_cap](auto & telemetry_data) {
return telemetry_data.bandwidth_cap == bandwidth_cap;
});
ASSERT_FALSE (all_bandwidth_limits_same);
}
@ -423,7 +420,7 @@ TEST (node_telemetry, receive_from_non_listening_channel)
nano::telemetry_ack message (nano::telemetry_data{});
node->network.process_message (message, node->network.udp_channels.create (node->network.endpoint ()));
// We have not sent a telemetry_req message to this endpoint, so shouldn't count telemetry_ack received from it.
ASSERT_EQ (node->telemetry.telemetry_data_size (), 0);
ASSERT_EQ (node->telemetry->telemetry_data_size (), 0);
}
TEST (node_telemetry, over_udp)
@ -438,10 +435,10 @@ TEST (node_telemetry, over_udp)
wait_peer_connections (system);
std::atomic<bool> done{ false };
std::unordered_map<nano::endpoint, nano::telemetry_data> all_telemetry_datas;
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) {
ASSERT_TRUE (responses_a.all_received);
all_telemetry_datas = responses_a.telemetry_datas;
auto channel = node_client->network.find_channel (node_server->network.endpoint ());
node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &node_server](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
compare_default_test_result_data (response_a.telemetry_data, *node_server);
done = true;
});
@ -451,9 +448,6 @@ TEST (node_telemetry, over_udp)
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (all_telemetry_datas.size (), 1);
compare_default_test_result_data (all_telemetry_datas.begin ()->second, *node_server);
// Check channels are indeed udp
ASSERT_EQ (1, node_client->network.size ());
auto list1 (node_client->network.list (2));
@ -465,77 +459,7 @@ TEST (node_telemetry, over_udp)
ASSERT_EQ (nano::transport::transport_type::udp, list2[0]->get_type ());
}
namespace nano
{
TEST (node_telemetry, single_request)
{
nano::system system;
nano::node_flags node_flags;
node_flags.disable_ongoing_telemetry_requests = true;
auto node_client = system.add_node (node_flags);
auto node_server = system.add_node (node_flags);
wait_peer_connections (system);
// Request telemetry metrics
auto channel = node_client->network.find_channel (node_server->network.endpoint ());
nano::telemetry_data telemetry_data;
{
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data, &channel](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
ASSERT_EQ (channel->get_endpoint (), response_a.endpoint);
telemetry_data = response_a.telemetry_data;
done = true;
});
system.deadline_set (10s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
}
// Check the metrics are correct
compare_default_test_result_data (telemetry_data, *node_server);
// Call again straight away. It should use the cache
{
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) {
ASSERT_EQ (telemetry_data, response_a.telemetry_data);
ASSERT_FALSE (response_a.error);
done = true;
});
system.deadline_set (10s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
}
// Wait the cache period and check cache is not used
std::this_thread::sleep_for (nano::telemetry_cache_cutoffs::test);
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) {
ASSERT_NE (telemetry_data, response_a.telemetry_data);
ASSERT_FALSE (response_a.error);
done = true;
});
system.deadline_set (10s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
}
}
TEST (node_telemetry, single_request_invalid_channel)
TEST (node_telemetry, invalid_channel)
{
nano::system system (2);
@ -543,7 +467,7 @@ TEST (node_telemetry, single_request_invalid_channel)
auto node_server = system.nodes.back ();
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_single_peer_async (nullptr, [&done](nano::telemetry_data_response const & response_a) {
node_client->telemetry->get_metrics_single_peer_async (nullptr, [&done](nano::telemetry_data_response const & response_a) {
ASSERT_TRUE (response_a.error);
done = true;
});
@ -555,7 +479,7 @@ TEST (node_telemetry, single_request_invalid_channel)
}
}
TEST (node_telemetry, blocking_single_and_random)
TEST (node_telemetry, blocking_request)
{
nano::system system (2);
@ -584,104 +508,15 @@ TEST (node_telemetry, blocking_single_and_random)
system.deadline_set (10s);
node_client->worker.push_task (call_system_poll);
// Blocking version of get_random_metrics_async
auto telemetry_data_responses = node_client->telemetry.get_metrics_peers ();
ASSERT_TRUE (telemetry_data_responses.all_received);
compare_default_test_result_data (telemetry_data_responses.telemetry_datas.begin ()->second, *node_server);
// Now try single request metric
auto telemetry_data_response = node_client->telemetry.get_metrics_single_peer (node_client->network.find_channel (node_server->network.endpoint ()));
auto telemetry_data_response = node_client->telemetry->get_metrics_single_peer (node_client->network.find_channel (node_server->network.endpoint ()));
ASSERT_FALSE (telemetry_data_response.error);
compare_default_test_result_data (telemetry_data_response.telemetry_data, *node_server);
ASSERT_EQ (*telemetry_data_response.telemetry_data.timestamp, *telemetry_data_responses.telemetry_datas.begin ()->second.timestamp);
done = true;
promise.get_future ().wait ();
}
namespace nano
{
TEST (node_telemetry, multiple_single_request_clearing)
{
nano::system system (2);
auto node_client = system.nodes.front ();
auto node_server = system.nodes.back ();
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.bandwidth_limit = 100000;
auto node_server1 = system.add_node (node_config);
wait_peer_connections (system);
// Request telemetry metrics
auto channel = node_client->network.find_channel (node_server->network.endpoint ());
std::atomic<bool> done{ false };
std::chrono::system_clock::time_point last_updated;
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &last_updated](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
last_updated = *response_a.telemetry_data.timestamp;
done = true;
});
ASSERT_EQ (1, node_client->telemetry.single_requests.size ());
system.deadline_set (10s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
done = false;
// Make another request to keep the time updated
system.deadline_set (10s);
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, last_updated](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
ASSERT_EQ (last_updated, *response_a.telemetry_data.timestamp);
done = true;
});
system.deadline_set (10s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
done = false;
auto channel1 = node_client->network.find_channel (node_server1->network.endpoint ());
node_client->telemetry.get_metrics_single_peer_async (channel1, [&done, &last_updated](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
ASSERT_NE (last_updated, *response_a.telemetry_data.timestamp);
last_updated = *response_a.telemetry_data.timestamp;
done = true;
});
system.deadline_set (10s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
done = false;
node_client->telemetry.get_metrics_single_peer_async (channel1, [&done, last_updated](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
ASSERT_EQ (last_updated, *response_a.telemetry_data.timestamp);
done = true;
});
// single_requests should be removed as no more calls are being back
system.deadline_set (10s);
nano::unique_lock<std::mutex> lk (node_client->telemetry.mutex);
while (!node_client->telemetry.single_requests.empty () || !done)
{
lk.unlock ();
ASSERT_NO_ERROR (system.poll ());
lk.lock ();
}
}
}
TEST (node_telemetry, disconnects)
{
nano::system system (2);
@ -697,19 +532,7 @@ TEST (node_telemetry, disconnects)
ASSERT_TRUE (channel);
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_peers_async ([&done](nano::telemetry_data_responses const & responses_a) {
ASSERT_FALSE (responses_a.all_received);
done = true;
});
system.deadline_set (10s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
done = false;
node_client->telemetry.get_metrics_single_peer_async (channel, [&done](nano::telemetry_data_response const & response_a) {
node_client->telemetry->get_metrics_single_peer_async (channel, [&done](nano::telemetry_data_response const & response_a) {
ASSERT_TRUE (response_a.error);
done = true;
});
@ -721,7 +544,7 @@ TEST (node_telemetry, disconnects)
}
}
TEST (node_telemetry, batch_use_single_request_cache)
TEST (node_telemetry, all_peers_use_single_request_cache)
{
nano::system system;
nano::node_flags node_flags;
@ -736,7 +559,7 @@ TEST (node_telemetry, batch_use_single_request_cache)
{
std::atomic<bool> done{ false };
auto channel = node_client->network.find_channel (node_server->network.endpoint ());
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) {
node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) {
telemetry_data = response_a.telemetry_data;
done = true;
});
@ -748,11 +571,28 @@ TEST (node_telemetry, batch_use_single_request_cache)
}
}
auto responses = node_client->telemetry->get_metrics ();
ASSERT_EQ (telemetry_data, responses.begin ()->second);
// Confirm only 1 request was made
ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in));
ASSERT_EQ (0, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out));
ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in));
ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out));
std::this_thread::sleep_for (nano::telemetry_cache_cutoffs::test);
// Should be empty
responses = node_client->telemetry->get_metrics ();
ASSERT_TRUE (responses.empty ());
{
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_peers_async ([&done, &telemetry_data](nano::telemetry_data_responses const & responses_a) {
ASSERT_TRUE (responses_a.all_received);
ASSERT_EQ (telemetry_data, responses_a.telemetry_datas.begin ()->second);
auto channel = node_client->network.find_channel (node_server->network.endpoint ());
node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) {
telemetry_data = response_a.telemetry_data;
done = true;
});
@ -763,37 +603,9 @@ TEST (node_telemetry, batch_use_single_request_cache)
}
}
// Confirm only 1 request was made
ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in));
ASSERT_EQ (0, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out));
ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in));
ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out));
responses = node_client->telemetry->get_metrics ();
ASSERT_EQ (telemetry_data, responses.begin ()->second);
// Wait until there is something pending
system.deadline_set (10s);
while (node_client->telemetry.finished_single_requests_size () == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
std::this_thread::sleep_for (nano::telemetry_cache_cutoffs::test);
system.deadline_set (10s);
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_peers_async ([&done](nano::telemetry_data_responses const & responses_a) {
ASSERT_EQ (1, responses_a.telemetry_datas.size ());
done = true;
});
system.deadline_set (10s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (0, node_client->telemetry.finished_single_requests_size ());
ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in));
ASSERT_EQ (0, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out));
@ -802,56 +614,6 @@ TEST (node_telemetry, batch_use_single_request_cache)
ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out));
}
TEST (node_telemetry, single_request_use_batch_cache)
{
nano::system system (2);
auto node_client = system.nodes.front ();
auto node_server = system.nodes.back ();
wait_peer_connections (system);
// Request batched metric first
std::unordered_map<nano::endpoint, nano::telemetry_data> all_telemetry_datas;
{
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) {
ASSERT_TRUE (responses_a.all_received);
ASSERT_EQ (1, responses_a.telemetry_datas.size ());
all_telemetry_datas = responses_a.telemetry_datas;
done = true;
});
system.deadline_set (10s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
}
std::atomic<bool> done{ false };
auto channel = node_client->network.find_channel (node_server->network.endpoint ());
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &all_telemetry_datas](nano::telemetry_data_response const & response_a) {
ASSERT_EQ (all_telemetry_datas.begin ()->second, response_a.telemetry_data);
ASSERT_FALSE (response_a.error);
done = true;
});
system.deadline_set (10s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
// Confirm only 1 request was made
ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in));
ASSERT_EQ (0, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out));
ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in));
ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out));
}
TEST (node_telemetry, dos_tcp)
{
// Confirm that telemetry_reqs are not processed
@ -947,7 +709,7 @@ TEST (node_telemetry, dos_udp)
}
}
TEST (node_telemetry, disable_metrics_single)
TEST (node_telemetry, disable_metrics)
{
nano::system system (1);
auto node_client = system.nodes.front ();
@ -962,7 +724,7 @@ TEST (node_telemetry, disable_metrics_single)
ASSERT_TRUE (channel);
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_single_peer_async (channel, [&done](nano::telemetry_data_response const & response_a) {
node_client->telemetry->get_metrics_single_peer_async (channel, [&done](nano::telemetry_data_response const & response_a) {
ASSERT_TRUE (response_a.error);
done = true;
});
@ -976,7 +738,7 @@ TEST (node_telemetry, disable_metrics_single)
// It should still be able to receive metrics though
done = false;
auto channel1 = node_server->network.find_channel (node_client->network.endpoint ());
node_server->telemetry.get_metrics_single_peer_async (channel1, [&done, node_server](nano::telemetry_data_response const & response_a) {
node_server->telemetry->get_metrics_single_peer_async (channel1, [&done, node_server](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
compare_default_test_result_data (response_a.telemetry_data, *node_server);
done = true;
@ -989,47 +751,6 @@ TEST (node_telemetry, disable_metrics_single)
}
}
TEST (node_telemetry, disable_metrics_batch)
{
nano::system system (1);
auto node_client = system.nodes.front ();
nano::node_flags node_flags;
node_flags.disable_providing_telemetry_metrics = true;
auto node_server = system.add_node (node_flags);
wait_peer_connections (system);
// Try and request metrics from a node which is turned off but a channel is not closed yet
auto channel = node_client->network.find_channel (node_server->network.endpoint ());
ASSERT_TRUE (channel);
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_peers_async ([&done](nano::telemetry_data_responses const & responses_a) {
ASSERT_FALSE (responses_a.all_received);
done = true;
});
system.deadline_set (10s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
// It should still be able to receive metrics though
done = false;
node_server->telemetry.get_metrics_peers_async ([&done, node_server](nano::telemetry_data_responses const & responses_a) {
ASSERT_TRUE (responses_a.all_received);
compare_default_test_result_data (responses_a.telemetry_datas.begin ()->second, *node_server);
done = true;
});
system.deadline_set (10s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
}
namespace
{
void wait_peer_connections (nano::system & system_a)

View file

@ -187,9 +187,14 @@ namespace util
return val;
}
void increment_required_count ()
{
++required_count;
}
private:
std::atomic<unsigned> count{ 0 };
unsigned required_count;
std::atomic<unsigned> required_count;
};
}

View file

@ -3977,29 +3977,36 @@ void nano::json_handler::telemetry ()
if (!ec)
{
debug_assert (channel);
node.telemetry.get_metrics_single_peer_async (channel, [rpc_l](auto const & telemetry_response_a) {
if (!telemetry_response_a.error)
{
nano::jsonconfig config_l;
auto err = telemetry_response_a.telemetry_data.serialize_json (config_l);
auto const & ptree = config_l.get_tree ();
if (!err)
if (node.telemetry)
{
node.telemetry->get_metrics_single_peer_async (channel, [rpc_l](auto const & telemetry_response_a) {
if (!telemetry_response_a.error)
{
rpc_l->response_l.insert (rpc_l->response_l.begin (), ptree.begin (), ptree.end ());
nano::jsonconfig config_l;
auto err = telemetry_response_a.telemetry_data.serialize_json (config_l);
auto const & ptree = config_l.get_tree ();
if (!err)
{
rpc_l->response_l.insert (rpc_l->response_l.begin (), ptree.begin (), ptree.end ());
}
else
{
rpc_l->ec = nano::error_rpc::generic;
}
}
else
{
rpc_l->ec = nano::error_rpc::generic;
}
}
else
{
rpc_l->ec = nano::error_rpc::generic;
}
rpc_l->response_errors ();
});
rpc_l->response_errors ();
});
}
else
{
response_errors ();
}
}
else
{
@ -4012,11 +4019,13 @@ void nano::json_handler::telemetry ()
// setting "raw" to true returns metrics from all nodes requested.
auto raw = request.get_optional<bool> ("raw");
auto output_raw = raw.value_or (false);
node.telemetry.get_metrics_peers_async ([rpc_l, output_raw](telemetry_data_responses const & telemetry_responses_a) {
if (node.telemetry)
{
auto telemetry_responses = node.telemetry->get_metrics ();
if (output_raw)
{
boost::property_tree::ptree metrics;
for (auto & telemetry_metrics : telemetry_responses_a.telemetry_datas)
for (auto & telemetry_metrics : telemetry_responses)
{
nano::jsonconfig config_l;
auto err = telemetry_metrics.second.serialize_json (config_l);
@ -4028,18 +4037,18 @@ void nano::json_handler::telemetry ()
}
else
{
rpc_l->ec = nano::error_rpc::generic;
ec = nano::error_rpc::generic;
}
}
rpc_l->response_l.put_child ("metrics", metrics);
response_l.put_child ("metrics", metrics);
}
else
{
nano::jsonconfig config_l;
std::vector<nano::telemetry_data> telemetry_datas;
telemetry_datas.reserve (telemetry_responses_a.telemetry_datas.size ());
std::transform (telemetry_responses_a.telemetry_datas.begin (), telemetry_responses_a.telemetry_datas.end (), std::back_inserter (telemetry_datas), [](auto const & endpoint_telemetry_data) {
telemetry_datas.reserve (telemetry_responses.size ());
std::transform (telemetry_responses.begin (), telemetry_responses.end (), std::back_inserter (telemetry_datas), [](auto const & endpoint_telemetry_data) {
return endpoint_telemetry_data.second;
});
@ -4049,16 +4058,16 @@ void nano::json_handler::telemetry ()
if (!err)
{
rpc_l->response_l.insert (rpc_l->response_l.begin (), ptree.begin (), ptree.end ());
response_l.insert (response_l.begin (), ptree.begin (), ptree.end ());
}
else
{
rpc_l->ec = nano::error_rpc::generic;
ec = nano::error_rpc::generic;
}
}
}
rpc_l->response_errors ();
});
response_errors ();
}
}

View file

@ -489,7 +489,10 @@ public:
node.logger.try_log (boost::str (boost::format ("Received telemetry_ack message from %1%") % channel->to_string ()));
}
node.stats.inc (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in);
node.telemetry.add (message_a.data, channel->get_endpoint (), message_a.is_empty_payload ());
if (node.telemetry)
{
node.telemetry->set (message_a.data, channel->get_endpoint (), message_a.is_empty_payload ());
}
}
nano::node & node;
std::shared_ptr<nano::transport::channel> channel;

View file

@ -126,7 +126,7 @@ gap_cache (*this),
ledger (store, stats, flags_a.generate_cache),
checker (config.signature_checker_threads),
network (*this, config.peering_port),
telemetry (network, alarm, worker, flags.disable_ongoing_telemetry_requests),
telemetry (std::make_shared<nano::telemetry> (network, alarm, worker, flags.disable_ongoing_telemetry_requests)),
bootstrap_initiator (*this),
bootstrap (config.peering_port, *this),
application_path (application_path_a),
@ -153,6 +153,8 @@ startup_time (std::chrono::steady_clock::now ())
{
if (!init_error ())
{
telemetry->start ();
if (config.websocket_config.enabled)
{
auto endpoint_l (nano::tcp_endpoint (boost::asio::ip::make_address_v6 (config.websocket_config.address), config.websocket_config.port));
@ -586,7 +588,10 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
composite->add_component (collect_container_info (node.bootstrap_initiator, "bootstrap_initiator"));
composite->add_component (collect_container_info (node.bootstrap, "bootstrap"));
composite->add_component (collect_container_info (node.network, "network"));
composite->add_component (collect_container_info (node.telemetry, "telemetry"));
if (node.telemetry)
{
composite->add_component (collect_container_info (*node.telemetry, "telemetry"));
}
composite->add_component (collect_container_info (node.observers, "observers"));
composite->add_component (collect_container_info (node.wallets, "wallets"));
composite->add_component (collect_container_info (node.vote_processor, "vote_processor"));
@ -699,7 +704,11 @@ void nano::node::stop ()
active.stop ();
confirmation_height_processor.stop ();
network.stop ();
telemetry.stop ();
if (telemetry)
{
telemetry->stop ();
telemetry = nullptr;
}
if (websocket_server)
{
websocket_server->stop ();

View file

@ -167,7 +167,7 @@ public:
nano::ledger ledger;
nano::signature_checker checker;
nano::network network;
nano::telemetry telemetry;
std::shared_ptr<nano::telemetry> telemetry;
nano::bootstrap_initiator bootstrap_initiator;
nano::bootstrap_listener bootstrap;
boost::filesystem::path application_path;

View file

@ -17,172 +17,131 @@ nano::telemetry::telemetry (nano::network & network_a, nano::alarm & alarm_a, na
network (network_a),
alarm (alarm_a),
worker (worker_a),
batch_request (std::make_shared<nano::telemetry_impl> (network, alarm, worker))
disable_ongoing_requests (disable_ongoing_requests_a)
{
// Before callbacks are called with the batch request, check if any of the single request data can be appended to give
batch_request->pre_callback_callback = [this](std::unordered_map<nano::endpoint, telemetry_data> & datas_a, std::mutex & mutex_a) {
nano::lock_guard<std::mutex> guard (this->mutex);
for (auto & single_request : single_requests)
{
nano::lock_guard<std::mutex> guard (single_request.second.impl->mutex);
if (!single_request.second.impl->cached_telemetry_data.empty ())
{
nano::lock_guard<std::mutex> batch_request_guard (mutex_a);
auto it = this->batch_request->cached_telemetry_data.find (single_request.first);
if (it != this->batch_request->cached_telemetry_data.cend () && single_request.second.last_updated > it->second.last_updated)
{
it->second = single_request.second.impl->cached_telemetry_data.begin ()->second;
}
else
{
datas_a.emplace (single_request.first, single_request.second.impl->cached_telemetry_data.begin ()->second.data);
}
}
}
}
for (auto & pending : finished_single_requests)
{
nano::lock_guard<std::mutex> batch_request_guard (mutex_a);
auto it = this->batch_request->cached_telemetry_data.find (pending.first);
if (it != this->batch_request->cached_telemetry_data.cend () && pending.second.last_updated > it->second.last_updated)
{
it->second = pending.second;
}
else
{
datas_a.emplace (pending.first, pending.second.data);
}
}
finished_single_requests.clear ();
};
if (!disable_ongoing_requests_a)
void nano::telemetry::start ()
{
// Cannot be done in the constructor as a shared_from_this () call is made in ongoing_req_all_peers
if (!disable_ongoing_requests)
{
ongoing_req_all_peers ();
ongoing_req_all_peers (std::chrono::milliseconds (0));
}
}
void nano::telemetry::stop ()
{
nano::lock_guard<std::mutex> guard (mutex);
batch_request = nullptr;
single_requests.clear ();
stopped = true;
}
void nano::telemetry::add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a, bool is_empty_a)
void nano::telemetry::set (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a, bool is_empty_a)
{
nano::lock_guard<std::mutex> guard (mutex);
if (!stopped)
{
batch_request->add (telemetry_data_a, endpoint_a, is_empty_a);
for (auto & request : single_requests)
nano::lock_guard<std::mutex> guard (mutex);
auto it = recent_or_initial_request_telemetry_data.find (endpoint_a);
if (it == recent_or_initial_request_telemetry_data.cend ())
{
request.second.impl->add (telemetry_data_a, endpoint_a, is_empty_a);
// Not requesting telemetry data from this peer so ignore it
return;
}
recent_or_initial_request_telemetry_data.modify (it, [&telemetry_data_a](nano::telemetry_info & telemetry_info_a) {
telemetry_info_a.data = telemetry_data_a;
telemetry_info_a.undergoing_request = false;
});
channel_processed (endpoint_a, is_empty_a);
}
}
void nano::telemetry::ongoing_req_all_peers ()
bool nano::telemetry::within_cache_cutoff (telemetry_info const & telemetry_info) const
{
alarm.add (std::chrono::steady_clock::now () + batch_request->cache_cutoff + batch_request->alarm_cutoff, [this, telemetry_impl_w = std::weak_ptr<nano::telemetry_impl> (batch_request)]() {
if (auto batch_telemetry_impl = telemetry_impl_w.lock ())
auto is_within = (telemetry_info.last_request + nano::telemetry_cache_cutoffs::network_to_time (network_params.network)) >= std::chrono::steady_clock::now ();
return !telemetry_info.awaiting_first_response () && is_within;
}
void nano::telemetry::ongoing_req_all_peers (std::chrono::milliseconds next_request_interval)
{
// Check if any peers actually need requesting
alarm.add (std::chrono::steady_clock::now () + next_request_interval, [this_w = std::weak_ptr<telemetry> (shared_from_this ())]() {
if (auto this_l = this_w.lock ())
{
nano::lock_guard<std::mutex> guard (this->mutex);
if (!this->stopped)
// Check if there are any peers which are in the peers list which haven't been request, or any which are below or equal to the cache cutoff time
if (!this_l->stopped)
{
auto peers = this->network.list (std::numeric_limits<size_t>::max (), network_params.protocol.telemetry_protocol_version_min, false);
// If exists in single_requests don't request because they will just be rejected by other peers until the next round
auto const & single_requests = this->single_requests;
peers.erase (std::remove_if (peers.begin (), peers.end (), [&single_requests](auto const & channel_a) {
return single_requests.count (channel_a->get_endpoint ()) > 0;
}),
peers.cend ());
if (!peers.empty ())
auto peers = this_l->network.list (std::numeric_limits<size_t>::max (), this_l->network_params.protocol.telemetry_protocol_version_min, false);
{
batch_telemetry_impl->get_metrics_async (peers, [](nano::telemetry_data_responses const &) {
std::unordered_set<nano::endpoint> temp_peers;
std::transform (peers.begin (), peers.end (), std::inserter (temp_peers, temp_peers.end ()), [](auto const & channel_a) {
return channel_a->get_endpoint ();
});
// Cleanup any stale saved telemetry data for non-existent peers
nano::lock_guard<std::mutex> guard (this_l->mutex);
for (auto it = this_l->recent_or_initial_request_telemetry_data.begin (); it != this_l->recent_or_initial_request_telemetry_data.end ();)
{
if (!it->undergoing_request && !this_l->within_cache_cutoff (*it) && temp_peers.count (it->endpoint) == 0)
{
it = this_l->recent_or_initial_request_telemetry_data.erase (it);
}
else
{
++it;
}
}
peers.erase (std::remove_if (peers.begin (), peers.end (), [&this_l](auto const & channel_a) {
// Remove from peers list if it exists and is within the cache cutoff
auto it = this_l->recent_or_initial_request_telemetry_data.find (channel_a->get_endpoint ());
return it != this_l->recent_or_initial_request_telemetry_data.end () && this_l->within_cache_cutoff (*it);
}),
peers.end ());
}
// Request data from new peers, or ones which are out of date
for (auto const & peer : peers)
{
this_l->get_metrics_single_peer_async (peer, [](auto const &) {
// Intentionally empty, just using to refresh the cache
});
}
this->ongoing_req_all_peers ();
}
}
});
}
void nano::telemetry::get_metrics_peers_async (std::function<void(telemetry_data_responses const &)> const & callback_a)
{
auto peers = network.list (std::numeric_limits<size_t>::max (), network_params.protocol.telemetry_protocol_version_min, false);
nano::lock_guard<std::mutex> guard (mutex);
if (!stopped && !peers.empty ())
{
// If exists in single_requests, don't request because they will just be rejected by other nodes, instead all it as additional values
peers.erase (std::remove_if (peers.begin (), peers.end (), [& single_requests = this->single_requests](auto const & channel_a) {
return single_requests.count (channel_a->get_endpoint ()) > 0;
}),
peers.cend ());
batch_request->get_metrics_async (peers, [callback_a](nano::telemetry_data_responses const & telemetry_data_responses) {
callback_a (telemetry_data_responses);
});
}
else
{
const auto all_received = false;
callback_a (nano::telemetry_data_responses{ {}, all_received });
}
}
nano::telemetry_data_responses nano::telemetry::get_metrics_peers ()
{
std::promise<telemetry_data_responses> promise;
get_metrics_peers_async ([&promise](telemetry_data_responses const & telemetry_data_responses_a) {
promise.set_value (telemetry_data_responses_a);
});
return promise.get_future ().get ();
}
// After a request is made to a single peer we want to remove it from the container after the peer has not been requested for a while (cache_cutoff).
void nano::telemetry::ongoing_single_request_cleanup (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data const & single_request_data_a)
{
alarm.add (std::chrono::steady_clock::now () + single_request_data_a.impl->cache_cutoff, [this, telemetry_impl_w = std::weak_ptr<nano::telemetry_impl> (single_request_data_a.impl), &single_request_data_a, &endpoint_a]() {
if (auto telemetry_impl = telemetry_impl_w.lock ())
{
nano::lock_guard<std::mutex> guard (this->mutex);
nano::lock_guard<std::mutex> guard_telemetry_impl (telemetry_impl->mutex);
if (std::chrono::steady_clock::now () - telemetry_impl->cache_cutoff > single_request_data_a.last_updated && telemetry_impl->callbacks.empty ())
{
// This will be picked up by the batch request next round
if (!telemetry_impl->cached_telemetry_data.empty ())
nano::lock_guard<std::mutex> guard (this_l->mutex);
long long next_round = std::chrono::duration_cast<std::chrono::milliseconds> (nano::telemetry_cache_cutoffs::network_to_time (this_l->network_params.network)).count ();
if (!this_l->recent_or_initial_request_telemetry_data.empty ())
{
this->finished_single_requests[endpoint_a] = telemetry_impl->cached_telemetry_data.begin ()->second;
// Use the default request time unless a telemetry request cache expires sooner
auto const cache_cutoff = nano::telemetry_cache_cutoffs::network_to_time (this_l->network_params.network);
auto const last_request = this_l->recent_or_initial_request_telemetry_data.get<tag_last_updated> ().begin ()->last_request;
if (std::chrono::steady_clock::now () > last_request + cache_cutoff)
{
next_round = std::min<long long> (next_round, std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::steady_clock::now () - (last_request + cache_cutoff)).count ());
}
}
this->single_requests.erase (endpoint_a);
}
else
{
// Request is still active, so call again
this->ongoing_single_request_cleanup (endpoint_a, single_request_data_a);
this_l->ongoing_req_all_peers (std::chrono::milliseconds (next_round));
}
}
});
}
void nano::telemetry::update_cleanup_data (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data & single_request_data_a, bool is_new_a)
std::unordered_map<nano::endpoint, nano::telemetry_data> nano::telemetry::get_metrics ()
{
if (is_new_a)
{
// Clean this request up when it isn't being used anymore
ongoing_single_request_cleanup (endpoint_a, single_request_data_a);
}
else
{
// Ensure that refreshed flag is reset so we don't delete it before processing
single_request_data_a.last_updated = std::chrono::steady_clock::now ();
}
std::unordered_map<nano::endpoint, nano::telemetry_data> telemetry_data;
nano::lock_guard<std::mutex> guard (mutex);
auto range = boost::make_iterator_range (recent_or_initial_request_telemetry_data);
// clang-format off
nano::transform_if (range.begin (), range.end (), std::inserter (telemetry_data, telemetry_data.end ()),
[this](auto const & telemetry_info) { return this->within_cache_cutoff (telemetry_info); },
[](auto const & telemetry_info) { return std::pair<const nano::endpoint, nano::telemetry_data>{ telemetry_info.endpoint, telemetry_info.data }; });
// clang-format on
return telemetry_data;
}
void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr<nano::transport::channel> const & channel_a, std::function<void(telemetry_data_response const &)> const & callback_a)
@ -199,7 +158,6 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr<nano::trans
});
};
nano::lock_guard<std::mutex> guard (mutex);
if (!stopped)
{
if (channel_a && (channel_a->get_network_version () >= network_params.protocol.telemetry_protocol_version_min))
@ -211,42 +169,39 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr<nano::trans
});
};
// First check if the batched metrics have processed this endpoint.
// Check if this is within the cache
nano::lock_guard<std::mutex> guard (mutex);
auto it = recent_or_initial_request_telemetry_data.find (channel_a->get_endpoint ());
if (it != recent_or_initial_request_telemetry_data.cend () && within_cache_cutoff (*it))
{
nano::lock_guard<std::mutex> guard (batch_request->mutex);
auto it = batch_request->cached_telemetry_data.find (channel_a->get_endpoint ());
if (it != batch_request->cached_telemetry_data.cend ())
{
add_callback_async (it->second.data, it->first);
return;
}
add_callback_async (it->data, it->endpoint);
}
// Next check single requests which finished and are awaiting batched requests
auto it = finished_single_requests.find (channel_a->get_endpoint ());
if (it != finished_single_requests.cend ())
else
{
add_callback_async (it->second.data, it->first);
return;
}
auto pair = single_requests.emplace (channel_a->get_endpoint (), single_request_data{ std::make_shared<nano::telemetry_impl> (network, alarm, worker), std::chrono::steady_clock::now () });
auto & single_request_data_it = pair.first;
update_cleanup_data (single_request_data_it->first, single_request_data_it->second, pair.second);
single_request_data_it->second.impl->get_metrics_async ({ channel_a }, [callback_a, channel_a](telemetry_data_responses const & telemetry_data_responses_a) {
// There should only be 1 response, so if this hasn't been received then conclude it is an error.
auto const error = !telemetry_data_responses_a.all_received;
if (!error)
if (it != recent_or_initial_request_telemetry_data.cend () && it->undergoing_request)
{
debug_assert (telemetry_data_responses_a.telemetry_datas.size () == 1);
auto it = telemetry_data_responses_a.telemetry_datas.begin ();
callback_a ({ it->second, it->first, error });
// A request is currently undergoing, add the callback
debug_assert (callbacks.count (it->endpoint) > 0);
callbacks[it->endpoint].push_back (callback_a);
}
else
{
callback_a ({ nano::telemetry_data{}, channel_a->get_endpoint (), error });
if (it == recent_or_initial_request_telemetry_data.cend ())
{
recent_or_initial_request_telemetry_data.emplace (channel_a->get_endpoint (), nano::telemetry_data (), std::chrono::steady_clock::now (), true);
it = recent_or_initial_request_telemetry_data.find (channel_a->get_endpoint ());
}
else
{
recent_or_initial_request_telemetry_data.modify (it, [](nano::telemetry_info & telemetry_info_a) {
telemetry_info_a.undergoing_request = true;
telemetry_info_a.last_request = std::chrono::steady_clock::now ();
});
}
callbacks[it->endpoint].push_back (callback_a);
fire_request_message (channel_a);
}
});
}
}
else
{
@ -269,252 +224,143 @@ nano::telemetry_data_response nano::telemetry::get_metrics_single_peer (std::sha
return promise.get_future ().get ();
}
size_t nano::telemetry::telemetry_data_size ()
void nano::telemetry::fire_request_message (std::shared_ptr<nano::transport::channel> const & channel)
{
nano::lock_guard<std::mutex> guard (mutex);
auto total = std::accumulate (single_requests.begin (), single_requests.end (), static_cast<size_t> (0), [](size_t total, auto & single_request) {
return total += single_request.second.impl->telemetry_data_size ();
});
// Fire off a telemetry request to all passed in channels
debug_assert (channel->get_network_version () >= network_params.protocol.telemetry_protocol_version_min);
if (batch_request)
uint64_t round_l;
{
total += batch_request->telemetry_data_size ();
auto it = recent_or_initial_request_telemetry_data.find (channel->get_endpoint ());
recent_or_initial_request_telemetry_data.modify (it, [](nano::telemetry_info & telemetry_info_a) {
++telemetry_info_a.round;
});
round_l = it->round;
}
return total;
std::weak_ptr<nano::telemetry> this_w (shared_from_this ());
nano::telemetry_req message;
// clang-format off
channel->send (message, [this_w, endpoint = channel->get_endpoint ()](boost::system::error_code const & ec, size_t size_a) {
if (auto this_l = this_w.lock ())
{
if (ec)
{
// Error sending the telemetry_req message
nano::lock_guard<std::mutex> guard (this_l->mutex);
this_l->channel_processed (endpoint, true);
}
}
},
nano::buffer_drop_policy::no_socket_drop);
// clang-format on
// If no response is seen after a certain period of time remove it
alarm.add (std::chrono::steady_clock::now () + response_time_cutoff, [round_l, this_w, endpoint = channel->get_endpoint ()]() {
if (auto this_l = this_w.lock ())
{
nano::lock_guard<std::mutex> guard (this_l->mutex);
auto it = this_l->recent_or_initial_request_telemetry_data.find (endpoint);
if (it != this_l->recent_or_initial_request_telemetry_data.cend () && it->undergoing_request && round_l == it->round)
{
this_l->channel_processed (endpoint, true);
}
}
});
}
size_t nano::telemetry::finished_single_requests_size ()
void nano::telemetry::channel_processed (nano::endpoint const & endpoint_a, bool error_a)
{
nano::lock_guard<std::mutex> guard (mutex);
return finished_single_requests.size ();
if (recent_or_initial_request_telemetry_data.count (endpoint_a) > 0)
{
if (error_a)
{
recent_or_initial_request_telemetry_data.erase (endpoint_a);
}
flush_callbacks_async (endpoint_a, error_a);
}
}
nano::telemetry_impl::telemetry_impl (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a) :
alarm_cutoff (is_sanitizer_build || nano::running_within_valgrind () ? 6 : 3),
network (network_a),
alarm (alarm_a),
worker (worker_a)
{
}
void nano::telemetry_impl::flush_callbacks_async ()
void nano::telemetry::flush_callbacks_async (nano::endpoint const & endpoint_a, bool error_a)
{
// Post to worker so that it's truly async and not on the calling thread (same problem as std::async otherwise)
worker.push_task ([this_w = std::weak_ptr<nano::telemetry_impl> (shared_from_this ())]() {
worker.push_task ([endpoint_a, error_a, this_w = std::weak_ptr<nano::telemetry> (shared_from_this ())]() {
if (auto this_l = this_w.lock ())
{
nano::unique_lock<std::mutex> lk (this_l->mutex);
// Invoke all callbacks, it's possible that during the mutex unlock other callbacks were added,
// so check again and invoke those too
this_l->invoking = true;
while (!this_l->callbacks.empty ())
while (!this_l->callbacks[endpoint_a].empty ())
{
lk.unlock ();
this_l->invoke_callbacks ();
this_l->invoke_callbacks (endpoint_a, error_a);
lk.lock ();
}
this_l->invoking = false;
}
});
}
void nano::telemetry_impl::get_metrics_async (std::deque<std::shared_ptr<nano::transport::channel>> const & channels_a, std::function<void(telemetry_data_responses const &)> const & callback_a)
void nano::telemetry::invoke_callbacks (nano::endpoint const & endpoint_a, bool error_a)
{
std::vector<std::function<void(telemetry_data_response const &)>> callbacks_l;
telemetry_data_response response_data{ nano::telemetry_data (), endpoint_a, error_a };
{
nano::unique_lock<std::mutex> lk (mutex);
callbacks.push_back (callback_a);
if (callbacks.size () > 1 || invoking)
{
// This means we already have at least one pending result already, so it will handle calls this callback when it completes
return;
}
// Check if we can just return cached results
if (channels_a.empty () || std::chrono::steady_clock::now () <= (last_time + cache_cutoff))
{
flush_callbacks_async ();
return;
}
failed.clear ();
debug_assert (required_responses.empty ());
std::transform (channels_a.begin (), channels_a.end (), std::inserter (required_responses, required_responses.end ()), [](auto const & channel) {
return channel->get_endpoint ();
});
}
fire_request_messages (channels_a);
}
void nano::telemetry_impl::add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a, bool is_empty_a)
{
nano::unique_lock<std::mutex> lk (mutex);
if (required_responses.find (endpoint_a) == required_responses.cend ())
{
// Not requesting telemetry data from this channel so ignore it
return;
}
if (!is_empty_a)
{
current_telemetry_data_responses[endpoint_a] = { telemetry_data_a, std::chrono::steady_clock::now () };
}
channel_processed (lk, endpoint_a);
}
void nano::telemetry_impl::invoke_callbacks ()
{
decltype (callbacks) callbacks_l;
bool all_received;
std::unordered_map<nano::endpoint, nano::telemetry_data> cached_responses_l;
{
// Copy callbacks so that they can be called outside of holding the lock
// Copy data so that it can be used outside of holding the lock
nano::lock_guard<std::mutex> guard (mutex);
callbacks_l = callbacks;
cached_responses_l.reserve (cached_telemetry_data.size ());
std::transform (cached_telemetry_data.begin (), cached_telemetry_data.end (), std::inserter (cached_responses_l, cached_responses_l.end ()), [](auto const & endpoint_telemetry_data) {
return std::pair<const nano::endpoint, nano::telemetry_data>{ endpoint_telemetry_data.first, endpoint_telemetry_data.second.data };
});
current_telemetry_data_responses.clear ();
callbacks.clear ();
all_received = failed.empty ();
callbacks_l = callbacks[endpoint_a];
auto it = recent_or_initial_request_telemetry_data.find (endpoint_a);
if (it != recent_or_initial_request_telemetry_data.end ())
{
response_data.telemetry_data = it->data;
}
callbacks.erase (endpoint_a);
}
if (pre_callback_callback)
{
pre_callback_callback (cached_responses_l, mutex);
}
// Need to account for nodes which disable telemetry data in responses
bool all_received_l = !cached_responses_l.empty () && all_received;
for (auto & callback : callbacks_l)
{
callback ({ cached_responses_l, all_received_l });
callback (response_data);
}
}
void nano::telemetry_impl::channel_processed (nano::unique_lock<std::mutex> & lk_a, nano::endpoint const & endpoint_a)
{
debug_assert (lk_a.owns_lock ());
auto num_removed = required_responses.erase (endpoint_a);
if (num_removed > 0 && required_responses.empty ())
{
debug_assert (lk_a.owns_lock ());
cached_telemetry_data = current_telemetry_data_responses;
last_time = std::chrono::steady_clock::now ();
flush_callbacks_async ();
}
}
void nano::telemetry_impl::fire_request_messages (std::deque<std::shared_ptr<nano::transport::channel>> const & channels)
{
uint64_t round_l;
{
nano::lock_guard<std::mutex> guard (mutex);
++round;
round_l = round;
}
// Fire off a telemetry request to all passed in channels
nano::telemetry_req message;
for (auto & channel : channels)
{
debug_assert (channel->get_network_version () >= network_params.protocol.telemetry_protocol_version_min);
std::weak_ptr<nano::telemetry_impl> this_w (shared_from_this ());
// clang-format off
channel->send (message, [this_w, endpoint = channel->get_endpoint ()](boost::system::error_code const & ec, size_t size_a) {
if (auto this_l = this_w.lock ())
{
if (ec)
{
// Error sending the telemetry_req message
nano::unique_lock<std::mutex> lk (this_l->mutex);
this_l->failed.push_back (endpoint);
this_l->channel_processed (lk, endpoint);
}
}
},
nano::buffer_drop_policy::no_socket_drop);
// clang-format on
// If no response is seen after a certain period of time, remove it from the list of expected responses. However, only if it is part of the same round.
alarm.add (std::chrono::steady_clock::now () + alarm_cutoff, [this_w, endpoint = channel->get_endpoint (), round_l]() {
if (auto this_l = this_w.lock ())
{
nano::unique_lock<std::mutex> lk (this_l->mutex);
if (this_l->round == round_l && this_l->required_responses.find (endpoint) != this_l->required_responses.cend ())
{
this_l->failed.push_back (endpoint);
this_l->channel_processed (lk, endpoint);
}
}
});
}
}
size_t nano::telemetry_impl::telemetry_data_size ()
size_t nano::telemetry::telemetry_data_size ()
{
nano::lock_guard<std::mutex> guard (mutex);
return current_telemetry_data_responses.size ();
return recent_or_initial_request_telemetry_data.size ();
}
bool nano::telemetry_data_time_pair::operator== (telemetry_data_time_pair const & telemetry_data_time_pair_a) const
nano::telemetry_info::telemetry_info (nano::endpoint const & endpoint_a, nano::telemetry_data const & data_a, std::chrono::steady_clock::time_point last_request_a, bool undergoing_request_a) :
endpoint (endpoint_a),
data (data_a),
last_request (last_request_a),
undergoing_request (undergoing_request_a)
{
return data == telemetry_data_time_pair_a.data && last_updated == telemetry_data_time_pair_a.last_updated;
}
bool nano::telemetry_data_time_pair::operator!= (telemetry_data_time_pair const & telemetry_data_time_pair_a) const
bool nano::telemetry_info::awaiting_first_response () const
{
return !(*this == telemetry_data_time_pair_a);
return data == nano::telemetry_data ();
}
std::unique_ptr<nano::container_info_component> nano::collect_container_info (telemetry & telemetry, const std::string & name)
{
size_t single_requests_count;
auto composite = std::make_unique<container_info_composite> (name);
size_t callbacks_count;
{
nano::lock_guard<std::mutex> guard (telemetry.mutex);
single_requests_count = telemetry.single_requests.size ();
std::unordered_map<nano::endpoint, std::vector<std::function<void(telemetry_data_response const &)>>> callbacks;
callbacks_count = std::accumulate (callbacks.begin (), callbacks.end (), static_cast<size_t> (0), [](auto total, auto const & callback_a) {
return total += callback_a.second.size ();
});
}
auto composite = std::make_unique<container_info_composite> (name);
if (telemetry.batch_request)
{
composite->add_component (collect_container_info (*telemetry.batch_request, "batch_request"));
}
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "single_requests", single_requests_count, sizeof (decltype (telemetry.single_requests)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "finished_single_requests", telemetry.finished_single_requests_size (), sizeof (decltype (telemetry.finished_single_requests)::value_type) }));
return composite;
}
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "recent_or_initial_request_telemetry_data", telemetry.telemetry_data_size (), sizeof (decltype (telemetry.recent_or_initial_request_telemetry_data)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "callbacks", callbacks_count, sizeof (decltype (telemetry.callbacks)::value_type::second_type) }));
std::unique_ptr<nano::container_info_component> nano::collect_container_info (telemetry_impl & telemetry_impl, const std::string & name)
{
size_t callback_count;
size_t all_telemetry_data_count;
size_t cached_telemetry_data_count;
size_t required_responses_count;
{
nano::lock_guard<std::mutex> guard (telemetry_impl.mutex);
callback_count = telemetry_impl.callbacks.size ();
all_telemetry_data_count = telemetry_impl.current_telemetry_data_responses.size ();
cached_telemetry_data_count = telemetry_impl.cached_telemetry_data.size ();
required_responses_count = telemetry_impl.required_responses.size ();
}
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "callbacks", callback_count, sizeof (decltype (telemetry_impl.callbacks)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "current_telemetry_data_responses", all_telemetry_data_count, sizeof (decltype (telemetry_impl.current_telemetry_data_responses)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "cached_telemetry_data", cached_telemetry_data_count, sizeof (decltype (telemetry_impl.cached_telemetry_data)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "required_responses", required_responses_count, sizeof (decltype (telemetry_impl.required_responses)::value_type) }));
return composite;
}
nano::telemetry_data nano::consolidate_telemetry_data (std::vector<nano::telemetry_data> const & telemetry_datas)
{
std::vector<nano::telemetry_data_time_pair> telemetry_data_time_pairs;
telemetry_data_time_pairs.reserve (telemetry_datas.size ());
if (telemetry_datas.empty ())
{
return {};

View file

@ -3,30 +3,26 @@
#include <nano/node/common.hpp>
#include <nano/secure/common.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index_container.hpp>
#include <functional>
#include <memory>
#include <unordered_set>
namespace mi = boost::multi_index;
namespace nano
{
class network;
class alarm;
class worker;
class telemetry;
namespace transport
{
class channel;
}
class telemetry_data_time_pair
{
public:
nano::telemetry_data data;
std::chrono::steady_clock::time_point last_updated;
bool operator== (telemetry_data_time_pair const &) const;
bool operator!= (telemetry_data_time_pair const &) const;
};
/*
* Holds a response from a telemetry request
*/
@ -38,106 +34,48 @@ public:
bool error{ true };
};
/*
* Holds many responses from telemetry requests
*/
class telemetry_data_responses
class telemetry_info final
{
public:
std::unordered_map<nano::endpoint, telemetry_data> telemetry_datas;
bool all_received{ false };
};
telemetry_info () = default;
telemetry_info (nano::endpoint const & endpoint, nano::telemetry_data const & data, std::chrono::steady_clock::time_point last_request, bool undergoing_request);
bool awaiting_first_response () const;
/*
* This class requests node telemetry metrics and invokes any callbacks
* which have been aggregated. Further calls to get_metrics_async may return cached telemetry metrics
* if they are within cache_cutoff time from the latest request.
*/
class telemetry_impl : public std::enable_shared_from_this<telemetry_impl>
{
public:
telemetry_impl (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a);
private:
// Class only available to the telemetry class
void get_metrics_async (std::deque<std::shared_ptr<nano::transport::channel>> const & channels_a, std::function<void(telemetry_data_responses const &)> const & callback_a);
void add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a, bool is_empty_a);
size_t telemetry_data_size ();
nano::network_params network_params;
// Anything older than this requires requesting metrics from other nodes.
std::chrono::seconds const cache_cutoff{ nano::telemetry_cache_cutoffs::network_to_time (network_params.network) };
std::chrono::seconds const alarm_cutoff;
// All data in this chunk is protected by this mutex
std::mutex mutex;
std::vector<std::function<void(telemetry_data_responses const &)>> callbacks;
std::chrono::steady_clock::time_point last_time = std::chrono::steady_clock::now () - cache_cutoff;
/* The responses received during this request round */
std::unordered_map<nano::endpoint, telemetry_data_time_pair> current_telemetry_data_responses;
/* The metrics for the last request round */
std::unordered_map<nano::endpoint, telemetry_data_time_pair> cached_telemetry_data;
std::unordered_set<nano::endpoint> required_responses;
nano::endpoint endpoint;
nano::telemetry_data data;
std::chrono::steady_clock::time_point last_request;
bool undergoing_request{ false };
uint64_t round{ 0 };
/* Currently executing callbacks */
bool invoking{ false };
std::vector<nano::endpoint> failed;
nano::network & network;
nano::alarm & alarm;
nano::worker & worker;
std::function<void(std::unordered_map<nano::endpoint, telemetry_data> & data_a, std::mutex &)> pre_callback_callback;
void invoke_callbacks ();
void channel_processed (nano::unique_lock<std::mutex> & lk_a, nano::endpoint const & endpoint_a);
void flush_callbacks_async ();
void fire_request_messages (std::deque<std::shared_ptr<nano::transport::channel>> const & channels);
friend std::unique_ptr<container_info_component> collect_container_info (telemetry_impl &, const std::string &);
friend nano::telemetry;
friend class node_telemetry_single_request_Test;
friend class node_telemetry_basic_Test;
friend class node_telemetry_ongoing_requests_Test;
};
std::unique_ptr<nano::container_info_component> collect_container_info (telemetry_impl & telemetry_impl, const std::string & name);
/*
* This class has 2 main operations:
* Request metrics from specific single peers (single_requests)
* - If this peer is in the batched request, it will use the value from that, otherwise send a telemetry_req message (non-droppable)
* Request metrics from all peers (batched_request)
* - This is polled every minute.
* - If a single request is currently underway, do not request because other peers will just reject if within a hotzone time
* - This will be proactively added when callbacks are called inside pre_callback_callback
* This class requests node telemetry metrics from peers and invokes any callbacks which have been aggregated.
* All calls to get_metrics return cached data, it does not do any requests, these are periodically done in
* ongoing_req_all_peers. This can be disabled with the disable_ongoing_telemetry_requests node flag.
* Calls to get_metrics_single_peer_async will wait until a response is made if it is not within the cache
* cut off.
*/
class telemetry
class telemetry : public std::enable_shared_from_this<telemetry>
{
public:
telemetry (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a, bool disable_ongoing_requests_a);
telemetry (nano::network &, nano::alarm &, nano::worker &, bool);
void start ();
void stop ();
/*
* Add telemetry metrics received from this endpoint.
* Should this be unsolicited, it will not be added.
* Some peers may have disabled responding with telemetry data, need to account for this
* Set the telemetry data associated with this peer
*/
void add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a, bool is_empty_a);
void set (nano::telemetry_data const &, nano::endpoint const &, bool);
/*
* Collects metrics from all known peers and invokes the callback when complete.
* This returns what ever is in the cache
*/
void get_metrics_peers_async (std::function<void(telemetry_data_responses const &)> const & callback_a);
/*
* A blocking version of get_metrics_peers_async ().
*/
telemetry_data_responses get_metrics_peers ();
std::unordered_map<nano::endpoint, nano::telemetry_data> get_metrics ();
/*
* This makes a telemetry request to the specific channel
*/
void get_metrics_single_peer_async (std::shared_ptr<nano::transport::channel> const &, std::function<void(telemetry_data_response const &)> const & callback_a);
void get_metrics_single_peer_async (std::shared_ptr<nano::transport::channel> const &, std::function<void(telemetry_data_response const &)> const &);
/*
* A blocking version of get_metrics_single_peer_async
@ -149,51 +87,52 @@ public:
*/
size_t telemetry_data_size ();
/*
* Return the number of finished_single_requests elements
*/
size_t finished_single_requests_size ();
/*
* Stop the telemetry processor
*/
void stop ();
private:
class tag_endpoint
{
};
class tag_last_updated
{
};
nano::network & network;
nano::alarm & alarm;
nano::worker & worker;
std::atomic<bool> stopped{ false };
nano::network_params network_params;
class single_request_data
{
public:
std::shared_ptr<telemetry_impl> impl;
std::chrono::steady_clock::time_point last_updated{ std::chrono::steady_clock::now () };
};
bool disable_ongoing_requests;
std::mutex mutex;
/* Requests telemetry data from a random selection of peers */
std::shared_ptr<telemetry_impl> batch_request;
/* Any requests to specific individual peers is maintained here */
std::unordered_map<nano::endpoint, single_request_data> single_requests;
/* This holds data from single_requests after the cache is removed */
std::unordered_map<nano::endpoint, telemetry_data_time_pair> finished_single_requests;
bool stopped{ false };
// clang-format off
// This holds the last telemetry data received from peers or can be a placeholder awaiting the first response (check with awaiting_first_response ())
boost::multi_index_container<nano::telemetry_info,
mi::indexed_by<
mi::hashed_unique<mi::tag<tag_endpoint>,
mi::member<nano::telemetry_info, nano::endpoint, &nano::telemetry_info::endpoint>>,
mi::ordered_non_unique<mi::tag<tag_last_updated>,
mi::member<nano::telemetry_info, std::chrono::steady_clock::time_point, &nano::telemetry_info::last_request>>>> recent_or_initial_request_telemetry_data;
// clang-format on
void update_cleanup_data (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data & single_request_data_a, bool is_new_a);
void ongoing_single_request_cleanup (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data const & single_request_data_a);
void ongoing_req_all_peers ();
// Anything older than this requires requesting metrics from other nodes.
std::chrono::seconds const cache_cutoff{ nano::telemetry_cache_cutoffs::network_to_time (network_params.network) };
std::chrono::seconds const response_time_cutoff{ is_sanitizer_build || nano::running_within_valgrind () ? 6 : 3 };
friend class node_telemetry_multiple_single_request_clearing_Test;
friend class node_telemetry_ongoing_requests_Test;
friend std::unique_ptr<container_info_component> collect_container_info (telemetry &, const std::string &);
std::unordered_map<nano::endpoint, std::vector<std::function<void(telemetry_data_response const &)>>> callbacks;
void ongoing_req_all_peers (std::chrono::milliseconds);
void fire_request_message (std::shared_ptr<nano::transport::channel> const & channel);
void channel_processed (nano::endpoint const &, bool);
void flush_callbacks_async (nano::endpoint const &, bool);
void invoke_callbacks (nano::endpoint const &, bool);
bool within_cache_cutoff (nano::telemetry_info const &) const;
friend std::unique_ptr<nano::container_info_component> collect_container_info (telemetry & telemetry, const std::string & name);
};
std::unique_ptr<nano::container_info_component> collect_container_info (telemetry & telemetry, const std::string & name);
nano::telemetry_data consolidate_telemetry_data (std::vector<telemetry_data> const & telemetry_data);
nano::telemetry_data_time_pair consolidate_telemetry_data_time_pairs (std::vector<telemetry_data_time_pair> const & telemetry_data_time_pairs);
nano::telemetry_data local_telemetry_data (nano::ledger_cache const &, nano::network &, uint64_t, nano::network_params const &, std::chrono::steady_clock::time_point);
}

View file

@ -78,7 +78,7 @@ namespace transport
class tcp_channels final
{
friend class nano::transport::channel_tcp;
friend class node_telemetry_simultaneous_single_and_all_requests_Test;
friend class node_telemetry_simultaneous_requests_Test;
public:
tcp_channels (nano::node &);

View file

@ -7990,12 +7990,25 @@ TEST (rpc, node_telemetry_all)
{
ASSERT_NO_ERROR (system.poll ());
auto transaction = system.nodes.back ()->store.tx_begin_read ();
peers_stored = system.nodes.back ()->store.peer_count (transaction) != 0;
auto transaction = node1.store.tx_begin_read ();
peers_stored = node1.store.peer_count (transaction) != 0;
}
// First need to set up the cached data
std::atomic<bool> done{ false };
auto node = system.nodes.front ();
node1.telemetry->get_metrics_single_peer_async (node1.network.find_channel (node->network.endpoint ()), [&done](nano::telemetry_data_response const & telemetry_data_response_a) {
ASSERT_FALSE (telemetry_data_response_a.error);
done = true;
});
system.deadline_set (10s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
boost::property_tree::ptree request;
auto node = system.nodes.front ();
request.put ("action", "node_telemetry");
{
test_response response (request, rpc.config.port, system.io_ctx);

View file

@ -857,10 +857,8 @@ public:
class shared_data
{
public:
nano::util::counted_completion write_completion{ 0 };
std::atomic<bool> done{ false };
std::atomic<uint64_t> count{ 0 };
std::promise<void> promise;
std::shared_future<void> shared_future{ promise.get_future () };
};
template <typename T>
@ -880,16 +878,10 @@ void callback_process (shared_data & shared_data_a, data & data, T & all_node_da
data.awaiting_cache = true;
data.orig_time = last_updated;
}
if (--shared_data_a.count == 0 && std::all_of (all_node_data_a.begin (), all_node_data_a.end (), [](auto const & data) { return !data.keep_requesting_metrics; }))
{
shared_data_a.done = true;
shared_data_a.promise.set_value ();
}
shared_data_a.write_completion.increment ();
};
}
namespace nano
{
TEST (node_telemetry, ongoing_requests)
{
nano::system system (2);
@ -899,20 +891,20 @@ TEST (node_telemetry, ongoing_requests)
wait_peer_connections (system);
ASSERT_EQ (0, node_client->telemetry.telemetry_data_size ());
ASSERT_EQ (0, node_server->telemetry.telemetry_data_size ());
ASSERT_EQ (0, node_client->telemetry->telemetry_data_size ());
ASSERT_EQ (0, node_server->telemetry->telemetry_data_size ());
ASSERT_EQ (0, node_client->stats.count (nano::stat::type::bootstrap, nano::stat::detail::telemetry_ack, nano::stat::dir::in));
ASSERT_EQ (0, node_client->stats.count (nano::stat::type::bootstrap, nano::stat::detail::telemetry_req, nano::stat::dir::out));
system.deadline_set (20s);
while (node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in) != 1 || node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out) != 1)
while (node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in) != 1 || node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in) != 1)
{
ASSERT_NO_ERROR (system.poll ());
}
// Wait till the next ongoing will be called, and add a 1s buffer for the actual processing
auto time = std::chrono::steady_clock::now ();
while (std::chrono::steady_clock::now () < (time + nano::telemetry_cache_cutoffs::test + node_client->telemetry.batch_request->alarm_cutoff + 1s))
while (std::chrono::steady_clock::now () < (time + nano::telemetry_cache_cutoffs::test + 1s))
{
ASSERT_NO_ERROR (system.poll ());
}
@ -924,70 +916,12 @@ TEST (node_telemetry, ongoing_requests)
ASSERT_EQ (2, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
ASSERT_EQ (2, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out));
}
}
TEST (node_telemetry, simultaneous_all_requests)
{
const auto num_nodes = 4;
nano::system system (num_nodes);
// Wait until peers are stored as they are done in the background
wait_peer_connections (system);
std::vector<std::thread> threads;
const auto num_threads = 4;
std::array<data, num_nodes> all_data{};
for (auto i = 0; i < num_nodes; ++i)
{
all_data[i].node = system.nodes[i];
}
shared_data shared_data;
// Create a few threads where each node sends out telemetry request messages to all other nodes continuously, until the cache it reached and subsequently expired.
// The test waits until all telemetry_ack messages have been received.
for (int i = 0; i < num_threads; ++i)
{
threads.emplace_back ([&all_data, &shared_data]() {
while (std::any_of (all_data.cbegin (), all_data.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); }))
{
for (auto & data : all_data)
{
// Keep calling requesting telemetry metrics until the cache has been saved and then become outdated (after a certain period of time) for each node
if (data.keep_requesting_metrics)
{
++shared_data.count;
data.node->telemetry.get_metrics_peers_async ([&shared_data, &data, &all_data](nano::telemetry_data_responses const & responses_a) {
callback_process (shared_data, data, all_data, *responses_a.telemetry_datas.begin ()->second.timestamp);
});
}
std::this_thread::sleep_for (1ms);
}
}
shared_data.shared_future.wait ();
ASSERT_EQ (shared_data.count, 0);
});
}
system.deadline_set (20s);
while (!shared_data.done)
{
ASSERT_NO_ERROR (system.poll ());
}
for (auto & thread : threads)
{
thread.join ();
}
}
namespace nano
{
namespace transport
{
TEST (node_telemetry, simultaneous_single_and_all_requests)
TEST (node_telemetry, simultaneous_requests)
{
const auto num_nodes = 4;
nano::system system (num_nodes);
@ -997,66 +931,52 @@ namespace transport
std::vector<std::thread> threads;
const auto num_threads = 4;
std::array<data, num_nodes> node_data_single{};
std::array<data, num_nodes> node_data_all{};
std::array<data, num_nodes> node_data{};
for (auto i = 0; i < num_nodes; ++i)
{
node_data_single[i].node = system.nodes[i];
node_data_all[i].node = system.nodes[i];
node_data[i].node = system.nodes[i];
}
shared_data shared_data_single;
shared_data shared_data_all;
shared_data shared_data;
// Create a few threads where each node sends out telemetry request messages to all other nodes continuously, until the cache it reached and subsequently expired.
// The test waits until all telemetry_ack messages have been received.
for (int i = 0; i < num_threads; ++i)
{
threads.emplace_back ([&node_data_single, &node_data_all, &shared_data_single, &shared_data_all]() {
auto func = [](auto & all_node_data_a, shared_data & shared_data_a, bool single_a) {
while (std::any_of (all_node_data_a.cbegin (), all_node_data_a.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); }))
threads.emplace_back ([&node_data, &shared_data]() {
while (std::any_of (node_data.cbegin (), node_data.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); }))
{
for (auto & data : node_data)
{
for (auto & data : all_node_data_a)
// Keep calling get_metrics_async until the cache has been saved and then become outdated (after a certain period of time) for each node
if (data.keep_requesting_metrics)
{
// Keep calling get_metrics_async until the cache has been saved and then become outdated (after a certain period of time) for each node
if (data.keep_requesting_metrics)
{
++shared_data_a.count;
shared_data.write_completion.increment_required_count ();
if (single_a)
{
// Pick first peer to be consistent
auto peer = data.node->network.tcp_channels.channels[0].channel;
data.node->telemetry.get_metrics_single_peer_async (peer, [&shared_data_a, &data, &all_node_data_a](nano::telemetry_data_response const & telemetry_data_response_a) {
callback_process (shared_data_a, data, all_node_data_a, *telemetry_data_response_a.telemetry_data.timestamp);
});
}
else
{
data.node->telemetry.get_metrics_peers_async ([&shared_data_a, &data, &all_node_data_a](nano::telemetry_data_responses const & telemetry_data_responses_a) {
callback_process (shared_data_a, data, all_node_data_a, *telemetry_data_responses_a.telemetry_datas.begin ()->second.timestamp);
});
}
}
std::this_thread::sleep_for (1ms);
// Pick first peer to be consistent
auto peer = data.node->network.tcp_channels.channels[0].channel;
data.node->telemetry->get_metrics_single_peer_async (peer, [&shared_data, &data, &node_data](nano::telemetry_data_response const & telemetry_data_response_a) {
ASSERT_FALSE (telemetry_data_response_a.error);
callback_process (shared_data, data, node_data, *telemetry_data_response_a.telemetry_data.timestamp);
});
}
std::this_thread::sleep_for (1ms);
}
}
shared_data_a.shared_future.wait ();
ASSERT_EQ (shared_data_a.count, 0);
};
func (node_data_single, shared_data_single, true);
func (node_data_all, shared_data_all, false);
shared_data.write_completion.await_count_for (20s);
shared_data.done = true;
});
}
system.deadline_set (30s);
while (!shared_data_all.done || !shared_data_single.done)
while (!shared_data.done)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TRUE (std::all_of (node_data.begin (), node_data.end (), [](auto const & data) { return !data.keep_requesting_metrics; }));
for (auto & thread : threads)
{
thread.join ();