Add timestamp to telemetry responses (#2573)

This commit is contained in:
Wesley Shillingford 2020-02-24 16:56:20 +00:00 committed by GitHub
commit 3c8ae09394
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 177 additions and 143 deletions

View file

@ -16,6 +16,8 @@ void compare_default_test_result_data (nano::telemetry_data const & telemetry_da
TEST (node_telemetry, consolidate_data)
{
auto time = 1582117035109;
// Pick specific values so that we can check both mode and average are working correctly
nano::telemetry_data data;
data.account_count = 2;
@ -31,6 +33,7 @@ TEST (node_telemetry, consolidate_data)
data.minor_version = 1;
data.patch_version = 4;
data.pre_release_version = 6;
data.timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (time));
nano::telemetry_data data1;
data1.account_count = 5;
@ -47,6 +50,7 @@ TEST (node_telemetry, consolidate_data)
data1.patch_version = 3;
data1.pre_release_version = 6;
data1.maker = 2;
data1.timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (time + 1));
nano::telemetry_data data2;
data2.account_count = 3;
@ -62,6 +66,7 @@ TEST (node_telemetry, consolidate_data)
data2.minor_version = 1;
data2.patch_version = 4;
data2.pre_release_version = 6;
data2.timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (time));
std::vector<nano::telemetry_data> all_data{ data, data1, data2 };
@ -80,6 +85,7 @@ TEST (node_telemetry, consolidate_data)
ASSERT_FALSE (consolidated_telemetry_data.patch_version.is_initialized ());
ASSERT_FALSE (consolidated_telemetry_data.pre_release_version.is_initialized ());
ASSERT_FALSE (consolidated_telemetry_data.maker.is_initialized ());
ASSERT_EQ (*consolidated_telemetry_data.timestamp, std::chrono::system_clock::time_point (std::chrono::milliseconds (time)));
// Modify the metrics which may be either the mode or averages to ensure all are tested.
all_data[2].bandwidth_cap = 53;
@ -90,6 +96,7 @@ TEST (node_telemetry, consolidate_data)
all_data[2].patch_version = 3;
all_data[2].pre_release_version = 6;
all_data[2].maker = 2;
all_data[2].timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (time + 2));
auto consolidated_telemetry_data1 = nano::consolidate_telemetry_data (all_data);
ASSERT_EQ (consolidated_telemetry_data1.major_version, 10);
@ -100,6 +107,7 @@ TEST (node_telemetry, consolidate_data)
ASSERT_TRUE (consolidated_telemetry_data1.protocol_version == 11 || consolidated_telemetry_data1.protocol_version == 12 || consolidated_telemetry_data1.protocol_version == 13);
ASSERT_EQ (consolidated_telemetry_data1.bandwidth_cap, 51);
ASSERT_EQ (consolidated_telemetry_data1.genesis_block, nano::block_hash (3));
ASSERT_EQ (*consolidated_telemetry_data1.timestamp, std::chrono::system_clock::time_point (std::chrono::milliseconds (time + 1)));
// Test equality operator
ASSERT_FALSE (consolidated_telemetry_data == consolidated_telemetry_data1);
@ -108,16 +116,19 @@ TEST (node_telemetry, consolidate_data)
TEST (node_telemetry, consolidate_data_optional_data)
{
auto time = 1582117035109;
nano::telemetry_data data;
data.major_version = 20;
data.minor_version = 1;
data.patch_version = 4;
data.pre_release_version = 6;
data.maker = 2;
data.timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (time));
nano::telemetry_data missing_minor;
missing_minor.major_version = 20;
missing_minor.patch_version = 4;
missing_minor.timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (time + 3));
nano::telemetry_data missing_all_optional;
@ -128,6 +139,7 @@ TEST (node_telemetry, consolidate_data_optional_data)
ASSERT_EQ (*consolidated_telemetry_data.patch_version, 4);
ASSERT_EQ (*consolidated_telemetry_data.pre_release_version, 6);
ASSERT_EQ (*consolidated_telemetry_data.maker, 2);
ASSERT_EQ (*consolidated_telemetry_data.timestamp, std::chrono::system_clock::time_point (std::chrono::milliseconds (time + 1)));
}
TEST (node_telemetry, serialize_deserialize_json_optional)
@ -137,6 +149,7 @@ TEST (node_telemetry, serialize_deserialize_json_optional)
data.patch_version = 4;
data.pre_release_version = 6;
data.maker = 2;
data.timestamp = std::chrono::system_clock::time_point (100ms);
nano::jsonconfig config;
data.serialize_json (config);
@ -150,6 +163,9 @@ TEST (node_telemetry, serialize_deserialize_json_optional)
ASSERT_EQ (val, 6);
ASSERT_FALSE (config.get ("maker", val).get_error ());
ASSERT_EQ (val, 2);
uint64_t timestamp;
ASSERT_FALSE (config.get ("timestamp", timestamp).get_error ());
ASSERT_EQ (timestamp, 100);
nano::telemetry_data data1;
data1.deserialize_json (config);
@ -157,6 +173,7 @@ TEST (node_telemetry, serialize_deserialize_json_optional)
ASSERT_EQ (*data1.patch_version, 4);
ASSERT_EQ (*data1.pre_release_version, 6);
ASSERT_EQ (*data1.maker, 2);
ASSERT_EQ (*data1.timestamp, std::chrono::system_clock::time_point (100ms));
nano::telemetry_data no_optional_data;
nano::jsonconfig config1;
@ -165,6 +182,7 @@ TEST (node_telemetry, serialize_deserialize_json_optional)
ASSERT_FALSE (config1.get_optional<uint8_t> ("patch_version").is_initialized ());
ASSERT_FALSE (config1.get_optional<uint8_t> ("pre_release_version").is_initialized ());
ASSERT_FALSE (config1.get_optional<uint8_t> ("maker").is_initialized ());
ASSERT_FALSE (config1.get_optional<uint64_t> ("timestamp").is_initialized ());
nano::telemetry_data no_optional_data1;
no_optional_data1.deserialize_json (config1);
@ -172,6 +190,7 @@ TEST (node_telemetry, serialize_deserialize_json_optional)
ASSERT_FALSE (no_optional_data1.patch_version.is_initialized ());
ASSERT_FALSE (no_optional_data1.pre_release_version.is_initialized ());
ASSERT_FALSE (no_optional_data1.maker.is_initialized ());
ASSERT_FALSE (no_optional_data1.timestamp.is_initialized ());
}
TEST (node_telemetry, consolidate_data_remove_outliers)
@ -191,6 +210,7 @@ TEST (node_telemetry, consolidate_data_remove_outliers)
data.patch_version = 5;
data.pre_release_version = 2;
data.maker = 1;
data.timestamp = std::chrono::system_clock::time_point (100ms);
// Insert 20 of these, and 2 outliers at the lower and upper bounds which should get removed
std::vector<nano::telemetry_data> all_data (20, data);
@ -211,6 +231,7 @@ TEST (node_telemetry, consolidate_data_remove_outliers)
outlier_data.patch_version = 1;
outlier_data.pre_release_version = 1;
outlier_data.maker = 1;
outlier_data.timestamp = std::chrono::system_clock::time_point (1ms);
all_data.push_back (outlier_data);
all_data.push_back (outlier_data);
@ -229,6 +250,7 @@ TEST (node_telemetry, consolidate_data_remove_outliers)
outlier_data1.patch_version = 9;
outlier_data1.pre_release_version = 9;
outlier_data1.maker = 9;
outlier_data1.timestamp = std::chrono::system_clock::time_point (999ms);
all_data.push_back (outlier_data1);
all_data.push_back (outlier_data1);
@ -242,7 +264,7 @@ TEST (node_telemetry, no_peers)
std::atomic<bool> done{ false };
system.nodes[0]->telemetry.get_metrics_peers_async ([&done](nano::telemetry_data_responses const & responses_a) {
ASSERT_TRUE (responses_a.telemetry_data_time_pairs.empty ());
ASSERT_TRUE (responses_a.telemetry_datas.empty ());
ASSERT_FALSE (responses_a.all_received);
done = true;
});
@ -267,12 +289,12 @@ TEST (node_telemetry, basic)
wait_peer_connections (system);
// Request telemetry metrics
std::unordered_map<nano::endpoint, nano::telemetry_data_time_pair> all_telemetry_data_time_pairs;
std::unordered_map<nano::endpoint, nano::telemetry_data> all_telemetry_datas;
{
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) {
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) {
ASSERT_TRUE (responses_a.all_received);
all_telemetry_data_time_pairs = responses_a.telemetry_data_time_pairs;
all_telemetry_datas = responses_a.telemetry_datas;
done = true;
});
@ -284,14 +306,14 @@ TEST (node_telemetry, basic)
}
// Check the metrics are correct
ASSERT_EQ (all_telemetry_data_time_pairs.size (), 1);
compare_default_test_result_data (all_telemetry_data_time_pairs.begin ()->second.data, *node_server);
ASSERT_EQ (all_telemetry_datas.size (), 1);
compare_default_test_result_data (all_telemetry_datas.begin ()->second, *node_server);
// Call again straight away. It should use the cache
{
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) {
ASSERT_EQ (all_telemetry_data_time_pairs, responses_a.telemetry_data_time_pairs);
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) {
ASSERT_EQ (all_telemetry_datas, responses_a.telemetry_datas);
ASSERT_TRUE (responses_a.all_received);
done = true;
});
@ -307,8 +329,8 @@ TEST (node_telemetry, basic)
std::this_thread::sleep_for (nano::telemetry_cache_cutoffs::test);
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) {
ASSERT_NE (all_telemetry_data_time_pairs, responses_a.telemetry_data_time_pairs);
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) {
ASSERT_NE (all_telemetry_datas, responses_a.telemetry_datas);
ASSERT_TRUE (responses_a.all_received);
done = true;
});
@ -350,10 +372,10 @@ TEST (node_telemetry, many_nodes)
auto node_client = system.nodes.front ();
std::atomic<bool> done{ false };
std::unordered_map<nano::endpoint, nano::telemetry_data_time_pair> all_telemetry_data_time_pairs;
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) {
std::unordered_map<nano::endpoint, nano::telemetry_data> all_telemetry_datas;
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) {
ASSERT_TRUE (responses_a.all_received);
all_telemetry_data_time_pairs = responses_a.telemetry_data_time_pairs;
all_telemetry_datas = responses_a.telemetry_datas;
done = true;
});
@ -365,9 +387,9 @@ TEST (node_telemetry, many_nodes)
// Check the metrics
nano::network_params params;
for (auto & telemetry_data_time_pair : all_telemetry_data_time_pairs)
for (auto & telemetry_data : all_telemetry_datas)
{
auto & data = telemetry_data_time_pair.second.data;
auto & data = telemetry_data.second;
ASSERT_EQ (data.unchecked_count, 0);
ASSERT_EQ (data.cemented_count, 1);
ASSERT_LE (data.peer_count, 9);
@ -386,10 +408,10 @@ TEST (node_telemetry, many_nodes)
}
// We gave some nodes different bandwidth caps, confirm they are not all the same
auto bandwidth_cap = all_telemetry_data_time_pairs.begin ()->second.data.bandwidth_cap;
all_telemetry_data_time_pairs.erase (all_telemetry_data_time_pairs.begin ());
auto all_bandwidth_limits_same = std::all_of (all_telemetry_data_time_pairs.begin (), all_telemetry_data_time_pairs.end (), [bandwidth_cap](auto & telemetry_data_time_pair) {
return telemetry_data_time_pair.second.data.bandwidth_cap == bandwidth_cap;
auto bandwidth_cap = all_telemetry_datas.begin ()->second.bandwidth_cap;
all_telemetry_datas.erase (all_telemetry_datas.begin ());
auto all_bandwidth_limits_same = std::all_of (all_telemetry_datas.begin (), all_telemetry_datas.end (), [bandwidth_cap](auto & telemetry_data) {
return telemetry_data.second.bandwidth_cap == bandwidth_cap;
});
ASSERT_FALSE (all_bandwidth_limits_same);
}
@ -416,10 +438,10 @@ TEST (node_telemetry, over_udp)
wait_peer_connections (system);
std::atomic<bool> done{ false };
std::unordered_map<nano::endpoint, nano::telemetry_data_time_pair> all_telemetry_data_time_pairs;
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) {
std::unordered_map<nano::endpoint, nano::telemetry_data> all_telemetry_datas;
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) {
ASSERT_TRUE (responses_a.all_received);
all_telemetry_data_time_pairs = responses_a.telemetry_data_time_pairs;
all_telemetry_datas = responses_a.telemetry_datas;
done = true;
});
@ -429,8 +451,8 @@ TEST (node_telemetry, over_udp)
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (all_telemetry_data_time_pairs.size (), 1);
compare_default_test_result_data (all_telemetry_data_time_pairs.begin ()->second.data, *node_server);
ASSERT_EQ (all_telemetry_datas.size (), 1);
compare_default_test_result_data (all_telemetry_datas.begin ()->second, *node_server);
// Check channels are indeed udp
ASSERT_EQ (1, node_client->network.size ());
@ -458,14 +480,14 @@ TEST (node_telemetry, single_request)
// Request telemetry metrics
auto channel = node_client->network.find_channel (node_server->network.endpoint ());
nano::telemetry_data_time_pair telemetry_data_time_pair;
nano::telemetry_data telemetry_data;
{
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data_time_pair, &channel](nano::telemetry_data_response const & response_a) {
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data, &channel](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
ASSERT_EQ (channel->get_endpoint (), response_a.endpoint);
telemetry_data_time_pair = response_a.telemetry_data_time_pair;
telemetry_data = response_a.telemetry_data;
done = true;
});
@ -477,13 +499,13 @@ TEST (node_telemetry, single_request)
}
// Check the metrics are correct
compare_default_test_result_data (telemetry_data_time_pair.data, *node_server);
compare_default_test_result_data (telemetry_data, *node_server);
// Call again straight away. It should use the cache
{
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data_time_pair](nano::telemetry_data_response const & response_a) {
ASSERT_EQ (telemetry_data_time_pair, response_a.telemetry_data_time_pair);
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) {
ASSERT_EQ (telemetry_data, response_a.telemetry_data);
ASSERT_FALSE (response_a.error);
done = true;
});
@ -499,8 +521,8 @@ TEST (node_telemetry, single_request)
std::this_thread::sleep_for (nano::telemetry_cache_cutoffs::test);
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data_time_pair](nano::telemetry_data_response const & response_a) {
ASSERT_NE (telemetry_data_time_pair, response_a.telemetry_data_time_pair);
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) {
ASSERT_NE (telemetry_data, response_a.telemetry_data);
ASSERT_FALSE (response_a.error);
done = true;
});
@ -565,13 +587,13 @@ TEST (node_telemetry, blocking_single_and_random)
// Blocking version of get_random_metrics_async
auto telemetry_data_responses = node_client->telemetry.get_metrics_peers ();
ASSERT_TRUE (telemetry_data_responses.all_received);
compare_default_test_result_data (telemetry_data_responses.telemetry_data_time_pairs.begin ()->second.data, *node_server);
compare_default_test_result_data (telemetry_data_responses.telemetry_datas.begin ()->second, *node_server);
// Now try single request metric
auto telemetry_data_response = node_client->telemetry.get_metrics_single_peer (node_client->network.find_channel (node_server->network.endpoint ()));
ASSERT_FALSE (telemetry_data_response.error);
compare_default_test_result_data (telemetry_data_response.telemetry_data_time_pair.data, *node_server);
ASSERT_EQ (telemetry_data_response.telemetry_data_time_pair.last_updated, telemetry_data_responses.telemetry_data_time_pairs.begin ()->second.last_updated);
compare_default_test_result_data (telemetry_data_response.telemetry_data, *node_server);
ASSERT_EQ (*telemetry_data_response.telemetry_data.timestamp, *telemetry_data_responses.telemetry_datas.begin ()->second.timestamp);
done = true;
promise.get_future ().wait ();
@ -596,10 +618,10 @@ TEST (node_telemetry, multiple_single_request_clearing)
auto channel = node_client->network.find_channel (node_server->network.endpoint ());
std::atomic<bool> done{ false };
std::chrono::steady_clock::time_point last_updated;
std::chrono::system_clock::time_point last_updated;
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &last_updated](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
last_updated = response_a.telemetry_data_time_pair.last_updated;
last_updated = *response_a.telemetry_data.timestamp;
done = true;
});
@ -615,12 +637,10 @@ TEST (node_telemetry, multiple_single_request_clearing)
system.deadline_set (10s);
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, last_updated](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
ASSERT_EQ (last_updated, response_a.telemetry_data_time_pair.last_updated);
ASSERT_EQ (last_updated, *response_a.telemetry_data.timestamp);
done = true;
});
ASSERT_LT (last_updated, node_client->telemetry.single_requests.begin ()->second.last_updated);
system.deadline_set (10s);
while (!done)
{
@ -631,8 +651,8 @@ TEST (node_telemetry, multiple_single_request_clearing)
auto channel1 = node_client->network.find_channel (node_server1->network.endpoint ());
node_client->telemetry.get_metrics_single_peer_async (channel1, [&done, &last_updated](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
ASSERT_NE (last_updated, response_a.telemetry_data_time_pair.last_updated);
last_updated = response_a.telemetry_data_time_pair.last_updated;
ASSERT_NE (last_updated, *response_a.telemetry_data.timestamp);
last_updated = *response_a.telemetry_data.timestamp;
done = true;
});
@ -646,7 +666,7 @@ TEST (node_telemetry, multiple_single_request_clearing)
done = false;
node_client->telemetry.get_metrics_single_peer_async (channel1, [&done, last_updated](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
ASSERT_EQ (last_updated, response_a.telemetry_data_time_pair.last_updated);
ASSERT_EQ (last_updated, *response_a.telemetry_data.timestamp);
done = true;
});
@ -712,12 +732,12 @@ TEST (node_telemetry, batch_use_single_request_cache)
wait_peer_connections (system);
// Request telemetry metrics
nano::telemetry_data_time_pair telemetry_data_time_pair;
nano::telemetry_data telemetry_data;
{
std::atomic<bool> done{ false };
auto channel = node_client->network.find_channel (node_server->network.endpoint ());
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data_time_pair](nano::telemetry_data_response const & response_a) {
telemetry_data_time_pair = response_a.telemetry_data_time_pair;
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) {
telemetry_data = response_a.telemetry_data;
done = true;
});
@ -730,9 +750,9 @@ TEST (node_telemetry, batch_use_single_request_cache)
{
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_peers_async ([&done, &telemetry_data_time_pair](nano::telemetry_data_responses const & responses_a) {
node_client->telemetry.get_metrics_peers_async ([&done, &telemetry_data](nano::telemetry_data_responses const & responses_a) {
ASSERT_TRUE (responses_a.all_received);
ASSERT_EQ (telemetry_data_time_pair, responses_a.telemetry_data_time_pairs.begin ()->second);
ASSERT_EQ (telemetry_data, responses_a.telemetry_datas.begin ()->second);
done = true;
});
@ -763,7 +783,7 @@ TEST (node_telemetry, batch_use_single_request_cache)
system.deadline_set (10s);
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_peers_async ([&done](nano::telemetry_data_responses const & responses_a) {
ASSERT_EQ (1, responses_a.telemetry_data_time_pairs.size ());
ASSERT_EQ (1, responses_a.telemetry_datas.size ());
done = true;
});
@ -792,13 +812,13 @@ TEST (node_telemetry, single_request_use_batch_cache)
wait_peer_connections (system);
// Request batched metric first
std::unordered_map<nano::endpoint, nano::telemetry_data_time_pair> all_telemetry_data_time_pairs;
std::unordered_map<nano::endpoint, nano::telemetry_data> all_telemetry_datas;
{
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) {
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) {
ASSERT_TRUE (responses_a.all_received);
ASSERT_EQ (1, responses_a.telemetry_data_time_pairs.size ());
all_telemetry_data_time_pairs = responses_a.telemetry_data_time_pairs;
ASSERT_EQ (1, responses_a.telemetry_datas.size ());
all_telemetry_datas = responses_a.telemetry_datas;
done = true;
});
@ -811,8 +831,8 @@ TEST (node_telemetry, single_request_use_batch_cache)
std::atomic<bool> done{ false };
auto channel = node_client->network.find_channel (node_server->network.endpoint ());
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &all_telemetry_data_time_pairs](nano::telemetry_data_response const & response_a) {
ASSERT_EQ (all_telemetry_data_time_pairs.begin ()->second, response_a.telemetry_data_time_pair);
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &all_telemetry_datas](nano::telemetry_data_response const & response_a) {
ASSERT_EQ (all_telemetry_datas.begin ()->second, response_a.telemetry_data);
ASSERT_FALSE (response_a.error);
done = true;
});
@ -958,7 +978,7 @@ TEST (node_telemetry, disable_metrics_single)
auto channel1 = node_server->network.find_channel (node_client->network.endpoint ());
node_server->telemetry.get_metrics_single_peer_async (channel1, [&done, node_server](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
compare_default_test_result_data (response_a.telemetry_data_time_pair.data, *node_server);
compare_default_test_result_data (response_a.telemetry_data, *node_server);
done = true;
});
@ -999,7 +1019,7 @@ TEST (node_telemetry, disable_metrics_batch)
done = false;
node_server->telemetry.get_metrics_peers_async ([&done, node_server](nano::telemetry_data_responses const & responses_a) {
ASSERT_TRUE (responses_a.all_received);
compare_default_test_result_data (responses_a.telemetry_data_time_pairs.begin ()->second.data, *node_server);
compare_default_test_result_data (responses_a.telemetry_datas.begin ()->second, *node_server);
done = true;
});
@ -1036,12 +1056,13 @@ void compare_default_test_result_data (nano::telemetry_data const & telemetry_da
ASSERT_EQ (telemetry_data_a.protocol_version, node_server_a.network_params.protocol.telemetry_protocol_version_min);
ASSERT_EQ (telemetry_data_a.unchecked_count, 0);
ASSERT_EQ (telemetry_data_a.account_count, 1);
ASSERT_LT (telemetry_data_a.uptime, 100);
ASSERT_EQ (telemetry_data_a.genesis_block, node_server_a.network_params.ledger.genesis_hash);
ASSERT_EQ (telemetry_data_a.major_version, nano::get_major_node_version ());
ASSERT_EQ (*telemetry_data_a.minor_version, nano::get_minor_node_version ());
ASSERT_EQ (*telemetry_data_a.patch_version, nano::get_patch_node_version ());
ASSERT_EQ (*telemetry_data_a.pre_release_version, nano::get_pre_release_node_version ());
ASSERT_EQ (*telemetry_data_a.maker, 0);
ASSERT_LT (telemetry_data_a.uptime, 100);
ASSERT_EQ (telemetry_data_a.genesis_block, node_server_a.network_params.ledger.genesis_hash);
ASSERT_GT (*telemetry_data_a.timestamp, std::chrono::system_clock::now () - 100s);
}
}

View file

@ -1117,6 +1117,7 @@ void nano::telemetry_ack::serialize (nano::stream & stream_a) const
write (stream_a, *data.patch_version);
write (stream_a, *data.pre_release_version);
write (stream_a, *data.maker);
write (stream_a, std::chrono::duration_cast<std::chrono::milliseconds> (data.timestamp->time_since_epoch ()).count ());
}
}
@ -1151,6 +1152,13 @@ bool nano::telemetry_ack::deserialize (nano::stream & stream_a)
read (stream_a, out);
data.maker = out;
}
if (header.extensions.to_ulong () > telemetry_data::size_v1)
{
uint64_t timestamp;
read (stream_a, timestamp);
data.timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (timestamp));
}
}
}
catch (std::runtime_error const &)
@ -1209,6 +1217,10 @@ nano::error nano::telemetry_data::serialize_json (nano::jsonconfig & json) const
{
json.put ("maker", *maker);
}
if (timestamp.is_initialized ())
{
json.put ("timestamp", std::chrono::duration_cast<std::chrono::milliseconds> (timestamp->time_since_epoch ()).count ());
}
return json.get_error ();
}
@ -1236,13 +1248,18 @@ nano::error nano::telemetry_data::deserialize_json (nano::jsonconfig & json)
patch_version = json.get_optional<uint8_t> ("patch_version");
pre_release_version = json.get_optional<uint8_t> ("pre_release_version");
maker = json.get_optional<uint8_t> ("maker");
auto timestamp_l = json.get_optional<uint64_t> ("timestamp");
if (timestamp_l.is_initialized ())
{
timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (*timestamp_l));
}
return json.get_error ();
}
bool nano::telemetry_data::operator== (nano::telemetry_data const & data_a) const
{
return (block_count == data_a.block_count && cemented_count == data_a.cemented_count && unchecked_count == data_a.unchecked_count && account_count == data_a.account_count && bandwidth_cap == data_a.bandwidth_cap && uptime == data_a.uptime && peer_count == data_a.peer_count && protocol_version == data_a.protocol_version && genesis_block == data_a.genesis_block && major_version == data_a.major_version && minor_version == data_a.minor_version && patch_version == data_a.patch_version && pre_release_version == data_a.pre_release_version && maker == data_a.maker);
return (block_count == data_a.block_count && cemented_count == data_a.cemented_count && unchecked_count == data_a.unchecked_count && account_count == data_a.account_count && bandwidth_cap == data_a.bandwidth_cap && uptime == data_a.uptime && peer_count == data_a.peer_count && protocol_version == data_a.protocol_version && genesis_block == data_a.genesis_block && major_version == data_a.major_version && minor_version == data_a.minor_version && patch_version == data_a.patch_version && pre_release_version == data_a.pre_release_version && maker == data_a.maker && timestamp == data_a.timestamp);
}
bool nano::telemetry_data::operator!= (nano::telemetry_data const & data_a) const

View file

@ -349,6 +349,7 @@ public:
boost::optional<uint8_t> patch_version;
boost::optional<uint8_t> pre_release_version;
boost::optional<uint8_t> maker; // 0 for NF node
boost::optional<std::chrono::system_clock::time_point> timestamp;
nano::error serialize_json (nano::jsonconfig & json) const;
nano::error deserialize_json (nano::jsonconfig & json);
@ -356,7 +357,8 @@ public:
bool operator!= (nano::telemetry_data const &) const;
static auto constexpr size_v0 = sizeof (block_count) + sizeof (cemented_count) + sizeof (unchecked_count) + sizeof (account_count) + sizeof (bandwidth_cap) + sizeof (peer_count) + sizeof (protocol_version) + sizeof (uptime) + sizeof (genesis_block) + sizeof (major_version);
static auto constexpr size = size_v0 + sizeof (decltype (minor_version)::value_type) + sizeof (decltype (patch_version)::value_type) + sizeof (decltype (pre_release_version)::value_type) + sizeof (decltype (maker)::value_type);
static auto constexpr size_v1 = size_v0 + sizeof (decltype (minor_version)::value_type) + sizeof (decltype (patch_version)::value_type) + sizeof (decltype (pre_release_version)::value_type) + sizeof (decltype (maker)::value_type);
static auto constexpr size = size_v1 + sizeof (uint64_t);
};
class telemetry_req final : public message
{

View file

@ -3941,7 +3941,6 @@ void nano::json_handler::telemetry ()
nano::jsonconfig config_l;
auto err = telemetry_data.serialize_json (config_l);
config_l.put ("timestamp", std::chrono::duration_cast<std::chrono::seconds> (std::chrono::system_clock::now ().time_since_epoch ()).count ());
auto const & ptree = config_l.get_tree ();
if (!err)
@ -3979,12 +3978,11 @@ void nano::json_handler::telemetry ()
if (!ec)
{
debug_assert (channel);
node.telemetry.get_metrics_single_peer_async (channel, [rpc_l](auto const & single_telemetry_metric_a) {
if (!single_telemetry_metric_a.error)
node.telemetry.get_metrics_single_peer_async (channel, [rpc_l](auto const & telemetry_response_a) {
if (!telemetry_response_a.error)
{
nano::jsonconfig config_l;
auto err = single_telemetry_metric_a.telemetry_data_time_pair.data.serialize_json (config_l);
config_l.put ("timestamp", std::chrono::duration_cast<std::chrono::seconds> (single_telemetry_metric_a.telemetry_data_time_pair.system_last_updated.time_since_epoch ()).count ());
auto err = telemetry_response_a.telemetry_data.serialize_json (config_l);
auto const & ptree = config_l.get_tree ();
if (!err)
@ -4015,15 +4013,14 @@ void nano::json_handler::telemetry ()
// setting "raw" to true returns metrics from all nodes requested.
auto raw = request.get_optional<bool> ("raw");
auto output_raw = raw.value_or (false);
node.telemetry.get_metrics_peers_async ([rpc_l, output_raw](auto const & batched_telemetry_metrics_a) {
node.telemetry.get_metrics_peers_async ([rpc_l, output_raw](telemetry_data_responses const & telemetry_responses_a) {
if (output_raw)
{
boost::property_tree::ptree metrics;
for (auto & telemetry_metrics : batched_telemetry_metrics_a.telemetry_data_time_pairs)
for (auto & telemetry_metrics : telemetry_responses_a.telemetry_datas)
{
nano::jsonconfig config_l;
auto err = telemetry_metrics.second.data.serialize_json (config_l);
config_l.put ("timestamp", std::chrono::duration_cast<std::chrono::seconds> (telemetry_metrics.second.system_last_updated.time_since_epoch ()).count ());
auto err = telemetry_metrics.second.serialize_json (config_l);
config_l.put ("address", telemetry_metrics.first.address ());
config_l.put ("port", telemetry_metrics.first.port ());
if (!err)
@ -4041,15 +4038,14 @@ void nano::json_handler::telemetry ()
else
{
nano::jsonconfig config_l;
std::vector<nano::telemetry_data_time_pair> telemetry_data_time_pairs;
telemetry_data_time_pairs.reserve (batched_telemetry_metrics_a.telemetry_data_time_pairs.size ());
std::transform (batched_telemetry_metrics_a.telemetry_data_time_pairs.begin (), batched_telemetry_metrics_a.telemetry_data_time_pairs.end (), std::back_inserter (telemetry_data_time_pairs), [](auto const & telemetry_data_time_pair_a) {
return telemetry_data_time_pair_a.second;
std::vector<nano::telemetry_data> telemetry_datas;
telemetry_datas.reserve (telemetry_responses_a.telemetry_datas.size ());
std::transform (telemetry_responses_a.telemetry_datas.begin (), telemetry_responses_a.telemetry_datas.end (), std::back_inserter (telemetry_datas), [](auto const & endpoint_telemetry_data) {
return endpoint_telemetry_data.second;
});
auto average_telemetry_metrics = nano::consolidate_telemetry_data_time_pairs (telemetry_data_time_pairs);
auto err = average_telemetry_metrics.data.serialize_json (config_l);
config_l.put ("timestamp", std::chrono::duration_cast<std::chrono::seconds> (average_telemetry_metrics.system_last_updated.time_since_epoch ()).count ());
auto average_telemetry_metrics = nano::consolidate_telemetry_data (telemetry_datas);
auto err = average_telemetry_metrics.serialize_json (config_l);
auto const & ptree = config_l.get_tree ();
if (!err)

View file

@ -20,7 +20,7 @@ worker (worker_a),
batch_request (std::make_shared<nano::telemetry_impl> (network, alarm, worker))
{
// Before callbacks are called with the batch request, check if any of the single request data can be appended to give
batch_request->pre_callback_callback = [this](std::unordered_map<nano::endpoint, telemetry_data_time_pair> & data_a, std::mutex & mutex_a) {
batch_request->pre_callback_callback = [this](std::unordered_map<nano::endpoint, telemetry_data> & datas_a, std::mutex & mutex_a) {
nano::lock_guard<std::mutex> guard (this->mutex);
for (auto & single_request : single_requests)
{
@ -35,7 +35,7 @@ batch_request (std::make_shared<nano::telemetry_impl> (network, alarm, worker))
}
else
{
data_a.emplace (single_request.first, single_request.second.impl->cached_telemetry_data.begin ()->second);
datas_a.emplace (single_request.first, single_request.second.impl->cached_telemetry_data.begin ()->second.data);
}
}
}
@ -50,7 +50,7 @@ batch_request (std::make_shared<nano::telemetry_impl> (network, alarm, worker))
}
else
{
data_a.emplace (pending.first, pending.second);
datas_a.emplace (pending.first, pending.second.data);
}
}
finished_single_requests.clear ();
@ -195,7 +195,7 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr<nano::trans
}
worker.push_task ([callback_a, endpoint]() {
auto const error = true;
callback_a ({ nano::telemetry_data_time_pair{}, endpoint, error });
callback_a ({ nano::telemetry_data{}, endpoint, error });
});
};
@ -204,8 +204,8 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr<nano::trans
{
if (channel_a && (channel_a->get_network_version () >= network_params.protocol.telemetry_protocol_version_min))
{
auto add_callback_async = [& worker = this->worker, &callback_a](telemetry_data_time_pair const & telemetry_data_time_pair_a, nano::endpoint const & endpoint_a) {
telemetry_data_response telemetry_data_response_l{ telemetry_data_time_pair_a, endpoint_a, false };
auto add_callback_async = [& worker = this->worker, &callback_a](telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a) {
telemetry_data_response telemetry_data_response_l{ telemetry_data_a, endpoint_a, false };
worker.push_task ([telemetry_data_response_l, callback_a]() {
callback_a (telemetry_data_response_l);
});
@ -217,7 +217,7 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr<nano::trans
auto it = batch_request->cached_telemetry_data.find (channel_a->get_endpoint ());
if (it != batch_request->cached_telemetry_data.cend ())
{
add_callback_async (it->second, it->first);
add_callback_async (it->second.data, it->first);
return;
}
}
@ -225,7 +225,7 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr<nano::trans
auto it = finished_single_requests.find (channel_a->get_endpoint ());
if (it != finished_single_requests.cend ())
{
add_callback_async (it->second, it->first);
add_callback_async (it->second.data, it->first);
return;
}
@ -238,13 +238,13 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr<nano::trans
auto const error = !telemetry_data_responses_a.all_received;
if (!error)
{
debug_assert (telemetry_data_responses_a.telemetry_data_time_pairs.size () == 1);
auto it = telemetry_data_responses_a.telemetry_data_time_pairs.begin ();
debug_assert (telemetry_data_responses_a.telemetry_datas.size () == 1);
auto it = telemetry_data_responses_a.telemetry_datas.begin ();
callback_a ({ it->second, it->first, error });
}
else
{
callback_a ({ nano::telemetry_data_time_pair{}, channel_a->get_endpoint (), error });
callback_a ({ nano::telemetry_data{}, channel_a->get_endpoint (), error });
}
});
}
@ -357,7 +357,7 @@ void nano::telemetry_impl::add (nano::telemetry_data const & telemetry_data_a, n
if (!is_empty_a)
{
current_telemetry_data_responses[endpoint_a] = { telemetry_data_a, std::chrono::steady_clock::now (), std::chrono::system_clock::now () };
current_telemetry_data_responses[endpoint_a] = { telemetry_data_a, std::chrono::steady_clock::now () };
}
channel_processed (lk, endpoint_a);
}
@ -366,12 +366,16 @@ void nano::telemetry_impl::invoke_callbacks ()
{
decltype (callbacks) callbacks_l;
bool all_received;
decltype (cached_telemetry_data) cached_telemetry_data_l;
std::unordered_map<nano::endpoint, nano::telemetry_data> cached_responses_l;
{
// Copy callbacks so that they can be called outside of holding the lock
nano::lock_guard<std::mutex> guard (mutex);
callbacks_l = callbacks;
cached_telemetry_data_l = cached_telemetry_data;
cached_responses_l.reserve (cached_telemetry_data.size ());
std::transform (cached_telemetry_data.begin (), cached_telemetry_data.end (), std::inserter (cached_responses_l, cached_responses_l.end ()), [](auto const & endpoint_telemetry_data) {
return std::pair<const nano::endpoint, nano::telemetry_data>{ endpoint_telemetry_data.first, endpoint_telemetry_data.second.data };
});
current_telemetry_data_responses.clear ();
callbacks.clear ();
all_received = failed.empty ();
@ -379,13 +383,13 @@ void nano::telemetry_impl::invoke_callbacks ()
if (pre_callback_callback)
{
pre_callback_callback (cached_telemetry_data_l, mutex);
pre_callback_callback (cached_responses_l, mutex);
}
// Need to account for nodes which disable telemetry data in responses
bool all_received_l = !cached_telemetry_data_l.empty () && all_received;
bool all_received_l = !cached_responses_l.empty () && all_received;
for (auto & callback : callbacks_l)
{
callback ({ cached_telemetry_data_l, all_received_l });
callback ({ cached_responses_l, all_received_l });
}
}
@ -511,24 +515,14 @@ nano::telemetry_data nano::consolidate_telemetry_data (std::vector<nano::telemet
std::vector<nano::telemetry_data_time_pair> telemetry_data_time_pairs;
telemetry_data_time_pairs.reserve (telemetry_datas.size ());
std::transform (telemetry_datas.begin (), telemetry_datas.end (), std::back_inserter (telemetry_data_time_pairs), [](nano::telemetry_data const & telemetry_data_a) {
// Don't care about the timestamps here
return nano::telemetry_data_time_pair{ telemetry_data_a, {}, {} };
});
return consolidate_telemetry_data_time_pairs (telemetry_data_time_pairs).data;
}
nano::telemetry_data_time_pair nano::consolidate_telemetry_data_time_pairs (std::vector<nano::telemetry_data_time_pair> const & telemetry_data_time_pairs_a)
{
if (telemetry_data_time_pairs_a.empty ())
if (telemetry_datas.empty ())
{
return {};
}
else if (telemetry_data_time_pairs_a.size () == 1)
else if (telemetry_datas.size () == 1)
{
// Only 1 element in the collection, so just return it.
return telemetry_data_time_pairs_a.front ();
return telemetry_datas.front ();
}
std::unordered_map<uint8_t, int> protocol_versions;
@ -542,13 +536,12 @@ nano::telemetry_data_time_pair nano::consolidate_telemetry_data_time_pairs (std:
std::multiset<uint64_t> cemented_counts;
std::multiset<uint32_t> peer_counts;
std::multiset<uint64_t> unchecked_counts;
std::multiset<uint64_t> uptime_counts;
std::multiset<uint64_t> bandwidth_counts;
std::multiset<long long> timestamp_counts;
std::multiset<uint64_t> uptimes;
std::multiset<uint64_t> bandwidths;
std::multiset<uint64_t> timestamps;
for (auto const & telemetry_data_time_pair : telemetry_data_time_pairs_a)
for (auto const & telemetry_data : telemetry_datas)
{
auto & telemetry_data = telemetry_data_time_pair.data;
account_counts.insert (telemetry_data.account_count);
block_counts.insert (telemetry_data.block_count);
cemented_counts.insert (telemetry_data.cemented_count);
@ -572,25 +565,28 @@ nano::telemetry_data_time_pair nano::consolidate_telemetry_data_time_pairs (std:
}
}
if (telemetry_data.timestamp.is_initialized ())
{
timestamps.insert (std::chrono::duration_cast<std::chrono::milliseconds> (telemetry_data.timestamp->time_since_epoch ()).count ());
}
++vendor_versions[ss.str ()];
++protocol_versions[telemetry_data.protocol_version];
peer_counts.insert (telemetry_data.peer_count);
unchecked_counts.insert (telemetry_data.unchecked_count);
uptime_counts.insert (telemetry_data.uptime);
uptimes.insert (telemetry_data.uptime);
// 0 has a special meaning (unlimited), don't include it in the average as it will be heavily skewed
if (telemetry_data.bandwidth_cap != 0)
{
bandwidth_counts.insert (telemetry_data.bandwidth_cap);
bandwidths.insert (telemetry_data.bandwidth_cap);
}
++bandwidth_caps[telemetry_data.bandwidth_cap];
++genesis_blocks[telemetry_data.genesis_block];
timestamp_counts.insert (std::chrono::time_point_cast<std::chrono::milliseconds> (telemetry_data_time_pair.system_last_updated).time_since_epoch ().count ());
}
// Remove 10% of the results from the lower and upper bounds to catch any outliers. Need at least 10 responses before any are removed.
auto num_either_side_to_remove = telemetry_data_time_pairs_a.size () / 10;
auto num_either_side_to_remove = telemetry_datas.size () / 10;
auto strip_outliers_and_sum = [num_either_side_to_remove](auto & counts) {
counts.erase (counts.begin (), std::next (counts.begin (), num_either_side_to_remove));
@ -605,11 +601,11 @@ nano::telemetry_data_time_pair nano::consolidate_telemetry_data_time_pairs (std:
auto cemented_sum = strip_outliers_and_sum (cemented_counts);
auto peer_sum = strip_outliers_and_sum (peer_counts);
auto unchecked_sum = strip_outliers_and_sum (unchecked_counts);
auto uptime_sum = strip_outliers_and_sum (uptime_counts);
auto bandwidth_sum = strip_outliers_and_sum (bandwidth_counts);
auto uptime_sum = strip_outliers_and_sum (uptimes);
auto bandwidth_sum = strip_outliers_and_sum (bandwidths);
nano::telemetry_data consolidated_data;
auto size = telemetry_data_time_pairs_a.size () - num_either_side_to_remove * 2;
auto size = telemetry_datas.size () - num_either_side_to_remove * 2;
consolidated_data.account_count = boost::numeric_cast<decltype (consolidated_data.account_count)> (account_sum / size);
consolidated_data.block_count = boost::numeric_cast<decltype (consolidated_data.block_count)> (block_sum / size);
consolidated_data.cemented_count = boost::numeric_cast<decltype (consolidated_data.cemented_count)> (cemented_sum / size);
@ -617,6 +613,12 @@ nano::telemetry_data_time_pair nano::consolidate_telemetry_data_time_pairs (std:
consolidated_data.uptime = boost::numeric_cast<decltype (consolidated_data.uptime)> (uptime_sum / size);
consolidated_data.unchecked_count = boost::numeric_cast<decltype (consolidated_data.unchecked_count)> (unchecked_sum / size);
if (!timestamps.empty ())
{
auto timestamp_sum = strip_outliers_and_sum (timestamps);
consolidated_data.timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (boost::numeric_cast<uint64_t> (timestamp_sum / timestamps.size ())));
}
auto set_mode_or_average = [](auto const & collection, auto & var, auto const & sum, size_t size) {
auto max = std::max_element (collection.begin (), collection.end (), [](auto const & lhs, auto const & rhs) {
return lhs.second < rhs.second;
@ -668,11 +670,7 @@ nano::telemetry_data_time_pair nano::consolidate_telemetry_data_time_pairs (std:
consolidated_data.maker = boost::lexical_cast<uint8_t> (version_fragments[4]);
}
// Consolidate timestamps
auto timestamp_sum = strip_outliers_and_sum (timestamp_counts);
auto consolidated_timestamp = boost::numeric_cast<long long> (timestamp_sum / size);
return telemetry_data_time_pair{ consolidated_data, std::chrono::steady_clock::time_point{}, std::chrono::system_clock::time_point (std::chrono::milliseconds (consolidated_timestamp)) };
return consolidated_data;
}
nano::telemetry_data nano::local_telemetry_data (nano::ledger_cache const & ledger_cache_a, nano::network & network_a, uint64_t bandwidth_limit_a, nano::network_params const & network_params_a, std::chrono::steady_clock::time_point statup_time_a)
@ -692,5 +690,6 @@ nano::telemetry_data nano::local_telemetry_data (nano::ledger_cache const & ledg
telemetry_data.patch_version = nano::get_patch_node_version ();
telemetry_data.pre_release_version = nano::get_pre_release_node_version ();
telemetry_data.maker = 0; // 0 Indicates it originated from the NF
telemetry_data.timestamp = std::chrono::system_clock::now ();
return telemetry_data;
}

View file

@ -23,7 +23,6 @@ class telemetry_data_time_pair
public:
nano::telemetry_data data;
std::chrono::steady_clock::time_point last_updated;
std::chrono::system_clock::time_point system_last_updated;
bool operator== (telemetry_data_time_pair const &) const;
bool operator!= (telemetry_data_time_pair const &) const;
};
@ -34,7 +33,7 @@ public:
class telemetry_data_response
{
public:
nano::telemetry_data_time_pair telemetry_data_time_pair;
nano::telemetry_data telemetry_data;
nano::endpoint endpoint;
bool error{ true };
};
@ -45,7 +44,7 @@ public:
class telemetry_data_responses
{
public:
std::unordered_map<nano::endpoint, telemetry_data_time_pair> telemetry_data_time_pairs;
std::unordered_map<nano::endpoint, telemetry_data> telemetry_datas;
bool all_received{ false };
};
@ -88,7 +87,7 @@ private:
nano::alarm & alarm;
nano::worker & worker;
std::function<void(std::unordered_map<nano::endpoint, telemetry_data_time_pair> & data_a, std::mutex &)> pre_callback_callback;
std::function<void(std::unordered_map<nano::endpoint, telemetry_data> & data_a, std::mutex &)> pre_callback_callback;
void invoke_callbacks ();
void channel_processed (nano::unique_lock<std::mutex> & lk_a, nano::endpoint const & endpoint_a);

View file

@ -78,7 +78,7 @@ namespace transport
class tcp_channels final
{
friend class nano::transport::channel_tcp;
friend class node_telemetry_simultaneous_single_and_random_requests_Test;
friend class node_telemetry_simultaneous_single_and_all_requests_Test;
public:
tcp_channels (nano::node &);

View file

@ -846,7 +846,7 @@ public:
std::atomic<bool> awaiting_cache{ false };
std::atomic<bool> keep_requesting_metrics{ true };
std::shared_ptr<nano::node> node;
std::chrono::steady_clock::time_point orig_time;
std::chrono::system_clock::time_point orig_time;
std::atomic_flag orig_time_set = ATOMIC_FLAG_INIT;
};
class shared_data
@ -859,7 +859,7 @@ public:
};
template <typename T>
void callback_process (shared_data & shared_data_a, data & data, T & all_node_data_a, std::chrono::steady_clock::time_point last_updated)
void callback_process (shared_data & shared_data_a, data & data, T & all_node_data_a, std::chrono::system_clock::time_point last_updated)
{
if (!data.orig_time_set.test_and_set ())
{
@ -921,7 +921,7 @@ TEST (node_telemetry, ongoing_requests)
}
}
TEST (node_telemetry, simultaneous_random_requests)
TEST (node_telemetry, simultaneous_all_requests)
{
const auto num_nodes = 4;
nano::system system (num_nodes);
@ -954,7 +954,7 @@ TEST (node_telemetry, simultaneous_random_requests)
{
++shared_data.count;
data.node->telemetry.get_metrics_peers_async ([&shared_data, &data, &all_data](nano::telemetry_data_responses const & responses_a) {
callback_process (shared_data, data, all_data, responses_a.telemetry_data_time_pairs.begin ()->second.last_updated);
callback_process (shared_data, data, all_data, *responses_a.telemetry_datas.begin ()->second.timestamp);
});
}
std::this_thread::sleep_for (1ms);
@ -982,7 +982,7 @@ namespace nano
{
namespace transport
{
TEST (node_telemetry, simultaneous_single_and_random_requests)
TEST (node_telemetry, simultaneous_single_and_all_requests)
{
const auto num_nodes = 4;
nano::system system (num_nodes);
@ -993,21 +993,21 @@ namespace transport
const auto num_threads = 4;
std::array<data, num_nodes> node_data_single{};
std::array<data, num_nodes> node_data_random{};
std::array<data, num_nodes> node_data_all{};
for (auto i = 0; i < num_nodes; ++i)
{
node_data_single[i].node = system.nodes[i];
node_data_random[i].node = system.nodes[i];
node_data_all[i].node = system.nodes[i];
}
shared_data shared_data_single;
shared_data shared_data_random;
shared_data shared_data_all;
// Create a few threads where each node sends out telemetry request messages to all other nodes continuously, until the cache it reached and subsequently expired.
// The test waits until all telemetry_ack messages have been received.
for (int i = 0; i < num_threads; ++i)
{
threads.emplace_back ([&node_data_single, &node_data_random, &shared_data_single, &shared_data_random]() {
threads.emplace_back ([&node_data_single, &node_data_all, &shared_data_single, &shared_data_all]() {
auto func = [](auto & all_node_data_a, shared_data & shared_data_a, bool single_a) {
while (std::any_of (all_node_data_a.cbegin (), all_node_data_a.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); }))
{
@ -1023,13 +1023,13 @@ namespace transport
// Pick first peer to be consistent
auto peer = data.node->network.tcp_channels.channels[0].channel;
data.node->telemetry.get_metrics_single_peer_async (peer, [&shared_data_a, &data, &all_node_data_a](nano::telemetry_data_response const & telemetry_data_response_a) {
callback_process (shared_data_a, data, all_node_data_a, telemetry_data_response_a.telemetry_data_time_pair.last_updated);
callback_process (shared_data_a, data, all_node_data_a, *telemetry_data_response_a.telemetry_data.timestamp);
});
}
else
{
data.node->telemetry.get_metrics_peers_async ([&shared_data_a, &data, &all_node_data_a](nano::telemetry_data_responses const & telemetry_data_responses_a) {
callback_process (shared_data_a, data, all_node_data_a, telemetry_data_responses_a.telemetry_data_time_pairs.begin ()->second.last_updated);
callback_process (shared_data_a, data, all_node_data_a, *telemetry_data_responses_a.telemetry_datas.begin ()->second.timestamp);
});
}
}
@ -1042,12 +1042,12 @@ namespace transport
};
func (node_data_single, shared_data_single, true);
func (node_data_random, shared_data_random, false);
func (node_data_all, shared_data_all, false);
});
}
system.deadline_set (30s);
while (!shared_data_random.done || !shared_data_single.done)
while (!shared_data_all.done || !shared_data_single.done)
{
ASSERT_NO_ERROR (system.poll ());
}