Remove telemetry data aggregation (#4684)

This commit is contained in:
Piotr Wójcik 2024-07-19 09:29:11 +02:00 committed by GitHub
commit d6f140b2e1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 9 additions and 411 deletions

View file

@ -10,230 +10,6 @@
using namespace std::chrono_literals;
// Feeds three hand-picked samples through nano::consolidate_telemetry_data and checks every consolidated field,
// then alters the last sample and checks the fields again, and finally exercises telemetry_data's equality operator.
TEST (telemetry, consolidate_data)
{
	auto const base_ms = 1582117035109;
	auto const base_timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (base_ms));

	// Values are chosen so that both the "most common value" and the "average" paths produce checkable results
	nano::telemetry_data first;
	first.major_version = 20;
	first.minor_version = 1;
	first.patch_version = 4;
	first.pre_release_version = 6;
	first.maker = 2;
	first.protocol_version = 12;
	first.genesis_block = nano::block_hash (3);
	first.account_count = 2;
	first.block_count = 1;
	first.cemented_count = 1;
	first.unchecked_count = 3;
	first.peer_count = 2;
	first.bandwidth_cap = 100;
	first.uptime = 6;
	first.active_difficulty = 2;
	first.timestamp = base_timestamp;

	nano::telemetry_data second;
	second.major_version = 10;
	second.minor_version = 2;
	second.patch_version = 3;
	second.pre_release_version = 6;
	second.maker = 2;
	second.protocol_version = 11;
	second.genesis_block = nano::block_hash (4);
	second.account_count = 5;
	second.block_count = 7;
	second.cemented_count = 4;
	second.unchecked_count = 1;
	second.peer_count = 5;
	second.bandwidth_cap = 0;
	second.uptime = 10;
	second.active_difficulty = 3;
	second.timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (base_ms + 1));

	// The third sample shares vendor version, timestamp and active difficulty with the first one
	nano::telemetry_data third = first;
	third.protocol_version = 11;
	third.genesis_block = nano::block_hash (4);
	third.account_count = 3;
	third.block_count = 3;
	third.cemented_count = 2;
	third.unchecked_count = 2;
	third.peer_count = 4;
	third.bandwidth_cap = 0;
	third.uptime = 3;

	std::vector<nano::telemetry_data> samples;
	samples.reserve (3);
	samples.push_back (first);
	samples.push_back (second);
	samples.push_back (third);

	auto const merged = nano::consolidate_telemetry_data (samples);
	ASSERT_EQ (merged.account_count, 3);
	ASSERT_EQ (merged.block_count, 3);
	ASSERT_EQ (merged.cemented_count, 2);
	ASSERT_EQ (merged.protocol_version, 11);
	ASSERT_EQ (merged.peer_count, 3);
	ASSERT_EQ (merged.bandwidth_cap, 0);
	ASSERT_EQ (merged.unchecked_count, 2);
	ASSERT_EQ (merged.uptime, 6);
	ASSERT_EQ (merged.genesis_block, nano::block_hash (4));
	ASSERT_EQ (merged.major_version, 20);
	ASSERT_EQ (merged.minor_version, 1);
	ASSERT_EQ (merged.patch_version, 4);
	ASSERT_EQ (merged.pre_release_version, 6);
	ASSERT_EQ (merged.maker, 2);
	ASSERT_EQ (merged.timestamp, base_timestamp);
	ASSERT_EQ (merged.active_difficulty, 2);

	// Change the last sample so that the fields which may be either a mode or an average take the other path
	auto & last = samples.back ();
	last.maker = 2;
	last.pre_release_version = 6;
	last.patch_version = 3;
	last.minor_version = 2;
	last.major_version = 10;
	last.genesis_block = nano::block_hash (3);
	last.protocol_version = 13;
	last.bandwidth_cap = 53;

	auto const merged_after_change = nano::consolidate_telemetry_data (samples);
	ASSERT_EQ (merged_after_change.major_version, 10);
	ASSERT_EQ (merged_after_change.minor_version, 2);
	ASSERT_EQ (merged_after_change.patch_version, 3);
	ASSERT_EQ (merged_after_change.pre_release_version, 6);
	ASSERT_EQ (merged_after_change.maker, 2);
	// All three protocol versions now differ, so any one of them is an acceptable outcome
	auto const protocol = merged_after_change.protocol_version;
	ASSERT_TRUE (protocol == 11 || protocol == 12 || protocol == 13);
	ASSERT_EQ (merged_after_change.bandwidth_cap, 51);
	ASSERT_EQ (merged_after_change.genesis_block, nano::block_hash (3));

	// Equality operator: the two results differ, and a value equals itself
	ASSERT_FALSE (merged == merged_after_change);
	ASSERT_EQ (merged, merged);
}
// Twenty identical samples plus two low and two high outliers must consolidate back to the identical sample.
TEST (telemetry, consolidate_data_remove_outliers)
{
	nano::telemetry_data typical;
	typical.major_version = 20;
	typical.minor_version = 1;
	typical.patch_version = 5;
	typical.pre_release_version = 2;
	typical.maker = 1;
	typical.protocol_version = 12;
	typical.genesis_block = nano::block_hash (3);
	typical.account_count = 2;
	typical.block_count = 1;
	typical.cemented_count = 1;
	typical.unchecked_count = 3;
	typical.peer_count = 2;
	typical.bandwidth_cap = 100;
	typical.uptime = 6;
	typical.active_difficulty = 10;
	typical.timestamp = std::chrono::system_clock::time_point (100ms);

	// Start with 20 copies of the typical sample
	std::vector<nano::telemetry_data> samples (20, typical);

	// Two samples below the typical values in (almost) every field
	nano::telemetry_data low_outlier;
	low_outlier.major_version = 11;
	low_outlier.minor_version = 1;
	low_outlier.patch_version = 1;
	low_outlier.pre_release_version = 1;
	low_outlier.maker = 1;
	low_outlier.protocol_version = 11;
	low_outlier.genesis_block = nano::block_hash (2);
	low_outlier.account_count = 1;
	low_outlier.block_count = 0;
	low_outlier.cemented_count = 0;
	low_outlier.unchecked_count = 1;
	low_outlier.peer_count = 0;
	low_outlier.bandwidth_cap = 8;
	low_outlier.uptime = 2;
	low_outlier.active_difficulty = 1;
	low_outlier.timestamp = std::chrono::system_clock::time_point (1ms);
	samples.insert (samples.end (), 2, low_outlier);

	// Two samples far above the typical values in every field
	nano::telemetry_data high_outlier;
	high_outlier.major_version = 99;
	high_outlier.minor_version = 9;
	high_outlier.patch_version = 9;
	high_outlier.pre_release_version = 9;
	high_outlier.maker = 9;
	high_outlier.protocol_version = 99;
	high_outlier.genesis_block = nano::block_hash (99);
	high_outlier.account_count = 99;
	high_outlier.block_count = 99;
	high_outlier.cemented_count = 99;
	high_outlier.unchecked_count = 99;
	high_outlier.peer_count = 99;
	high_outlier.bandwidth_cap = 999;
	high_outlier.uptime = 999;
	high_outlier.active_difficulty = 99;
	high_outlier.timestamp = std::chrono::system_clock::time_point (999ms);
	samples.insert (samples.end (), 2, high_outlier);

	// With the outliers discarded, the result must equal the typical sample
	auto const merged = nano::consolidate_telemetry_data (samples);
	ASSERT_EQ (typical, merged);
}
// A large majority of samples report bandwidth_cap 0; the consolidated cap must stay 0 even after a couple of
// samples with a non-zero cap are added.
TEST (telemetry, consolidate_data_remove_outliers_with_zero_bandwidth)
{
	nano::telemetry_data uncapped;
	uncapped.major_version = 20;
	uncapped.minor_version = 1;
	uncapped.patch_version = 5;
	uncapped.pre_release_version = 2;
	uncapped.maker = 1;
	uncapped.protocol_version = 12;
	uncapped.genesis_block = nano::block_hash (3);
	uncapped.account_count = 2;
	uncapped.block_count = 1;
	uncapped.cemented_count = 1;
	uncapped.unchecked_count = 3;
	uncapped.peer_count = 2;
	uncapped.bandwidth_cap = 0;
	uncapped.uptime = 6;
	uncapped.active_difficulty = 10;
	uncapped.timestamp = std::chrono::system_clock::time_point (100ms);

	// Same sample, except for a non-zero bandwidth cap
	nano::telemetry_data capped = uncapped;
	capped.bandwidth_cap = 100;

	// Only samples with bandwidth cap 0
	std::vector<nano::telemetry_data> samples (100, uncapped);
	auto const merged_all_uncapped = nano::consolidate_telemetry_data (samples);
	ASSERT_EQ (merged_all_uncapped.bandwidth_cap, 0);

	// Add a small number of samples with a non-zero cap
	samples.insert (samples.end (), 2, capped);
	auto const merged_mostly_uncapped = nano::consolidate_telemetry_data (samples);
	ASSERT_EQ (merged_mostly_uncapped.bandwidth_cap, 0);
}
TEST (telemetry, signatures)
{
nano::keypair node_id;
@ -512,27 +288,3 @@ TEST (telemetry, ongoing_broadcasts)
ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::telemetry, nano::stat::detail::process) >= 3);
ASSERT_TIMELY (5s, node2.stats.count (nano::stat::type::telemetry, nano::stat::detail::process) >= 3)
}
// TODO: With handshake V2, nodes with mismatched genesis will refuse to connect while setting up the system
// Checks how a node reacts to telemetry coming from a peer configured with a different genesis:
// the genesis_mismatch telemetry stat must be counted, no telemetry may be counted as processed,
// and the peer must no longer be found by node id. The DISABLED_ prefix keeps the test from running (see TODO above).
TEST (telemetry, DISABLED_mismatched_genesis)
{
// Only second node will broadcast telemetry
nano::test::system system;
nano::node_flags node_flags;
node_flags.disable_ongoing_telemetry_requests = true;
node_flags.disable_providing_telemetry_metrics = true;
auto & node1 = *system.add_node (node_flags);
// Set up a node with different genesis
// Dev network parameters, but with the ledger genesis replaced by the live network genesis
nano::network_params network_params{ nano::networks::nano_dev_network };
network_params.ledger.genesis = network_params.ledger.nano_live_genesis;
nano::node_config node_config{ network_params };
// The same flags object is reused; only the second node is allowed to provide telemetry metrics
node_flags.disable_providing_telemetry_metrics = false;
auto & node2 = *system.add_node (node_config, node_flags);
// The mismatch must be counted within 5s, and nothing may be counted as processed during the following 1s
ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::telemetry, nano::stat::detail::genesis_mismatch) > 0);
ASSERT_ALWAYS (1s, node1.stats.count (nano::stat::type::telemetry, nano::stat::detail::process) == 0)
// Ensure node with different genesis gets disconnected
ASSERT_TIMELY (5s, !node1.network.find_node_id (node2.get_node_id ()));
}

View file

@ -4145,27 +4145,21 @@ void nano::json_handler::telemetry ()
}
else
{
nano::jsonconfig config_l;
std::vector<nano::telemetry_data> telemetry_datas;
telemetry_datas.reserve (telemetry_responses.size ());
std::transform (telemetry_responses.begin (), telemetry_responses.end (), std::back_inserter (telemetry_datas), [] (auto const & endpoint_telemetry_data) {
return endpoint_telemetry_data.second;
});
// Default case without any parameters, requesting telemetry metrics locally
auto telemetry_data = node.local_telemetry ();
auto average_telemetry_metrics = nano::consolidate_telemetry_data (telemetry_datas);
// Don't add node_id/signature in consolidated metrics
auto const should_ignore_identification_metrics = true;
auto err = average_telemetry_metrics.serialize_json (config_l, should_ignore_identification_metrics);
nano::jsonconfig config_l;
auto const should_ignore_identification_metrics = false;
auto err = telemetry_data.serialize_json (config_l, should_ignore_identification_metrics);
auto const & ptree = config_l.get_tree ();
if (!err)
{
response_l.insert (response_l.begin (), ptree.begin (), ptree.end ());
}
else
{
ec = nano::error_rpc::generic;
}
response_errors ();
return;
}
response_errors ();

View file

@ -296,148 +296,4 @@ std::unique_ptr<nano::container_info_component> nano::telemetry::collect_contain
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "telemetries", telemetries.size (), sizeof (decltype (telemetries)::value_type) }));
return composite;
}
/**
 * Combines telemetry collected from several peers into one representative telemetry_data.
 *
 * - An empty input yields a default constructed telemetry_data; a single element is returned unchanged.
 * - Account, block, cemented, peer and unchecked counts, uptime, active difficulty and timestamp are trimmed
 *   averages: size / 10 samples are dropped from both the low and the high end before averaging.
 * - Protocol version, genesis block and vendor version (major, minor, patch, pre-release, maker taken together)
 *   use the most common value. If no value occurs more than once, the first entry in hash map iteration order
 *   is used, which is unspecified.
 * - Bandwidth cap uses the most common value when one occurs more than once, otherwise an average of the
 *   non-zero caps.
 *
 * @param telemetry_datas Telemetry samples to consolidate
 * @return The consolidated telemetry
 */
nano::telemetry_data nano::consolidate_telemetry_data (std::vector<nano::telemetry_data> const & telemetry_datas)
{
	if (telemetry_datas.empty ())
	{
		return {};
	}
	else if (telemetry_datas.size () == 1)
	{
		// Only 1 element in the collection, so just return it.
		return telemetry_datas.front ();
	}

	// The five vendor version fields are packed one byte each into a single integer key.
	// A textual "a.b.c.d.e" key is not safe here: the fields are single bytes, which stream as raw characters,
	// so a field with the value 46 ('.') would be indistinguishable from a separator when parsing the key back.
	auto const pack_vendor_version = [] (nano::telemetry_data const & data) -> uint64_t {
		auto const as_byte = [] (auto value) {
			return static_cast<uint64_t> (static_cast<uint8_t> (value));
		};
		return (as_byte (data.major_version) << 32) | (as_byte (data.minor_version) << 24) | (as_byte (data.patch_version) << 16) | (as_byte (data.pre_release_version) << 8) | as_byte (data.maker);
	};

	// Occurrence counts for the values consolidated by mode
	std::unordered_map<uint8_t, int> protocol_versions;
	std::unordered_map<uint64_t, int> vendor_versions;
	std::unordered_map<uint64_t, int> bandwidth_caps;
	std::unordered_map<nano::block_hash, int> genesis_blocks;

	// Use a trimmed average which excludes the upper and lower 10% of the results
	std::multiset<uint64_t> account_counts;
	std::multiset<uint64_t> block_counts;
	std::multiset<uint64_t> cemented_counts;
	std::multiset<uint32_t> peer_counts;
	std::multiset<uint64_t> unchecked_counts;
	std::multiset<uint64_t> uptimes;
	std::multiset<uint64_t> bandwidths;
	std::multiset<uint64_t> timestamps;
	std::multiset<uint64_t> active_difficulties;

	for (auto const & telemetry_data : telemetry_datas)
	{
		account_counts.insert (telemetry_data.account_count);
		block_counts.insert (telemetry_data.block_count);
		cemented_counts.insert (telemetry_data.cemented_count);
		++vendor_versions[pack_vendor_version (telemetry_data)];
		timestamps.insert (std::chrono::duration_cast<std::chrono::milliseconds> (telemetry_data.timestamp.time_since_epoch ()).count ());
		++protocol_versions[telemetry_data.protocol_version];
		peer_counts.insert (telemetry_data.peer_count);
		unchecked_counts.insert (telemetry_data.unchecked_count);
		uptimes.insert (telemetry_data.uptime);
		// 0 has a special meaning (unlimited), don't include it in the average as it will be heavily skewed
		if (telemetry_data.bandwidth_cap != 0)
		{
			bandwidths.insert (telemetry_data.bandwidth_cap);
		}
		++bandwidth_caps[telemetry_data.bandwidth_cap];
		++genesis_blocks[telemetry_data.genesis_block];
		active_difficulties.insert (telemetry_data.active_difficulty);
	}

	// Remove 10% of the results from the lower and upper bounds to catch any outliers. Need at least 10 responses before any are removed.
	auto num_either_side_to_remove = telemetry_datas.size () / 10;

	// Erases the outliers from the (sorted) multiset in place and returns the sum of what is left.
	// Returns 0 without touching the set if trimming would leave nothing (possible for bandwidths, which skips zeros).
	auto strip_outliers_and_sum = [num_either_side_to_remove] (auto & counts) {
		if (num_either_side_to_remove * 2 >= counts.size ())
		{
			return nano::uint128_t (0);
		}
		counts.erase (counts.begin (), std::next (counts.begin (), num_either_side_to_remove));
		counts.erase (std::next (counts.rbegin (), num_either_side_to_remove).base (), counts.end ());
		return std::accumulate (counts.begin (), counts.end (), nano::uint128_t (0), [] (nano::uint128_t total, auto count) {
			return total += count;
		});
	};

	auto account_sum = strip_outliers_and_sum (account_counts);
	auto block_sum = strip_outliers_and_sum (block_counts);
	auto cemented_sum = strip_outliers_and_sum (cemented_counts);
	auto peer_sum = strip_outliers_and_sum (peer_counts);
	auto unchecked_sum = strip_outliers_and_sum (unchecked_counts);
	auto uptime_sum = strip_outliers_and_sum (uptimes);
	auto bandwidth_sum = strip_outliers_and_sum (bandwidths);
	auto active_difficulty_sum = strip_outliers_and_sum (active_difficulties);

	nano::telemetry_data consolidated_data;
	// Number of samples remaining once the outliers on both sides are removed
	auto size = telemetry_datas.size () - num_either_side_to_remove * 2;
	consolidated_data.account_count = boost::numeric_cast<decltype (consolidated_data.account_count)> (account_sum / size);
	consolidated_data.block_count = boost::numeric_cast<decltype (consolidated_data.block_count)> (block_sum / size);
	consolidated_data.cemented_count = boost::numeric_cast<decltype (consolidated_data.cemented_count)> (cemented_sum / size);
	consolidated_data.peer_count = boost::numeric_cast<decltype (consolidated_data.peer_count)> (peer_sum / size);
	consolidated_data.uptime = boost::numeric_cast<decltype (consolidated_data.uptime)> (uptime_sum / size);
	consolidated_data.unchecked_count = boost::numeric_cast<decltype (consolidated_data.unchecked_count)> (unchecked_sum / size);
	consolidated_data.active_difficulty = boost::numeric_cast<decltype (consolidated_data.active_difficulty)> (active_difficulty_sum / size);

	if (!timestamps.empty ())
	{
		auto timestamp_sum = strip_outliers_and_sum (timestamps);
		// timestamps has already been trimmed by the call above, so its size is the trimmed sample count
		consolidated_data.timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (boost::numeric_cast<uint64_t> (timestamp_sum / timestamps.size ())));
	}

	// Iterator to the entry with the highest occurrence count
	auto most_common = [] (auto const & collection) {
		return std::max_element (collection.begin (), collection.end (), [] (auto const & lhs, auto const & rhs) {
			return lhs.second < rhs.second;
		});
	};

	// Assigns the most common key if it occurs more than once, otherwise the average sum / count
	auto set_mode_or_average = [&most_common] (auto const & collection, auto & var, auto const & sum, std::size_t count) {
		auto max = most_common (collection);
		if (max->second > 1)
		{
			var = max->first;
		}
		else
		{
			var = (sum / count).template convert_to<std::remove_reference_t<decltype (var)>> ();
		}
	};

	// Assigns the most common key if it occurs more than once, otherwise the first key in iteration order
	auto set_mode = [&most_common] (auto const & collection, auto & var) {
		auto max = most_common (collection);
		if (max->second > 1)
		{
			var = max->first;
		}
		else
		{
			// Just pick the first one
			var = collection.begin ()->first;
		}
	};

	// Use the mode of protocol version and vendor version. Also use it for bandwidth cap if there is 2 or more of the same cap.
	// NOTE: the average divides the sum of the non-zero caps by the trimmed count of all samples (zero caps included);
	// the existing unit test relies on this (caps 100, 0 and 53 are expected to consolidate to 51).
	set_mode_or_average (bandwidth_caps, consolidated_data.bandwidth_cap, bandwidth_sum, size);
	set_mode (protocol_versions, consolidated_data.protocol_version);
	set_mode (genesis_blocks, consolidated_data.genesis_block);

	// Vendor version, unpacked in the same byte order used by pack_vendor_version
	uint64_t version{ 0 };
	set_mode (vendor_versions, version);
	consolidated_data.major_version = static_cast<uint8_t> (version >> 32);
	consolidated_data.minor_version = static_cast<uint8_t> (version >> 24);
	consolidated_data.patch_version = static_cast<uint8_t> (version >> 16);
	consolidated_data.pre_release_version = static_cast<uint8_t> (version >> 8);
	consolidated_data.maker = static_cast<uint8_t> (version);

	return consolidated_data;
}
}

View file

@ -148,6 +148,4 @@ private:
private:
static std::size_t constexpr max_size = 1024;
};
nano::telemetry_data consolidate_telemetry_data (std::vector<telemetry_data> const & telemetry_data);
}

View file

@ -6776,8 +6776,6 @@ TEST (rpc, telemetry_all)
auto const should_ignore_identification_metrics = true;
ASSERT_FALSE (telemetry_data.deserialize_json (config, should_ignore_identification_metrics));
ASSERT_TRUE (nano::test::compare_telemetry_data (telemetry_data, node->local_telemetry ()));
ASSERT_FALSE (response.get_optional<std::string> ("node_id").is_initialized ());
ASSERT_FALSE (response.get_optional<std::string> ("signature").is_initialized ());
}
request.put ("raw", "true");