diff --git a/nano/core_test/node_telemetry.cpp b/nano/core_test/node_telemetry.cpp index 6c528d31..c42b396c 100644 --- a/nano/core_test/node_telemetry.cpp +++ b/nano/core_test/node_telemetry.cpp @@ -16,6 +16,8 @@ void compare_default_test_result_data (nano::telemetry_data const & telemetry_da TEST (node_telemetry, consolidate_data) { + auto time = 1582117035109; + // Pick specific values so that we can check both mode and average are working correctly nano::telemetry_data data; data.account_count = 2; @@ -31,6 +33,7 @@ TEST (node_telemetry, consolidate_data) data.minor_version = 1; data.patch_version = 4; data.pre_release_version = 6; + data.timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (time)); nano::telemetry_data data1; data1.account_count = 5; @@ -47,6 +50,7 @@ TEST (node_telemetry, consolidate_data) data1.patch_version = 3; data1.pre_release_version = 6; data1.maker = 2; + data1.timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (time + 1)); nano::telemetry_data data2; data2.account_count = 3; @@ -62,6 +66,7 @@ TEST (node_telemetry, consolidate_data) data2.minor_version = 1; data2.patch_version = 4; data2.pre_release_version = 6; + data2.timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (time)); std::vector all_data{ data, data1, data2 }; @@ -80,6 +85,7 @@ TEST (node_telemetry, consolidate_data) ASSERT_FALSE (consolidated_telemetry_data.patch_version.is_initialized ()); ASSERT_FALSE (consolidated_telemetry_data.pre_release_version.is_initialized ()); ASSERT_FALSE (consolidated_telemetry_data.maker.is_initialized ()); + ASSERT_EQ (*consolidated_telemetry_data.timestamp, std::chrono::system_clock::time_point (std::chrono::milliseconds (time))); // Modify the metrics which may be either the mode or averages to ensure all are tested. all_data[2].bandwidth_cap = 53; @@ -90,6 +96,7 @@ TEST (node_telemetry, consolidate_data) all_data[2].patch_version = 3; all_data[2].pre_release_version = 6; all_data[2].maker = 2; + all_data[2].timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (time + 2)); auto consolidated_telemetry_data1 = nano::consolidate_telemetry_data (all_data); ASSERT_EQ (consolidated_telemetry_data1.major_version, 10); @@ -100,6 +107,7 @@ TEST (node_telemetry, consolidate_data) ASSERT_TRUE (consolidated_telemetry_data1.protocol_version == 11 || consolidated_telemetry_data1.protocol_version == 12 || consolidated_telemetry_data1.protocol_version == 13); ASSERT_EQ (consolidated_telemetry_data1.bandwidth_cap, 51); ASSERT_EQ (consolidated_telemetry_data1.genesis_block, nano::block_hash (3)); + ASSERT_EQ (*consolidated_telemetry_data1.timestamp, std::chrono::system_clock::time_point (std::chrono::milliseconds (time + 1))); // Test equality operator ASSERT_FALSE (consolidated_telemetry_data == consolidated_telemetry_data1); @@ -108,16 +116,19 @@ TEST (node_telemetry, consolidate_data) TEST (node_telemetry, consolidate_data_optional_data) { + auto time = 1582117035109; nano::telemetry_data data; data.major_version = 20; data.minor_version = 1; data.patch_version = 4; data.pre_release_version = 6; data.maker = 2; + data.timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (time)); nano::telemetry_data missing_minor; missing_minor.major_version = 20; missing_minor.patch_version = 4; + missing_minor.timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (time + 3)); nano::telemetry_data missing_all_optional; @@ -128,6 +139,7 @@ TEST (node_telemetry, consolidate_data_optional_data) ASSERT_EQ (*consolidated_telemetry_data.patch_version, 4); ASSERT_EQ (*consolidated_telemetry_data.pre_release_version, 6); ASSERT_EQ (*consolidated_telemetry_data.maker, 2); + ASSERT_EQ (*consolidated_telemetry_data.timestamp, std::chrono::system_clock::time_point (std::chrono::milliseconds (time + 1))); } TEST (node_telemetry, serialize_deserialize_json_optional) @@ -137,6 +149,7 @@ TEST (node_telemetry, serialize_deserialize_json_optional) data.patch_version = 4; data.pre_release_version = 6; data.maker = 2; + data.timestamp = std::chrono::system_clock::time_point (100ms); nano::jsonconfig config; data.serialize_json (config); @@ -150,6 +163,9 @@ TEST (node_telemetry, serialize_deserialize_json_optional) ASSERT_EQ (val, 6); ASSERT_FALSE (config.get ("maker", val).get_error ()); ASSERT_EQ (val, 2); + uint64_t timestamp; + ASSERT_FALSE (config.get ("timestamp", timestamp).get_error ()); + ASSERT_EQ (timestamp, 100); nano::telemetry_data data1; data1.deserialize_json (config); @@ -157,6 +173,7 @@ TEST (node_telemetry, serialize_deserialize_json_optional) ASSERT_EQ (*data1.patch_version, 4); ASSERT_EQ (*data1.pre_release_version, 6); ASSERT_EQ (*data1.maker, 2); + ASSERT_EQ (*data1.timestamp, std::chrono::system_clock::time_point (100ms)); nano::telemetry_data no_optional_data; nano::jsonconfig config1; @@ -165,6 +182,7 @@ TEST (node_telemetry, serialize_deserialize_json_optional) ASSERT_FALSE (config1.get_optional ("patch_version").is_initialized ()); ASSERT_FALSE (config1.get_optional ("pre_release_version").is_initialized ()); ASSERT_FALSE (config1.get_optional ("maker").is_initialized ()); + ASSERT_FALSE (config1.get_optional ("timestamp").is_initialized ()); nano::telemetry_data no_optional_data1; no_optional_data1.deserialize_json (config1); @@ -172,6 +190,7 @@ TEST (node_telemetry, serialize_deserialize_json_optional) ASSERT_FALSE (no_optional_data1.patch_version.is_initialized ()); ASSERT_FALSE (no_optional_data1.pre_release_version.is_initialized ()); ASSERT_FALSE (no_optional_data1.maker.is_initialized ()); + ASSERT_FALSE (no_optional_data1.timestamp.is_initialized ()); } TEST (node_telemetry, consolidate_data_remove_outliers) @@ -191,6 +210,7 @@ TEST (node_telemetry, consolidate_data_remove_outliers) data.patch_version = 5; data.pre_release_version = 2; data.maker = 1; + data.timestamp = std::chrono::system_clock::time_point (100ms); // Insert 20 of these, and 2 outliers at the lower and upper bounds which should get removed std::vector all_data (20, data); @@ -211,6 +231,7 @@ TEST (node_telemetry, consolidate_data_remove_outliers) outlier_data.patch_version = 1; outlier_data.pre_release_version = 1; outlier_data.maker = 1; + outlier_data.timestamp = std::chrono::system_clock::time_point (1ms); all_data.push_back (outlier_data); all_data.push_back (outlier_data); @@ -229,6 +250,7 @@ TEST (node_telemetry, consolidate_data_remove_outliers) outlier_data1.patch_version = 9; outlier_data1.pre_release_version = 9; outlier_data1.maker = 9; + outlier_data1.timestamp = std::chrono::system_clock::time_point (999ms); all_data.push_back (outlier_data1); all_data.push_back (outlier_data1); @@ -242,7 +264,7 @@ TEST (node_telemetry, no_peers) std::atomic done{ false }; system.nodes[0]->telemetry.get_metrics_peers_async ([&done](nano::telemetry_data_responses const & responses_a) { - ASSERT_TRUE (responses_a.telemetry_data_time_pairs.empty ()); + ASSERT_TRUE (responses_a.telemetry_datas.empty ()); ASSERT_FALSE (responses_a.all_received); done = true; }); @@ -267,12 +289,12 @@ TEST (node_telemetry, basic) wait_peer_connections (system); // Request telemetry metrics - std::unordered_map all_telemetry_data_time_pairs; + std::unordered_map all_telemetry_datas; { std::atomic done{ false }; - node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) { + node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) { ASSERT_TRUE (responses_a.all_received); - all_telemetry_data_time_pairs = responses_a.telemetry_data_time_pairs; + all_telemetry_datas = responses_a.telemetry_datas; done = true; }); @@ -284,14 +306,14 @@ TEST (node_telemetry, basic) } // Check the metrics are correct - ASSERT_EQ (all_telemetry_data_time_pairs.size (), 1); - compare_default_test_result_data (all_telemetry_data_time_pairs.begin ()->second.data, *node_server); + ASSERT_EQ (all_telemetry_datas.size (), 1); + compare_default_test_result_data (all_telemetry_datas.begin ()->second, *node_server); // Call again straight away. It should use the cache { std::atomic done{ false }; - node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) { - ASSERT_EQ (all_telemetry_data_time_pairs, responses_a.telemetry_data_time_pairs); + node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) { + ASSERT_EQ (all_telemetry_datas, responses_a.telemetry_datas); ASSERT_TRUE (responses_a.all_received); done = true; }); @@ -307,8 +329,8 @@ TEST (node_telemetry, basic) std::this_thread::sleep_for (nano::telemetry_cache_cutoffs::test); std::atomic done{ false }; - node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) { - ASSERT_NE (all_telemetry_data_time_pairs, responses_a.telemetry_data_time_pairs); + node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) { + ASSERT_NE (all_telemetry_datas, responses_a.telemetry_datas); ASSERT_TRUE (responses_a.all_received); done = true; }); @@ -350,10 +372,10 @@ TEST (node_telemetry, many_nodes) auto node_client = system.nodes.front (); std::atomic done{ false }; - std::unordered_map all_telemetry_data_time_pairs; - node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) { + std::unordered_map all_telemetry_datas; + node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) { ASSERT_TRUE (responses_a.all_received); - all_telemetry_data_time_pairs = responses_a.telemetry_data_time_pairs; + all_telemetry_datas = responses_a.telemetry_datas; done = true; }); @@ -365,9 +387,9 @@ TEST (node_telemetry, many_nodes) // Check the metrics nano::network_params params; - for (auto & telemetry_data_time_pair : all_telemetry_data_time_pairs) + for (auto & telemetry_data : all_telemetry_datas) { - auto & data = telemetry_data_time_pair.second.data; + auto & data = telemetry_data.second; ASSERT_EQ (data.unchecked_count, 0); ASSERT_EQ (data.cemented_count, 1); ASSERT_LE (data.peer_count, 9); @@ -386,10 +408,10 @@ TEST (node_telemetry, many_nodes) } // We gave some nodes different bandwidth caps, confirm they are not all the same - auto bandwidth_cap = all_telemetry_data_time_pairs.begin ()->second.data.bandwidth_cap; - all_telemetry_data_time_pairs.erase (all_telemetry_data_time_pairs.begin ()); - auto all_bandwidth_limits_same = std::all_of (all_telemetry_data_time_pairs.begin (), all_telemetry_data_time_pairs.end (), [bandwidth_cap](auto & telemetry_data_time_pair) { - return telemetry_data_time_pair.second.data.bandwidth_cap == bandwidth_cap; + auto bandwidth_cap = all_telemetry_datas.begin ()->second.bandwidth_cap; + all_telemetry_datas.erase (all_telemetry_datas.begin ()); + auto all_bandwidth_limits_same = std::all_of (all_telemetry_datas.begin (), all_telemetry_datas.end (), [bandwidth_cap](auto & telemetry_data) { + return telemetry_data.second.bandwidth_cap == bandwidth_cap; }); ASSERT_FALSE (all_bandwidth_limits_same); } @@ -416,10 +438,10 @@ TEST (node_telemetry, over_udp) wait_peer_connections (system); std::atomic done{ false }; - std::unordered_map all_telemetry_data_time_pairs; - node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) { + std::unordered_map all_telemetry_datas; + node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) { ASSERT_TRUE (responses_a.all_received); - all_telemetry_data_time_pairs = responses_a.telemetry_data_time_pairs; + all_telemetry_datas = responses_a.telemetry_datas; done = true; }); @@ -429,8 +451,8 @@ TEST (node_telemetry, over_udp) ASSERT_NO_ERROR (system.poll ()); } - ASSERT_EQ (all_telemetry_data_time_pairs.size (), 1); - compare_default_test_result_data (all_telemetry_data_time_pairs.begin ()->second.data, *node_server); + ASSERT_EQ (all_telemetry_datas.size (), 1); + compare_default_test_result_data (all_telemetry_datas.begin ()->second, *node_server); // Check channels are indeed udp ASSERT_EQ (1, node_client->network.size ()); @@ -458,14 +480,14 @@ TEST (node_telemetry, single_request) // Request telemetry metrics auto channel = node_client->network.find_channel (node_server->network.endpoint ()); - nano::telemetry_data_time_pair telemetry_data_time_pair; + nano::telemetry_data telemetry_data; { std::atomic done{ false }; - node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data_time_pair, &channel](nano::telemetry_data_response const & response_a) { + node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data, &channel](nano::telemetry_data_response const & response_a) { ASSERT_FALSE (response_a.error); ASSERT_EQ (channel->get_endpoint (), response_a.endpoint); - telemetry_data_time_pair = response_a.telemetry_data_time_pair; + telemetry_data = response_a.telemetry_data; done = true; }); @@ -477,13 +499,13 @@ TEST (node_telemetry, single_request) } // Check the metrics are correct - compare_default_test_result_data (telemetry_data_time_pair.data, *node_server); + compare_default_test_result_data (telemetry_data, *node_server); // Call again straight away. It should use the cache { std::atomic done{ false }; - node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data_time_pair](nano::telemetry_data_response const & response_a) { - ASSERT_EQ (telemetry_data_time_pair, response_a.telemetry_data_time_pair); + node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) { + ASSERT_EQ (telemetry_data, response_a.telemetry_data); ASSERT_FALSE (response_a.error); done = true; }); @@ -499,8 +521,8 @@ TEST (node_telemetry, single_request) std::this_thread::sleep_for (nano::telemetry_cache_cutoffs::test); std::atomic done{ false }; - node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data_time_pair](nano::telemetry_data_response const & response_a) { - ASSERT_NE (telemetry_data_time_pair, response_a.telemetry_data_time_pair); + node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) { + ASSERT_NE (telemetry_data, response_a.telemetry_data); ASSERT_FALSE (response_a.error); done = true; }); @@ -565,13 +587,13 @@ TEST (node_telemetry, blocking_single_and_random) // Blocking version of get_random_metrics_async auto telemetry_data_responses = node_client->telemetry.get_metrics_peers (); ASSERT_TRUE (telemetry_data_responses.all_received); - compare_default_test_result_data (telemetry_data_responses.telemetry_data_time_pairs.begin ()->second.data, *node_server); + compare_default_test_result_data (telemetry_data_responses.telemetry_datas.begin ()->second, *node_server); // Now try single request metric auto telemetry_data_response = node_client->telemetry.get_metrics_single_peer (node_client->network.find_channel (node_server->network.endpoint ())); ASSERT_FALSE (telemetry_data_response.error); - compare_default_test_result_data (telemetry_data_response.telemetry_data_time_pair.data, *node_server); - ASSERT_EQ (telemetry_data_response.telemetry_data_time_pair.last_updated, telemetry_data_responses.telemetry_data_time_pairs.begin ()->second.last_updated); + compare_default_test_result_data (telemetry_data_response.telemetry_data, *node_server); + ASSERT_EQ (*telemetry_data_response.telemetry_data.timestamp, *telemetry_data_responses.telemetry_datas.begin ()->second.timestamp); done = true; promise.get_future ().wait (); @@ -596,10 +618,10 @@ TEST (node_telemetry, multiple_single_request_clearing) auto channel = node_client->network.find_channel (node_server->network.endpoint ()); std::atomic done{ false }; - std::chrono::steady_clock::time_point last_updated; + std::chrono::system_clock::time_point last_updated; node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &last_updated](nano::telemetry_data_response const & response_a) { ASSERT_FALSE (response_a.error); - last_updated = response_a.telemetry_data_time_pair.last_updated; + last_updated = *response_a.telemetry_data.timestamp; done = true; }); @@ -615,12 +637,10 @@ TEST (node_telemetry, multiple_single_request_clearing) system.deadline_set (10s); node_client->telemetry.get_metrics_single_peer_async (channel, [&done, last_updated](nano::telemetry_data_response const & response_a) { ASSERT_FALSE (response_a.error); - ASSERT_EQ (last_updated, response_a.telemetry_data_time_pair.last_updated); + ASSERT_EQ (last_updated, *response_a.telemetry_data.timestamp); done = true; }); - ASSERT_LT (last_updated, node_client->telemetry.single_requests.begin ()->second.last_updated); - system.deadline_set (10s); while (!done) { @@ -631,8 +651,8 @@ TEST (node_telemetry, multiple_single_request_clearing) auto channel1 = node_client->network.find_channel (node_server1->network.endpoint ()); node_client->telemetry.get_metrics_single_peer_async (channel1, [&done, &last_updated](nano::telemetry_data_response const & response_a) { ASSERT_FALSE (response_a.error); - ASSERT_NE (last_updated, response_a.telemetry_data_time_pair.last_updated); - last_updated = response_a.telemetry_data_time_pair.last_updated; + ASSERT_NE (last_updated, *response_a.telemetry_data.timestamp); + last_updated = *response_a.telemetry_data.timestamp; done = true; }); @@ -646,7 +666,7 @@ TEST (node_telemetry, multiple_single_request_clearing) done = false; node_client->telemetry.get_metrics_single_peer_async (channel1, [&done, last_updated](nano::telemetry_data_response const & response_a) { ASSERT_FALSE (response_a.error); - ASSERT_EQ (last_updated, response_a.telemetry_data_time_pair.last_updated); + ASSERT_EQ (last_updated, *response_a.telemetry_data.timestamp); done = true; }); @@ -712,12 +732,12 @@ TEST (node_telemetry, batch_use_single_request_cache) wait_peer_connections (system); // Request telemetry metrics - nano::telemetry_data_time_pair telemetry_data_time_pair; + nano::telemetry_data telemetry_data; { std::atomic done{ false }; auto channel = node_client->network.find_channel (node_server->network.endpoint ()); - node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data_time_pair](nano::telemetry_data_response const & response_a) { - telemetry_data_time_pair = response_a.telemetry_data_time_pair; + node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) { + telemetry_data = response_a.telemetry_data; done = true; }); @@ -730,9 +750,9 @@ TEST (node_telemetry, batch_use_single_request_cache) { std::atomic done{ false }; - node_client->telemetry.get_metrics_peers_async ([&done, &telemetry_data_time_pair](nano::telemetry_data_responses const & responses_a) { + node_client->telemetry.get_metrics_peers_async ([&done, &telemetry_data](nano::telemetry_data_responses const & responses_a) { ASSERT_TRUE (responses_a.all_received); - ASSERT_EQ (telemetry_data_time_pair, responses_a.telemetry_data_time_pairs.begin ()->second); + ASSERT_EQ (telemetry_data, responses_a.telemetry_datas.begin ()->second); done = true; }); @@ -763,7 +783,7 @@ TEST (node_telemetry, batch_use_single_request_cache) system.deadline_set (10s); std::atomic done{ false }; node_client->telemetry.get_metrics_peers_async ([&done](nano::telemetry_data_responses const & responses_a) { - ASSERT_EQ (1, responses_a.telemetry_data_time_pairs.size ()); + ASSERT_EQ (1, responses_a.telemetry_datas.size ()); done = true; }); @@ -792,13 +812,13 @@ TEST (node_telemetry, single_request_use_batch_cache) wait_peer_connections (system); // Request batched metric first - std::unordered_map all_telemetry_data_time_pairs; + std::unordered_map all_telemetry_datas; { std::atomic done{ false }; - node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) { + node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) { ASSERT_TRUE (responses_a.all_received); - ASSERT_EQ (1, responses_a.telemetry_data_time_pairs.size ()); - all_telemetry_data_time_pairs = responses_a.telemetry_data_time_pairs; + ASSERT_EQ (1, responses_a.telemetry_datas.size ()); + all_telemetry_datas = responses_a.telemetry_datas; done = true; }); @@ -811,8 +831,8 @@ TEST (node_telemetry, single_request_use_batch_cache) std::atomic done{ false }; auto channel = node_client->network.find_channel (node_server->network.endpoint ()); - node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &all_telemetry_data_time_pairs](nano::telemetry_data_response const & response_a) { - ASSERT_EQ (all_telemetry_data_time_pairs.begin ()->second, response_a.telemetry_data_time_pair); + node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &all_telemetry_datas](nano::telemetry_data_response const & response_a) { + ASSERT_EQ (all_telemetry_datas.begin ()->second, response_a.telemetry_data); ASSERT_FALSE (response_a.error); done = true; }); @@ -958,7 +978,7 @@ TEST (node_telemetry, disable_metrics_single) auto channel1 = node_server->network.find_channel (node_client->network.endpoint ()); node_server->telemetry.get_metrics_single_peer_async (channel1, [&done, node_server](nano::telemetry_data_response const & response_a) { ASSERT_FALSE (response_a.error); - compare_default_test_result_data (response_a.telemetry_data_time_pair.data, *node_server); + compare_default_test_result_data (response_a.telemetry_data, *node_server); done = true; }); @@ -999,7 +1019,7 @@ TEST (node_telemetry, disable_metrics_batch) done = false; node_server->telemetry.get_metrics_peers_async ([&done, node_server](nano::telemetry_data_responses const & responses_a) { ASSERT_TRUE (responses_a.all_received); - compare_default_test_result_data (responses_a.telemetry_data_time_pairs.begin ()->second.data, *node_server); + compare_default_test_result_data (responses_a.telemetry_datas.begin ()->second, *node_server); done = true; }); @@ -1036,12 +1056,13 @@ void compare_default_test_result_data (nano::telemetry_data const & telemetry_da ASSERT_EQ (telemetry_data_a.protocol_version, node_server_a.network_params.protocol.telemetry_protocol_version_min); ASSERT_EQ (telemetry_data_a.unchecked_count, 0); ASSERT_EQ (telemetry_data_a.account_count, 1); + ASSERT_LT (telemetry_data_a.uptime, 100); + ASSERT_EQ (telemetry_data_a.genesis_block, node_server_a.network_params.ledger.genesis_hash); ASSERT_EQ (telemetry_data_a.major_version, nano::get_major_node_version ()); ASSERT_EQ (*telemetry_data_a.minor_version, nano::get_minor_node_version ()); ASSERT_EQ (*telemetry_data_a.patch_version, nano::get_patch_node_version ()); ASSERT_EQ (*telemetry_data_a.pre_release_version, nano::get_pre_release_node_version ()); ASSERT_EQ (*telemetry_data_a.maker, 0); - ASSERT_LT (telemetry_data_a.uptime, 100); - ASSERT_EQ (telemetry_data_a.genesis_block, node_server_a.network_params.ledger.genesis_hash); + ASSERT_GT (*telemetry_data_a.timestamp, std::chrono::system_clock::now () - 100s); } } diff --git a/nano/node/common.cpp b/nano/node/common.cpp index d4bbf64f..1856b8de 100644 --- a/nano/node/common.cpp +++ b/nano/node/common.cpp @@ -1117,6 +1117,7 @@ void nano::telemetry_ack::serialize (nano::stream & stream_a) const write (stream_a, *data.patch_version); write (stream_a, *data.pre_release_version); write (stream_a, *data.maker); + write (stream_a, std::chrono::duration_cast (data.timestamp->time_since_epoch ()).count ()); } } @@ -1151,6 +1152,13 @@ bool nano::telemetry_ack::deserialize (nano::stream & stream_a) read (stream_a, out); data.maker = out; } + + if (header.extensions.to_ulong () > telemetry_data::size_v1) + { + uint64_t timestamp; + read (stream_a, timestamp); + data.timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (timestamp)); + } } } catch (std::runtime_error const &) @@ -1209,6 +1217,10 @@ nano::error nano::telemetry_data::serialize_json (nano::jsonconfig & json) const { json.put ("maker", *maker); } + if (timestamp.is_initialized ()) + { + json.put ("timestamp", std::chrono::duration_cast (timestamp->time_since_epoch ()).count ()); + } return json.get_error (); } @@ -1236,13 +1248,18 @@ nano::error nano::telemetry_data::deserialize_json (nano::jsonconfig & json) patch_version = json.get_optional ("patch_version"); pre_release_version = json.get_optional ("pre_release_version"); maker = json.get_optional ("maker"); + auto timestamp_l = json.get_optional ("timestamp"); + if (timestamp_l.is_initialized ()) + { + timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (*timestamp_l)); + } return json.get_error (); } bool nano::telemetry_data::operator== (nano::telemetry_data const & data_a) const { - return (block_count == data_a.block_count && cemented_count == data_a.cemented_count && unchecked_count == data_a.unchecked_count && account_count == data_a.account_count && bandwidth_cap == data_a.bandwidth_cap && uptime == data_a.uptime && peer_count == data_a.peer_count && protocol_version == data_a.protocol_version && genesis_block == data_a.genesis_block && major_version == data_a.major_version && minor_version == data_a.minor_version && patch_version == data_a.patch_version && pre_release_version == data_a.pre_release_version && maker == data_a.maker); + return (block_count == data_a.block_count && cemented_count == data_a.cemented_count && unchecked_count == data_a.unchecked_count && account_count == data_a.account_count && bandwidth_cap == data_a.bandwidth_cap && uptime == data_a.uptime && peer_count == data_a.peer_count && protocol_version == data_a.protocol_version && genesis_block == data_a.genesis_block && major_version == data_a.major_version && minor_version == data_a.minor_version && patch_version == data_a.patch_version && pre_release_version == data_a.pre_release_version && maker == data_a.maker && timestamp == data_a.timestamp); } bool nano::telemetry_data::operator!= (nano::telemetry_data const & data_a) const diff --git a/nano/node/common.hpp b/nano/node/common.hpp index 9464ba80..4cf72732 100644 --- a/nano/node/common.hpp +++ b/nano/node/common.hpp @@ -349,6 +349,7 @@ public: boost::optional patch_version; boost::optional pre_release_version; boost::optional maker; // 0 for NF node + boost::optional timestamp; nano::error serialize_json (nano::jsonconfig & json) const; nano::error deserialize_json (nano::jsonconfig & json); @@ -356,7 +357,8 @@ public: bool operator!= (nano::telemetry_data const &) const; static auto constexpr size_v0 = sizeof (block_count) + sizeof (cemented_count) + sizeof (unchecked_count) + sizeof (account_count) + sizeof (bandwidth_cap) + sizeof (peer_count) + sizeof (protocol_version) + sizeof (uptime) + sizeof (genesis_block) + sizeof (major_version); - static auto constexpr size = size_v0 + sizeof (decltype (minor_version)::value_type) + sizeof (decltype (patch_version)::value_type) + sizeof (decltype (pre_release_version)::value_type) + sizeof (decltype (maker)::value_type); + static auto constexpr size_v1 = size_v0 + sizeof (decltype (minor_version)::value_type) + sizeof (decltype (patch_version)::value_type) + sizeof (decltype (pre_release_version)::value_type) + sizeof (decltype (maker)::value_type); + static auto constexpr size = size_v1 + sizeof (uint64_t); }; class telemetry_req final : public message { diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index e96ae817..777ddf90 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -3941,7 +3941,6 @@ void nano::json_handler::telemetry () nano::jsonconfig config_l; auto err = telemetry_data.serialize_json (config_l); - config_l.put ("timestamp", std::chrono::duration_cast (std::chrono::system_clock::now ().time_since_epoch ()).count ()); auto const & ptree = config_l.get_tree (); if (!err) @@ -3979,12 +3978,11 @@ void nano::json_handler::telemetry () if (!ec) { debug_assert (channel); - node.telemetry.get_metrics_single_peer_async (channel, [rpc_l](auto const & single_telemetry_metric_a) { - if (!single_telemetry_metric_a.error) + node.telemetry.get_metrics_single_peer_async (channel, [rpc_l](auto const & telemetry_response_a) { + if (!telemetry_response_a.error) { nano::jsonconfig config_l; - auto err = single_telemetry_metric_a.telemetry_data_time_pair.data.serialize_json (config_l); - config_l.put ("timestamp", std::chrono::duration_cast (single_telemetry_metric_a.telemetry_data_time_pair.system_last_updated.time_since_epoch ()).count ()); + auto err = telemetry_response_a.telemetry_data.serialize_json (config_l); auto const & ptree = config_l.get_tree (); if (!err) @@ -4015,15 +4013,14 @@ void nano::json_handler::telemetry () // setting "raw" to true returns metrics from all nodes requested. auto raw = request.get_optional ("raw"); auto output_raw = raw.value_or (false); - node.telemetry.get_metrics_peers_async ([rpc_l, output_raw](auto const & batched_telemetry_metrics_a) { + node.telemetry.get_metrics_peers_async ([rpc_l, output_raw](telemetry_data_responses const & telemetry_responses_a) { if (output_raw) { boost::property_tree::ptree metrics; - for (auto & telemetry_metrics : batched_telemetry_metrics_a.telemetry_data_time_pairs) + for (auto & telemetry_metrics : telemetry_responses_a.telemetry_datas) { nano::jsonconfig config_l; - auto err = telemetry_metrics.second.data.serialize_json (config_l); - config_l.put ("timestamp", std::chrono::duration_cast (telemetry_metrics.second.system_last_updated.time_since_epoch ()).count ()); + auto err = telemetry_metrics.second.serialize_json (config_l); config_l.put ("address", telemetry_metrics.first.address ()); config_l.put ("port", telemetry_metrics.first.port ()); if (!err) @@ -4041,15 +4038,14 @@ void nano::json_handler::telemetry () else { nano::jsonconfig config_l; - std::vector telemetry_data_time_pairs; - telemetry_data_time_pairs.reserve (batched_telemetry_metrics_a.telemetry_data_time_pairs.size ()); - std::transform (batched_telemetry_metrics_a.telemetry_data_time_pairs.begin (), batched_telemetry_metrics_a.telemetry_data_time_pairs.end (), std::back_inserter (telemetry_data_time_pairs), [](auto const & telemetry_data_time_pair_a) { - return telemetry_data_time_pair_a.second; + std::vector telemetry_datas; + telemetry_datas.reserve (telemetry_responses_a.telemetry_datas.size ()); + std::transform (telemetry_responses_a.telemetry_datas.begin (), telemetry_responses_a.telemetry_datas.end (), std::back_inserter (telemetry_datas), [](auto const & endpoint_telemetry_data) { + return endpoint_telemetry_data.second; }); - auto average_telemetry_metrics = nano::consolidate_telemetry_data_time_pairs (telemetry_data_time_pairs); - auto err = average_telemetry_metrics.data.serialize_json (config_l); - config_l.put ("timestamp", std::chrono::duration_cast (average_telemetry_metrics.system_last_updated.time_since_epoch ()).count ()); + auto average_telemetry_metrics = nano::consolidate_telemetry_data (telemetry_datas); + auto err = average_telemetry_metrics.serialize_json (config_l); auto const & ptree = config_l.get_tree (); if (!err) diff --git a/nano/node/telemetry.cpp b/nano/node/telemetry.cpp index 1554ffdd..a9e50661 100644 --- a/nano/node/telemetry.cpp +++ b/nano/node/telemetry.cpp @@ -20,7 +20,7 @@ worker (worker_a), batch_request (std::make_shared (network, alarm, worker)) { // Before callbacks are called with the batch request, check if any of the single request data can be appended to give - batch_request->pre_callback_callback = [this](std::unordered_map & data_a, std::mutex & mutex_a) { + batch_request->pre_callback_callback = [this](std::unordered_map & datas_a, std::mutex & mutex_a) { nano::lock_guard guard (this->mutex); for (auto & single_request : single_requests) { @@ -35,7 +35,7 @@ batch_request (std::make_shared (network, alarm, worker)) } else { - data_a.emplace (single_request.first, single_request.second.impl->cached_telemetry_data.begin ()->second); + datas_a.emplace (single_request.first, single_request.second.impl->cached_telemetry_data.begin ()->second.data); } } } @@ -50,7 +50,7 @@ batch_request (std::make_shared (network, alarm, worker)) } else { - data_a.emplace (pending.first, pending.second); + datas_a.emplace (pending.first, pending.second.data); } } finished_single_requests.clear (); @@ -195,7 +195,7 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptrget_network_version () >= network_params.protocol.telemetry_protocol_version_min)) { - auto add_callback_async = [& worker = this->worker, &callback_a](telemetry_data_time_pair const & telemetry_data_time_pair_a, nano::endpoint const & endpoint_a) { - telemetry_data_response telemetry_data_response_l{ telemetry_data_time_pair_a, endpoint_a, false }; + auto add_callback_async = [& worker = this->worker, &callback_a](telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a) { + telemetry_data_response telemetry_data_response_l{ telemetry_data_a, endpoint_a, false }; worker.push_task ([telemetry_data_response_l, callback_a]() { callback_a (telemetry_data_response_l); }); @@ -217,7 +217,7 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptrcached_telemetry_data.find (channel_a->get_endpoint ()); if (it != batch_request->cached_telemetry_data.cend ()) { - add_callback_async (it->second, it->first); + add_callback_async (it->second.data, it->first); return; } } @@ -225,7 +225,7 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptrget_endpoint ()); if (it != finished_single_requests.cend ()) { - add_callback_async (it->second, it->first); + add_callback_async (it->second.data, it->first); return; } @@ -238,13 +238,13 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptrsecond, it->first, error }); } else { - callback_a ({ nano::telemetry_data_time_pair{}, channel_a->get_endpoint (), error }); + callback_a ({ nano::telemetry_data{}, channel_a->get_endpoint (), error }); } }); } @@ -357,7 +357,7 @@ void nano::telemetry_impl::add (nano::telemetry_data const & telemetry_data_a, n if (!is_empty_a) { - current_telemetry_data_responses[endpoint_a] = { telemetry_data_a, std::chrono::steady_clock::now (), std::chrono::system_clock::now () }; + current_telemetry_data_responses[endpoint_a] = { telemetry_data_a, std::chrono::steady_clock::now () }; } channel_processed (lk, endpoint_a); } @@ -366,12 +366,16 @@ void nano::telemetry_impl::invoke_callbacks () { decltype (callbacks) callbacks_l; bool all_received; - decltype (cached_telemetry_data) cached_telemetry_data_l; + std::unordered_map cached_responses_l; { // Copy callbacks so that they can be called outside of holding the lock nano::lock_guard guard (mutex); callbacks_l = callbacks; - cached_telemetry_data_l = cached_telemetry_data; + cached_responses_l.reserve (cached_telemetry_data.size ()); + std::transform (cached_telemetry_data.begin (), cached_telemetry_data.end (), std::inserter (cached_responses_l, cached_responses_l.end ()), [](auto const & endpoint_telemetry_data) { + return std::pair{ endpoint_telemetry_data.first, endpoint_telemetry_data.second.data }; + }); + current_telemetry_data_responses.clear (); callbacks.clear (); all_received = failed.empty (); @@ -379,13 +383,13 @@ void nano::telemetry_impl::invoke_callbacks () if (pre_callback_callback) { - pre_callback_callback (cached_telemetry_data_l, mutex); + pre_callback_callback (cached_responses_l, mutex); } // Need to account for nodes which disable telemetry data in responses - bool all_received_l = !cached_telemetry_data_l.empty () && all_received; + bool all_received_l = !cached_responses_l.empty () && all_received; for (auto & callback : callbacks_l) { - callback ({ cached_telemetry_data_l, all_received_l }); + callback ({ cached_responses_l, all_received_l }); } } @@ -511,24 +515,14 @@ nano::telemetry_data nano::consolidate_telemetry_data (std::vector telemetry_data_time_pairs; telemetry_data_time_pairs.reserve (telemetry_datas.size ()); - std::transform (telemetry_datas.begin (), telemetry_datas.end (), std::back_inserter (telemetry_data_time_pairs), [](nano::telemetry_data const & telemetry_data_a) { - // Don't care about the timestamps here - return nano::telemetry_data_time_pair{ telemetry_data_a, {}, {} }; - }); - - return consolidate_telemetry_data_time_pairs (telemetry_data_time_pairs).data; -} - -nano::telemetry_data_time_pair nano::consolidate_telemetry_data_time_pairs (std::vector const & telemetry_data_time_pairs_a) -{ - if (telemetry_data_time_pairs_a.empty ()) + if (telemetry_datas.empty ()) { return {}; } - else if (telemetry_data_time_pairs_a.size () == 1) + else if (telemetry_datas.size () == 1) { // Only 1 element in the collection, so just return it. - return telemetry_data_time_pairs_a.front (); + return telemetry_datas.front (); } std::unordered_map protocol_versions; @@ -542,13 +536,12 @@ nano::telemetry_data_time_pair nano::consolidate_telemetry_data_time_pairs (std: std::multiset cemented_counts; std::multiset peer_counts; std::multiset unchecked_counts; - std::multiset uptime_counts; - std::multiset bandwidth_counts; - std::multiset timestamp_counts; + std::multiset uptimes; + std::multiset bandwidths; + std::multiset timestamps; - for (auto const & telemetry_data_time_pair : telemetry_data_time_pairs_a) + for (auto const & telemetry_data : telemetry_datas) { - auto & telemetry_data = telemetry_data_time_pair.data; account_counts.insert (telemetry_data.account_count); block_counts.insert (telemetry_data.block_count); cemented_counts.insert (telemetry_data.cemented_count); @@ -572,25 +565,28 @@ nano::telemetry_data_time_pair nano::consolidate_telemetry_data_time_pairs (std: } } + if (telemetry_data.timestamp.is_initialized ()) + { + timestamps.insert (std::chrono::duration_cast (telemetry_data.timestamp->time_since_epoch ()).count ()); + } + ++vendor_versions[ss.str ()]; ++protocol_versions[telemetry_data.protocol_version]; peer_counts.insert (telemetry_data.peer_count); unchecked_counts.insert (telemetry_data.unchecked_count); - uptime_counts.insert (telemetry_data.uptime); + uptimes.insert (telemetry_data.uptime); // 0 has a special meaning (unlimited), don't include it in the average as it will be heavily skewed if (telemetry_data.bandwidth_cap != 0) { - bandwidth_counts.insert (telemetry_data.bandwidth_cap); + bandwidths.insert (telemetry_data.bandwidth_cap); } ++bandwidth_caps[telemetry_data.bandwidth_cap]; ++genesis_blocks[telemetry_data.genesis_block]; - - timestamp_counts.insert (std::chrono::time_point_cast (telemetry_data_time_pair.system_last_updated).time_since_epoch ().count ()); } // Remove 10% of the results from the lower and upper bounds to catch any outliers. Need at least 10 responses before any are removed. - auto num_either_side_to_remove = telemetry_data_time_pairs_a.size () / 10; + auto num_either_side_to_remove = telemetry_datas.size () / 10; auto strip_outliers_and_sum = [num_either_side_to_remove](auto & counts) { counts.erase (counts.begin (), std::next (counts.begin (), num_either_side_to_remove)); @@ -605,11 +601,11 @@ nano::telemetry_data_time_pair nano::consolidate_telemetry_data_time_pairs (std: auto cemented_sum = strip_outliers_and_sum (cemented_counts); auto peer_sum = strip_outliers_and_sum (peer_counts); auto unchecked_sum = strip_outliers_and_sum (unchecked_counts); - auto uptime_sum = strip_outliers_and_sum (uptime_counts); - auto bandwidth_sum = strip_outliers_and_sum (bandwidth_counts); + auto uptime_sum = strip_outliers_and_sum (uptimes); + auto bandwidth_sum = strip_outliers_and_sum (bandwidths); nano::telemetry_data consolidated_data; - auto size = telemetry_data_time_pairs_a.size () - num_either_side_to_remove * 2; + auto size = telemetry_datas.size () - num_either_side_to_remove * 2; consolidated_data.account_count = boost::numeric_cast (account_sum / size); consolidated_data.block_count = boost::numeric_cast (block_sum / size); consolidated_data.cemented_count = boost::numeric_cast (cemented_sum / size); @@ -617,6 +613,12 @@ nano::telemetry_data_time_pair nano::consolidate_telemetry_data_time_pairs (std: consolidated_data.uptime = boost::numeric_cast (uptime_sum / size); consolidated_data.unchecked_count = boost::numeric_cast (unchecked_sum / size); + if (!timestamps.empty ()) + { + auto timestamp_sum = strip_outliers_and_sum (timestamps); + consolidated_data.timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (boost::numeric_cast (timestamp_sum / timestamps.size ()))); + } + auto set_mode_or_average = [](auto const & collection, auto & var, auto const & sum, size_t size) { auto max = std::max_element (collection.begin (), collection.end (), [](auto const & lhs, auto const & rhs) { return lhs.second < rhs.second; @@ -668,11 +670,7 @@ nano::telemetry_data_time_pair nano::consolidate_telemetry_data_time_pairs (std: consolidated_data.maker = boost::lexical_cast (version_fragments[4]); } - // Consolidate timestamps - auto timestamp_sum = strip_outliers_and_sum (timestamp_counts); - auto consolidated_timestamp = boost::numeric_cast (timestamp_sum / size); - - return telemetry_data_time_pair{ consolidated_data, std::chrono::steady_clock::time_point{}, std::chrono::system_clock::time_point (std::chrono::milliseconds (consolidated_timestamp)) }; + return consolidated_data; } nano::telemetry_data nano::local_telemetry_data (nano::ledger_cache const & ledger_cache_a, nano::network & network_a, uint64_t bandwidth_limit_a, nano::network_params const & network_params_a, std::chrono::steady_clock::time_point statup_time_a) @@ -692,5 +690,6 @@ nano::telemetry_data nano::local_telemetry_data (nano::ledger_cache const & ledg telemetry_data.patch_version = nano::get_patch_node_version (); telemetry_data.pre_release_version = nano::get_pre_release_node_version (); telemetry_data.maker = 0; // 0 Indicates it originated from the NF + telemetry_data.timestamp = std::chrono::system_clock::now (); return telemetry_data; } diff --git a/nano/node/telemetry.hpp b/nano/node/telemetry.hpp index 093c996b..4406e7cd 100644 --- a/nano/node/telemetry.hpp +++ b/nano/node/telemetry.hpp @@ -23,7 +23,6 @@ class telemetry_data_time_pair public: nano::telemetry_data data; std::chrono::steady_clock::time_point last_updated; - std::chrono::system_clock::time_point system_last_updated; bool operator== (telemetry_data_time_pair const &) const; bool operator!= (telemetry_data_time_pair const &) const; }; @@ -34,7 +33,7 @@ public: class telemetry_data_response { public: - nano::telemetry_data_time_pair telemetry_data_time_pair; + nano::telemetry_data telemetry_data; nano::endpoint endpoint; bool error{ true }; }; @@ -45,7 +44,7 @@ public: class telemetry_data_responses { public: - std::unordered_map telemetry_data_time_pairs; + std::unordered_map telemetry_datas; bool all_received{ false }; }; @@ -88,7 +87,7 @@ private: nano::alarm & alarm; nano::worker & worker; - std::function & data_a, std::mutex &)> pre_callback_callback; + std::function & data_a, std::mutex &)> pre_callback_callback; void invoke_callbacks (); void channel_processed (nano::unique_lock & lk_a, nano::endpoint const & endpoint_a); diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index e4a2eee1..29525bb7 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -78,7 +78,7 @@ namespace transport class tcp_channels final { friend class nano::transport::channel_tcp; - friend class node_telemetry_simultaneous_single_and_random_requests_Test; + friend class node_telemetry_simultaneous_single_and_all_requests_Test; public: tcp_channels (nano::node &); diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index 523082d6..1f0202b9 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -846,7 +846,7 @@ public: std::atomic awaiting_cache{ false }; std::atomic keep_requesting_metrics{ true }; std::shared_ptr node; - std::chrono::steady_clock::time_point orig_time; + std::chrono::system_clock::time_point orig_time; std::atomic_flag orig_time_set = ATOMIC_FLAG_INIT; }; class shared_data @@ -859,7 +859,7 @@ public: }; template -void callback_process (shared_data & shared_data_a, data & data, T & all_node_data_a, std::chrono::steady_clock::time_point last_updated) +void callback_process (shared_data & shared_data_a, data & data, T & all_node_data_a, std::chrono::system_clock::time_point last_updated) { if (!data.orig_time_set.test_and_set ()) { @@ -921,7 +921,7 @@ TEST (node_telemetry, ongoing_requests) } } -TEST (node_telemetry, simultaneous_random_requests) +TEST (node_telemetry, simultaneous_all_requests) { const auto num_nodes = 4; nano::system system (num_nodes); @@ -954,7 +954,7 @@ TEST (node_telemetry, simultaneous_random_requests) { ++shared_data.count; data.node->telemetry.get_metrics_peers_async ([&shared_data, &data, &all_data](nano::telemetry_data_responses const & responses_a) { - callback_process (shared_data, data, all_data, responses_a.telemetry_data_time_pairs.begin ()->second.last_updated); + callback_process (shared_data, data, all_data, *responses_a.telemetry_datas.begin ()->second.timestamp); }); } std::this_thread::sleep_for (1ms); @@ -982,7 +982,7 @@ namespace nano { namespace transport { - TEST (node_telemetry, simultaneous_single_and_random_requests) + TEST (node_telemetry, simultaneous_single_and_all_requests) { const auto num_nodes = 4; nano::system system (num_nodes); @@ -993,21 +993,21 @@ namespace transport const auto num_threads = 4; std::array node_data_single{}; - std::array node_data_random{}; + std::array node_data_all{}; for (auto i = 0; i < num_nodes; ++i) { node_data_single[i].node = system.nodes[i]; - node_data_random[i].node = system.nodes[i]; + node_data_all[i].node = system.nodes[i]; } shared_data shared_data_single; - shared_data shared_data_random; + shared_data shared_data_all; // Create a few threads where each node sends out telemetry request messages to all other nodes continuously, until the cache it reached and subsequently expired. // The test waits until all telemetry_ack messages have been received. for (int i = 0; i < num_threads; ++i) { - threads.emplace_back ([&node_data_single, &node_data_random, &shared_data_single, &shared_data_random]() { + threads.emplace_back ([&node_data_single, &node_data_all, &shared_data_single, &shared_data_all]() { auto func = [](auto & all_node_data_a, shared_data & shared_data_a, bool single_a) { while (std::any_of (all_node_data_a.cbegin (), all_node_data_a.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); })) { @@ -1023,13 +1023,13 @@ namespace transport // Pick first peer to be consistent auto peer = data.node->network.tcp_channels.channels[0].channel; data.node->telemetry.get_metrics_single_peer_async (peer, [&shared_data_a, &data, &all_node_data_a](nano::telemetry_data_response const & telemetry_data_response_a) { - callback_process (shared_data_a, data, all_node_data_a, telemetry_data_response_a.telemetry_data_time_pair.last_updated); + callback_process (shared_data_a, data, all_node_data_a, *telemetry_data_response_a.telemetry_data.timestamp); }); } else { data.node->telemetry.get_metrics_peers_async ([&shared_data_a, &data, &all_node_data_a](nano::telemetry_data_responses const & telemetry_data_responses_a) { - callback_process (shared_data_a, data, all_node_data_a, telemetry_data_responses_a.telemetry_data_time_pairs.begin ()->second.last_updated); + callback_process (shared_data_a, data, all_node_data_a, *telemetry_data_responses_a.telemetry_datas.begin ()->second.timestamp); }); } } @@ -1042,12 +1042,12 @@ namespace transport }; func (node_data_single, shared_data_single, true); - func (node_data_random, shared_data_random, false); + func (node_data_all, shared_data_all, false); }); } system.deadline_set (30s); - while (!shared_data_random.done || !shared_data_single.done) + while (!shared_data_all.done || !shared_data_single.done) { ASSERT_NO_ERROR (system.poll ()); }