Poll all nodes, remove outliers, DDoS protection & amend RPC response with endpoint (#2521)

* Poll all nodes and remove some metrics from bounds when consolidating

* Update out of date comment

* Formatting

* Fix clang build on actions with long std::tuple

* Allow square brackets in ipv6 address in RPC

* Merge with develop

* Fix ASAN issue

* Gui review comments

* Make parse_address accept v4 and v6 ip addresses

* Incorrect order of arguments

* Use new cached genesis hash

* Move last_telemetry_req to bootstrap_server
Commit bd93581a9b by Wesley Shillingford, 2020-02-10 15:39:53 +00:00, committed via GitHub
17 changed files with 1189 additions and 534 deletions

View file

@@ -65,7 +65,7 @@ TEST (node_telemetry, consolidate_data)
std::vector<nano::telemetry_data> all_data{ data, data1, data2 };
auto consolidated_telemetry_data = nano::telemetry_data::consolidate (all_data);
auto consolidated_telemetry_data = nano::consolidate_telemetry_data (all_data);
ASSERT_EQ (consolidated_telemetry_data.account_count, 3);
ASSERT_EQ (consolidated_telemetry_data.block_count, 3);
ASSERT_EQ (consolidated_telemetry_data.cemented_count, 2);
@@ -91,7 +91,7 @@ TEST (node_telemetry, consolidate_data)
all_data[2].pre_release_version = 6;
all_data[2].maker = 2;
auto consolidated_telemetry_data1 = nano::telemetry_data::consolidate (all_data);
auto consolidated_telemetry_data1 = nano::consolidate_telemetry_data (all_data);
ASSERT_EQ (consolidated_telemetry_data1.major_version, 10);
ASSERT_EQ (*consolidated_telemetry_data1.minor_version, 2);
ASSERT_EQ (*consolidated_telemetry_data1.patch_version, 3);
@@ -122,7 +122,7 @@ TEST (node_telemetry, consolidate_data_optional_data)
nano::telemetry_data missing_all_optional;
std::vector<nano::telemetry_data> all_data{ data, data, missing_minor, missing_all_optional };
auto consolidated_telemetry_data = nano::telemetry_data::consolidate (all_data);
auto consolidated_telemetry_data = nano::consolidate_telemetry_data (all_data);
ASSERT_EQ (consolidated_telemetry_data.major_version, 20);
ASSERT_EQ (*consolidated_telemetry_data.minor_version, 1);
ASSERT_EQ (*consolidated_telemetry_data.patch_version, 4);
@@ -174,15 +174,76 @@ TEST (node_telemetry, serialize_deserialize_json_optional)
ASSERT_FALSE (no_optional_data1.maker.is_initialized ());
}
TEST (node_telemetry, consolidate_data_remove_outliers)
{
nano::telemetry_data data;
data.account_count = 2;
data.block_count = 1;
data.cemented_count = 1;
data.protocol_version = 12;
data.peer_count = 2;
data.bandwidth_cap = 100;
data.unchecked_count = 3;
data.uptime = 6;
data.genesis_block = nano::block_hash (3);
data.major_version = 20;
data.minor_version = 1;
data.patch_version = 5;
data.pre_release_version = 2;
data.maker = 1;
// Insert 20 of these, and 2 outliers at each of the lower and upper bounds, which should get removed
std::vector<nano::telemetry_data> all_data (20, data);
// Insert some outliers
nano::telemetry_data outlier_data;
outlier_data.account_count = 1;
outlier_data.block_count = 0;
outlier_data.cemented_count = 0;
outlier_data.protocol_version = 11;
outlier_data.peer_count = 0;
outlier_data.bandwidth_cap = 8;
outlier_data.unchecked_count = 1;
outlier_data.uptime = 2;
outlier_data.genesis_block = nano::block_hash (2);
outlier_data.major_version = 11;
outlier_data.minor_version = 1;
outlier_data.patch_version = 1;
outlier_data.pre_release_version = 1;
outlier_data.maker = 1;
all_data.push_back (outlier_data);
all_data.push_back (outlier_data);
nano::telemetry_data outlier_data1;
outlier_data1.account_count = 99;
outlier_data1.block_count = 99;
outlier_data1.cemented_count = 99;
outlier_data1.protocol_version = 99;
outlier_data1.peer_count = 99;
outlier_data1.bandwidth_cap = 999;
outlier_data1.unchecked_count = 99;
outlier_data1.uptime = 999;
outlier_data1.genesis_block = nano::block_hash (99);
outlier_data1.major_version = 99;
outlier_data1.minor_version = 9;
outlier_data1.patch_version = 9;
outlier_data1.pre_release_version = 9;
outlier_data1.maker = 9;
all_data.push_back (outlier_data1);
all_data.push_back (outlier_data1);
auto consolidated_telemetry_data = nano::consolidate_telemetry_data (all_data);
ASSERT_EQ (data, consolidated_telemetry_data);
}
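The test above feeds twenty identical samples plus two outliers at each bound into nano::consolidate_telemetry_data and expects the result to equal the unperturbed sample, i.e. the outliers must be trimmed before averaging. A minimal sketch of per-metric trimming, assuming roughly 10% is dropped from each end of the sorted values (the bounds used by the real implementation may differ):

#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical helper: average a single metric after discarding the extreme
// values at both ends. With the 24 samples from the test (20 identical +
// 2 low + 2 high), trim = 2 and the outliers fall away entirely.
// Assumes a non-empty input.
uint64_t trimmed_average (std::vector<uint64_t> values)
{
	std::sort (values.begin (), values.end ());
	auto trim (values.size () / 10); // drop ~10% from each end
	uint64_t sum (0);
	for (auto i (trim); i < values.size () - trim; ++i)
	{
		sum += values[i];
	}
	return sum / (values.size () - 2 * trim);
}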
TEST (node_telemetry, no_peers)
{
nano::system system (1);
std::atomic<bool> done{ false };
system.nodes[0]->telemetry.get_metrics_random_peers_async ([&done](nano::telemetry_data_responses const & responses_a) {
ASSERT_TRUE (responses_a.data.empty ());
system.nodes[0]->telemetry.get_metrics_peers_async ([&done](nano::telemetry_data_responses const & responses_a) {
ASSERT_TRUE (responses_a.telemetry_data_time_pairs.empty ());
ASSERT_FALSE (responses_a.all_received);
ASSERT_FALSE (responses_a.is_cached);
done = true;
});
@@ -205,13 +266,12 @@ TEST (node_telemetry, basic)
wait_peer_connections (system);
// Request telemetry metrics
std::vector<nano::telemetry_data> all_telemetry_data;
std::unordered_map<nano::endpoint, nano::telemetry_data_time_pair> all_telemetry_data_time_pairs;
{
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_random_peers_async ([&done, &all_telemetry_data](nano::telemetry_data_responses const & responses_a) {
ASSERT_FALSE (responses_a.is_cached);
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) {
ASSERT_TRUE (responses_a.all_received);
all_telemetry_data = responses_a.data;
all_telemetry_data_time_pairs = responses_a.telemetry_data_time_pairs;
done = true;
});
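Throughout these hunks the responses now arrive as a map keyed by nano::endpoint rather than a plain std::vector<nano::telemetry_data>. The definition of telemetry_data_time_pair is not part of this diff; the following is a minimal sketch inferred from the member accesses seen here (.data, .last_updated, .system_last_updated), and the real class in the commit may carry extra members:

// Inferred sketch only; the tests also rely on comparison operators.
class telemetry_data_time_pair
{
public:
	nano::telemetry_data data;
	std::chrono::steady_clock::time_point last_updated; // monotonic, used for cache-cutoff checks
	std::chrono::system_clock::time_point system_last_updated; // wall clock, surfaced as "timestamp" in the RPC response
};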
@@ -223,16 +283,14 @@ TEST (node_telemetry, basic)
}
// Check the metrics are correct
ASSERT_EQ (all_telemetry_data.size (), 1);
auto & telemetry_data = all_telemetry_data.front ();
compare_default_test_result_data (telemetry_data, *node_server);
ASSERT_EQ (all_telemetry_data_time_pairs.size (), 1);
compare_default_test_result_data (all_telemetry_data_time_pairs.begin ()->second.data, *node_server);
// Call again straight away. It should use the cache
{
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_random_peers_async ([&done, &telemetry_data](nano::telemetry_data_responses const & responses_a) {
ASSERT_EQ (telemetry_data, responses_a.data.front ());
ASSERT_TRUE (responses_a.is_cached);
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) {
ASSERT_EQ (all_telemetry_data_time_pairs, responses_a.telemetry_data_time_pairs);
ASSERT_TRUE (responses_a.all_received);
done = true;
});
@@ -245,11 +303,13 @@ TEST (node_telemetry, basic)
}
// Wait the cache period and check cache is not used
std::this_thread::sleep_for (nano::telemetry_impl::cache_cutoff);
std::this_thread::sleep_for (nano::telemetry_cache_cutoffs::test);
// Arbitrarily change something so that we can confirm different metrics were used
node_server->ledger.cache.block_count = 100;
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_random_peers_async ([&done, &telemetry_data](nano::telemetry_data_responses const & responses_a) {
ASSERT_FALSE (responses_a.is_cached);
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) {
ASSERT_NE (all_telemetry_data_time_pairs, responses_a.telemetry_data_time_pairs);
ASSERT_TRUE (responses_a.all_received);
done = true;
});
@@ -270,7 +330,7 @@ TEST (node_telemetry, many_nodes)
for (auto i = 0; i < num_nodes; ++i)
{
nano::node_config node_config (nano::get_available_port (), system.logging);
// Make a metric completely different for each node so we can get afterwards that there are no duplicates
// Make a metric completely different for each node so we can check afterwards that there are no duplicates
node_config.bandwidth_limit = 100000 + i;
system.add_node (node_config);
}
@@ -291,11 +351,10 @@ TEST (node_telemetry, many_nodes)
auto node_client = system.nodes.front ();
std::atomic<bool> done{ false };
std::vector<nano::telemetry_data> all_telemetry_data;
node_client->telemetry.get_metrics_random_peers_async ([&done, &all_telemetry_data](nano::telemetry_data_responses const & responses_a) {
ASSERT_FALSE (responses_a.is_cached);
std::unordered_map<nano::endpoint, nano::telemetry_data_time_pair> all_telemetry_data_time_pairs;
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) {
ASSERT_TRUE (responses_a.all_received);
all_telemetry_data = responses_a.data;
all_telemetry_data_time_pairs = responses_a.telemetry_data_time_pairs;
done = true;
});
@@ -307,8 +366,9 @@ TEST (node_telemetry, many_nodes)
// Check the metrics
nano::network_params params;
for (auto & data : all_telemetry_data)
for (auto & telemetry_data_time_pair : all_telemetry_data_time_pairs)
{
auto & data = telemetry_data_time_pair.second.data;
ASSERT_EQ (data.unchecked_count, 0);
ASSERT_EQ (data.cemented_count, 1);
ASSERT_LE (data.peer_count, 9);
@@ -326,9 +386,11 @@ TEST (node_telemetry, many_nodes)
ASSERT_EQ (data.genesis_block, genesis.hash ());
}
// We gave some nodes different bandwidth caps, confirm they are not all the time
auto all_bandwidth_limits_same = std::all_of (all_telemetry_data.begin () + 1, all_telemetry_data.end (), [bandwidth_cap = all_telemetry_data[0].bandwidth_cap](auto & telemetry) {
return telemetry.bandwidth_cap == bandwidth_cap;
// We gave some nodes different bandwidth caps, confirm they are not all the same
auto bandwidth_cap = all_telemetry_data_time_pairs.begin ()->second.data.bandwidth_cap;
all_telemetry_data_time_pairs.erase (all_telemetry_data_time_pairs.begin ());
auto all_bandwidth_limits_same = std::all_of (all_telemetry_data_time_pairs.begin (), all_telemetry_data_time_pairs.end (), [bandwidth_cap](auto & telemetry_data_time_pair) {
return telemetry_data_time_pair.second.data.bandwidth_cap == bandwidth_cap;
});
ASSERT_FALSE (all_bandwidth_limits_same);
}
@@ -354,11 +416,10 @@ TEST (node_telemetry, over_udp)
wait_peer_connections (system);
std::atomic<bool> done{ false };
std::vector<nano::telemetry_data> all_telemetry_data;
node_client->telemetry.get_metrics_random_peers_async ([&done, &all_telemetry_data](nano::telemetry_data_responses const & responses_a) {
ASSERT_FALSE (responses_a.is_cached);
std::unordered_map<nano::endpoint, nano::telemetry_data_time_pair> all_telemetry_data_time_pairs;
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) {
ASSERT_TRUE (responses_a.all_received);
all_telemetry_data = responses_a.data;
all_telemetry_data_time_pairs = responses_a.telemetry_data_time_pairs;
done = true;
});
@@ -368,8 +429,8 @@ TEST (node_telemetry, over_udp)
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (all_telemetry_data.size (), 1);
compare_default_test_result_data (all_telemetry_data.front (), *node_server);
ASSERT_EQ (all_telemetry_data_time_pairs.size (), 1);
compare_default_test_result_data (all_telemetry_data_time_pairs.begin ()->second.data, *node_server);
// Check channels are indeed udp
ASSERT_EQ (1, node_client->network.size ());
@@ -382,87 +443,6 @@ TEST (node_telemetry, over_udp)
ASSERT_EQ (nano::transport::transport_type::udp, list2[0]->get_type ());
}
TEST (node_telemetry, simultaneous_random_requests)
{
const auto num_nodes = 4;
nano::system system (num_nodes);
// Wait until peers are stored as they are done in the background
wait_peer_connections (system);
std::vector<std::thread> threads;
const auto num_threads = 4;
std::atomic<bool> done{ false };
class Data
{
public:
std::atomic<bool> awaiting_cache{ false };
std::atomic<bool> keep_requesting_metrics{ true };
std::shared_ptr<nano::node> node;
};
std::array<Data, num_nodes> all_data{};
for (auto i = 0; i < num_nodes; ++i)
{
all_data[i].node = system.nodes[i];
}
std::atomic<uint64_t> count{ 0 };
std::promise<void> promise;
std::shared_future<void> shared_future (promise.get_future ());
// Create a few threads where each node sends out telemetry request messages to all other nodes continuously, until the cache is reached and subsequently expired.
// The test waits until all telemetry_ack messages have been received.
for (int i = 0; i < num_threads; ++i)
{
threads.emplace_back ([&all_data, &done, &count, &promise, &shared_future]() {
while (std::any_of (all_data.cbegin (), all_data.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); }))
{
for (auto & data : all_data)
{
// Keep calling get_metrics_async until the cache has been saved and then become outdated (after a certain period of time) for each node
if (data.keep_requesting_metrics)
{
++count;
data.node->telemetry.get_metrics_random_peers_async ([&promise, &done, &data, &all_data, &count](nano::telemetry_data_responses const & responses_a) {
if (data.awaiting_cache && !responses_a.is_cached)
{
data.keep_requesting_metrics = false;
}
if (responses_a.is_cached)
{
data.awaiting_cache = true;
}
if (--count == 0 && std::all_of (all_data.begin (), all_data.end (), [](auto const & data) { return !data.keep_requesting_metrics; }))
{
done = true;
promise.set_value ();
}
});
}
std::this_thread::sleep_for (1ms);
}
}
ASSERT_EQ (count, 0);
shared_future.wait ();
});
}
system.deadline_set (20s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
for (auto & thread : threads)
{
thread.join ();
}
}
namespace nano
{
TEST (node_telemetry, single_request)
@@ -476,14 +456,14 @@ TEST (node_telemetry, single_request)
// Request telemetry metrics
auto channel = node_client->network.find_channel (node_server->network.endpoint ());
nano::telemetry_data telemetry_data;
nano::telemetry_data_time_pair telemetry_data_time_pair;
{
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.is_cached);
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data_time_pair, &channel](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
telemetry_data = response_a.data;
ASSERT_EQ (channel->get_endpoint (), response_a.endpoint);
telemetry_data_time_pair = response_a.telemetry_data_time_pair;
done = true;
});
@@ -495,14 +475,13 @@ TEST (node_telemetry, single_request)
}
// Check the metrics are correct
compare_default_test_result_data (telemetry_data, *node_server);
compare_default_test_result_data (telemetry_data_time_pair.data, *node_server);
// Call again straight away. It should use the cache
{
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) {
ASSERT_EQ (telemetry_data, response_a.data);
ASSERT_TRUE (response_a.is_cached);
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data_time_pair](nano::telemetry_data_response const & response_a) {
ASSERT_EQ (telemetry_data_time_pair, response_a.telemetry_data_time_pair);
ASSERT_FALSE (response_a.error);
done = true;
});
@@ -515,11 +494,11 @@ TEST (node_telemetry, single_request)
}
// Wait the cache period and check cache is not used
std::this_thread::sleep_for (nano::telemetry_impl::cache_cutoff);
std::this_thread::sleep_for (nano::telemetry_cache_cutoffs::test);
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.is_cached);
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data_time_pair](nano::telemetry_data_response const & response_a) {
ASSERT_NE (telemetry_data_time_pair, response_a.telemetry_data_time_pair);
ASSERT_FALSE (response_a.error);
done = true;
});
@@ -552,100 +531,6 @@ TEST (node_telemetry, single_request_invalid_channel)
}
}
TEST (node_telemetry, simultaneous_single_and_random_requests)
{
const auto num_nodes = 4;
nano::system system (num_nodes);
wait_peer_connections (system);
std::vector<std::thread> threads;
const auto num_threads = 4;
class data
{
public:
std::atomic<bool> awaiting_cache{ false };
std::atomic<bool> keep_requesting_metrics{ true };
std::shared_ptr<nano::node> node;
};
std::array<data, num_nodes> node_data_single{};
std::array<data, num_nodes> node_data_random{};
for (auto i = 0; i < num_nodes; ++i)
{
node_data_single[i].node = system.nodes[i];
node_data_random[i].node = system.nodes[i];
}
class shared_data
{
public:
std::atomic<bool> done{ false };
std::atomic<uint64_t> count{ 0 };
std::promise<void> promise;
std::shared_future<void> shared_future{ promise.get_future () };
};
shared_data shared_data_single;
shared_data shared_data_random;
// Create a few threads where each node sends out telemetry request messages to all other nodes continuously, until the cache is reached and subsequently expired.
// The test waits until all telemetry_ack messages have been received.
for (int i = 0; i < num_threads; ++i)
{
threads.emplace_back ([&node_data_single, &node_data_random, &shared_data_single, &shared_data_random]() {
auto func = [](auto & all_node_data_a, shared_data & shared_data_a) {
while (std::any_of (all_node_data_a.cbegin (), all_node_data_a.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); }))
{
for (auto & data : all_node_data_a)
{
// Keep calling get_metrics_async until the cache has been saved and then become outdated (after a certain period of time) for each node
if (data.keep_requesting_metrics)
{
++shared_data_a.count;
data.node->telemetry.get_metrics_random_peers_async ([& shared_data = shared_data_a, &data, &all_node_data = all_node_data_a](nano::telemetry_data_responses const & responses_a) {
if (data.awaiting_cache && !responses_a.is_cached)
{
data.keep_requesting_metrics = false;
}
if (responses_a.is_cached)
{
data.awaiting_cache = true;
}
if (--shared_data.count == 0 && std::all_of (all_node_data.begin (), all_node_data.end (), [](auto const & data) { return !data.keep_requesting_metrics; }))
{
shared_data.done = true;
shared_data.promise.set_value ();
}
});
}
std::this_thread::sleep_for (1ms);
}
}
ASSERT_EQ (shared_data_a.count, 0);
shared_data_a.shared_future.wait ();
};
func (node_data_single, shared_data_single);
func (node_data_random, shared_data_random);
});
}
system.deadline_set (20s);
while (!shared_data_single.done || !shared_data_random.done)
{
ASSERT_NO_ERROR (system.poll ());
}
for (auto & thread : threads)
{
thread.join ();
}
}
TEST (node_telemetry, blocking_single_and_random)
{
nano::system system (2);
@@ -676,16 +561,15 @@ TEST (node_telemetry, blocking_single_and_random)
node_client->worker.push_task (call_system_poll);
// Blocking version of get_random_metrics_async
auto telemetry_data_responses = node_client->telemetry.get_metrics_random_peers ();
ASSERT_FALSE (telemetry_data_responses.is_cached);
auto telemetry_data_responses = node_client->telemetry.get_metrics_peers ();
ASSERT_TRUE (telemetry_data_responses.all_received);
compare_default_test_result_data (telemetry_data_responses.data.front (), *node_server);
compare_default_test_result_data (telemetry_data_responses.telemetry_data_time_pairs.begin ()->second.data, *node_server);
// Now try single request metric
auto telemetry_data_response = node_client->telemetry.get_metrics_single_peer (node_client->network.find_channel (node_server->network.endpoint ()));
ASSERT_FALSE (telemetry_data_response.is_cached);
ASSERT_FALSE (telemetry_data_response.error);
compare_default_test_result_data (telemetry_data_response.data, *node_server);
compare_default_test_result_data (telemetry_data_response.telemetry_data_time_pair.data, *node_server);
ASSERT_EQ (telemetry_data_response.telemetry_data_time_pair.last_updated, telemetry_data_responses.telemetry_data_time_pairs.begin ()->second.last_updated);
done = true;
promise.get_future ().wait ();
@@ -710,15 +594,14 @@ TEST (node_telemetry, multiple_single_request_clearing)
auto channel = node_client->network.find_channel (node_server->network.endpoint ());
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_single_peer_async (channel, [&done](nano::telemetry_data_response const & response_a) {
std::chrono::steady_clock::time_point last_updated;
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &last_updated](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
ASSERT_FALSE (response_a.is_cached);
last_updated = response_a.telemetry_data_time_pair.last_updated;
done = true;
});
ASSERT_EQ (1, node_client->telemetry.single_requests.size ());
auto last_updated = node_client->telemetry.single_requests.begin ()->second.last_updated;
system.deadline_set (10s);
while (!done)
{
@@ -726,11 +609,11 @@ TEST (node_telemetry, multiple_single_request_clearing)
}
done = false;
// Make another request to keep
// Make another request to keep the time updated
system.deadline_set (10s);
node_client->telemetry.get_metrics_single_peer_async (channel, [&done](nano::telemetry_data_response const & response_a) {
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, last_updated](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
ASSERT_TRUE (response_a.is_cached);
ASSERT_EQ (last_updated, response_a.telemetry_data_time_pair.last_updated);
done = true;
});
@@ -744,9 +627,10 @@ TEST (node_telemetry, multiple_single_request_clearing)
done = false;
auto channel1 = node_client->network.find_channel (node_server1->network.endpoint ());
node_client->telemetry.get_metrics_single_peer_async (channel1, [&done](nano::telemetry_data_response const & response_a) {
node_client->telemetry.get_metrics_single_peer_async (channel1, [&done, &last_updated](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
ASSERT_FALSE (response_a.is_cached);
ASSERT_NE (last_updated, response_a.telemetry_data_time_pair.last_updated);
last_updated = response_a.telemetry_data_time_pair.last_updated;
done = true;
});
@@ -758,9 +642,9 @@ TEST (node_telemetry, multiple_single_request_clearing)
}
done = false;
node_client->telemetry.get_metrics_single_peer_async (channel1, [&done](nano::telemetry_data_response const & response_a) {
node_client->telemetry.get_metrics_single_peer_async (channel1, [&done, last_updated](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
ASSERT_TRUE (response_a.is_cached);
ASSERT_EQ (last_updated, response_a.telemetry_data_time_pair.last_updated);
done = true;
});
@@ -791,7 +675,7 @@ TEST (node_telemetry, disconnects)
ASSERT_TRUE (channel);
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_random_peers_async ([&done](nano::telemetry_data_responses const & responses_a) {
node_client->telemetry.get_metrics_peers_async ([&done](nano::telemetry_data_responses const & responses_a) {
ASSERT_FALSE (responses_a.all_received);
done = true;
});
@@ -815,6 +699,229 @@ TEST (node_telemetry, disconnects)
}
}
TEST (node_telemetry, batch_use_single_request_cache)
{
nano::system system (2);
auto node_client = system.nodes.front ();
auto node_server = system.nodes.back ();
wait_peer_connections (system);
// Request telemetry metrics
nano::telemetry_data_time_pair telemetry_data_time_pair;
{
std::atomic<bool> done{ false };
auto channel = node_client->network.find_channel (node_server->network.endpoint ());
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data_time_pair](nano::telemetry_data_response const & response_a) {
telemetry_data_time_pair = response_a.telemetry_data_time_pair;
done = true;
});
system.deadline_set (10s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
}
{
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_peers_async ([&done, &telemetry_data_time_pair](nano::telemetry_data_responses const & responses_a) {
ASSERT_TRUE (responses_a.all_received);
ASSERT_EQ (telemetry_data_time_pair, responses_a.telemetry_data_time_pairs.begin ()->second);
done = true;
});
system.deadline_set (10s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
}
// Confirm only 1 request was made
ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in));
ASSERT_EQ (0, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out));
ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in));
ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out));
// Wait until there is something pending
system.deadline_set (10s);
while (node_client->telemetry.finished_single_requests_size () == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
system.deadline_set (10s);
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_peers_async ([&done, &telemetry_data_time_pair](nano::telemetry_data_responses const & responses_a) {
ASSERT_EQ (1, responses_a.telemetry_data_time_pairs.size ());
done = true;
});
system.deadline_set (10s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (0, node_client->telemetry.finished_single_requests_size ());
ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in));
ASSERT_EQ (0, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out));
ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in));
ASSERT_EQ (2, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out));
}
TEST (node_telemetry, single_request_use_batch_cache)
{
nano::system system (2);
auto node_client = system.nodes.front ();
auto node_server = system.nodes.back ();
wait_peer_connections (system);
// Request batched metric first
std::unordered_map<nano::endpoint, nano::telemetry_data_time_pair> all_telemetry_data_time_pairs;
{
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_data_time_pairs](nano::telemetry_data_responses const & responses_a) {
ASSERT_TRUE (responses_a.all_received);
ASSERT_EQ (1, responses_a.telemetry_data_time_pairs.size ());
all_telemetry_data_time_pairs = responses_a.telemetry_data_time_pairs;
done = true;
});
system.deadline_set (10s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
}
std::atomic<bool> done{ false };
auto channel = node_client->network.find_channel (node_server->network.endpoint ());
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &all_telemetry_data_time_pairs](nano::telemetry_data_response const & response_a) {
ASSERT_EQ (all_telemetry_data_time_pairs.begin ()->second, response_a.telemetry_data_time_pair);
ASSERT_FALSE (response_a.error);
done = true;
});
system.deadline_set (10s);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
// Confirm only 1 request was made
ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in));
ASSERT_EQ (0, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out));
ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in));
ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out));
}
TEST (node_telemetry, dos_tcp)
{
// Confirm that repeat telemetry_reqs within the cache cutoff are not processed
nano::system system (2);
auto node_client = system.nodes.front ();
auto node_server = system.nodes.back ();
wait_peer_connections (system);
nano::telemetry_req message;
auto channel = node_client->network.tcp_channels.find_channel (nano::transport::map_endpoint_to_tcp (node_server->network.endpoint ()));
channel->send (message, [](boost::system::error_code const & ec, size_t size_a) {
ASSERT_FALSE (ec);
});
system.deadline_set (10s);
while (1 != node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in))
{
ASSERT_NO_ERROR (system.poll ());
}
auto orig = std::chrono::steady_clock::now ();
for (int i = 0; i < 10; ++i)
{
channel->send (message, [](boost::system::error_code const & ec, size_t size_a) {
ASSERT_FALSE (ec);
});
}
system.deadline_set (10s);
while ((nano::telemetry_cache_cutoffs::test + orig) > std::chrono::steady_clock::now ())
{
ASSERT_NO_ERROR (system.poll ());
}
// Should process no more telemetry_req messages
ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
// Now spam messages waiting for it to be processed
while (node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in) == 1)
{
channel->send (message);
ASSERT_NO_ERROR (system.poll ());
}
}
TEST (node_telemetry, dos_udp)
{
// Confirm that repeat telemetry_reqs within the cache cutoff are not processed
nano::system system (2);
auto node_client = system.nodes.front ();
auto node_server = system.nodes.back ();
wait_peer_connections (system);
nano::telemetry_req message;
auto channel (node_server->network.udp_channels.create (node_server->network.endpoint ()));
channel->send (message, [](boost::system::error_code const & ec, size_t size_a) {
ASSERT_FALSE (ec);
});
system.deadline_set (20s);
while (1 != node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in))
{
ASSERT_NO_ERROR (system.poll ());
}
auto orig = std::chrono::steady_clock::now ();
for (int i = 0; i < 10; ++i)
{
channel->send (message, [](boost::system::error_code const & ec, size_t size_a) {
ASSERT_FALSE (ec);
});
}
system.deadline_set (20s);
while ((nano::telemetry_cache_cutoffs::test + orig) > std::chrono::steady_clock::now ())
{
ASSERT_NO_ERROR (system.poll ());
}
// Should process no more telemetry_req messages
ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
// Now spam messages waiting for it to be processed
system.deadline_set (20s);
while (node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in) == 1)
{
channel->send (message);
ASSERT_NO_ERROR (system.poll ());
}
}
TEST (node_telemetry, disable_metrics_single)
{
nano::system system (1);
@@ -846,7 +953,7 @@ TEST (node_telemetry, disable_metrics_single)
auto channel1 = node_server->network.find_channel (node_client->network.endpoint ());
node_server->telemetry.get_metrics_single_peer_async (channel1, [&done, node_server](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
compare_default_test_result_data (response_a.data, *node_server);
compare_default_test_result_data (response_a.telemetry_data_time_pair.data, *node_server);
done = true;
});
@@ -872,7 +979,7 @@ TEST (node_telemetry, disable_metrics_batch)
ASSERT_TRUE (channel);
std::atomic<bool> done{ false };
node_client->telemetry.get_metrics_random_peers_async ([&done](nano::telemetry_data_responses const & responses_a) {
node_client->telemetry.get_metrics_peers_async ([&done](nano::telemetry_data_responses const & responses_a) {
ASSERT_FALSE (responses_a.all_received);
done = true;
});
@@ -885,9 +992,9 @@ TEST (node_telemetry, disable_metrics_batch)
// It should still be able to receive metrics though
done = false;
node_server->telemetry.get_metrics_random_peers_async ([&done, node_server](nano::telemetry_data_responses const & responses_a) {
node_server->telemetry.get_metrics_peers_async ([&done, node_server](nano::telemetry_data_responses const & responses_a) {
ASSERT_TRUE (responses_a.all_received);
compare_default_test_result_data (responses_a.data.front (), *node_server);
compare_default_test_result_data (responses_a.telemetry_data_time_pairs.begin ()->second.data, *node_server);
done = true;
});
@@ -930,6 +1037,6 @@ void compare_default_test_result_data (nano::telemetry_data const & telemetry_da
ASSERT_EQ (*telemetry_data_a.pre_release_version, nano::get_pre_release_node_version ());
ASSERT_EQ (*telemetry_data_a.maker, 0);
ASSERT_LT (telemetry_data_a.uptime, 100);
ASSERT_EQ (telemetry_data_a.genesis_block, nano::genesis ().hash ());
ASSERT_EQ (telemetry_data_a.genesis_block, node_server_a.network_params.ledger.genesis_hash);
}
}

View file

@@ -147,8 +147,21 @@ std::unique_ptr<container_info_component> collect_container_info (observer_set<T
void remove_all_files_in_dir (boost::filesystem::path const & dir);
void move_all_files_to_dir (boost::filesystem::path const & from, boost::filesystem::path const & to);
template <class InputIt, class OutputIt, class Pred, class Func>
void transform_if (InputIt first, InputIt last, OutputIt dest, Pred pred, Func transform)
{
while (first != last)
{
if (pred (*first))
{
*dest++ = transform (*first);
}
++first;
}
}
}
// Have our own async_write which we must use?
void release_assert_internal (bool check, const char * check_expr, const char * file, unsigned int line);
#define release_assert(check) release_assert_internal (check, #check, __FILE__, __LINE__)
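A usage sketch for the transform_if helper added in the hunk above (inputs invented for illustration of the predicate/transform split):

#include <iterator>
#include <vector>

std::vector<int> inputs{ 1, 2, 3, 4, 5 };
std::vector<int> doubled_evens;
// Keep only the even values, doubling each one as it is copied
nano::transform_if (inputs.begin (), inputs.end (), std::back_inserter (doubled_evens),
[](int value) { return value % 2 == 0; },
[](int value) { return value * 2; });
// doubled_evens now holds { 4, 8 }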

View file

@@ -239,10 +239,16 @@ void nano::bootstrap_server::receive_header_action (boost::system::error_code co
}
case nano::message_type::telemetry_req:
{
node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::telemetry_req, nano::stat::dir::in);
if (is_realtime_connection ())
{
add_request (std::make_unique<nano::telemetry_req> (header));
// Only handle telemetry requests if they are outside of the cutoff time
auto is_very_first_message = last_telemetry_req == std::chrono::steady_clock::time_point{};
auto cache_exceeded = std::chrono::steady_clock::now () >= last_telemetry_req + nano::telemetry_cache_cutoffs::network_to_time (node->network_params.network);
if (is_very_first_message || cache_exceeded)
{
last_telemetry_req = std::chrono::steady_clock::now ();
add_request (std::make_unique<nano::telemetry_req> (header));
}
}
receive ();
break;

View file

@@ -75,5 +75,6 @@ public:
// Remote endpoint used to remove response channel even after socket closing
nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 };
nano::account remote_node_id{ 0 };
std::chrono::steady_clock::time_point last_telemetry_req{ std::chrono::steady_clock::time_point () };
};
}

View file

@@ -6,14 +6,19 @@
#include <nano/node/wallet.hpp>
#include <nano/secure/buffer.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/endian/conversion.hpp>
#include <boost/pool/pool_alloc.hpp>
#include <boost/variant/get.hpp>
#include <numeric>
std::bitset<16> constexpr nano::message_header::block_type_mask;
std::bitset<16> constexpr nano::message_header::count_mask;
std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::test;
std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::beta;
std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::live;
namespace
{
nano::protocol_constants const & get_protocol_constants ()
@@ -1176,135 +1181,6 @@ bool nano::telemetry_ack::is_empty_payload () const
return size () == 0;
}
nano::telemetry_data nano::telemetry_data::consolidate (std::vector<nano::telemetry_data> const & telemetry_data_responses_a)
{
if (telemetry_data_responses_a.empty ())
{
return {};
}
else if (telemetry_data_responses_a.size () == 1)
{
// Only 1 element in the collection, so just return it.
return telemetry_data_responses_a.front ();
}
nano::uint128_t account_sum{ 0 };
nano::uint128_t block_sum{ 0 };
nano::uint128_t cemented_sum{ 0 };
nano::uint128_t peer_sum{ 0 };
nano::uint128_t unchecked_sum{ 0 };
nano::uint128_t uptime_sum{ 0 };
nano::uint128_t bandwidth_sum{ 0 };
std::unordered_map<uint8_t, int> protocol_versions;
std::unordered_map<std::string, int> vendor_versions;
std::unordered_map<uint64_t, int> bandwidth_caps;
std::unordered_map<nano::block_hash, int> genesis_blocks;
nano::uint128_t account_average{ 0 };
for (auto const & telemetry_data : telemetry_data_responses_a)
{
account_sum += telemetry_data.account_count;
block_sum += telemetry_data.block_count;
cemented_sum += telemetry_data.cemented_count;
std::ostringstream ss;
ss << telemetry_data.major_version;
if (telemetry_data.minor_version.is_initialized ())
{
ss << "." << *telemetry_data.minor_version;
if (telemetry_data.patch_version.is_initialized ())
{
ss << "." << *telemetry_data.patch_version;
if (telemetry_data.pre_release_version.is_initialized ())
{
ss << "." << *telemetry_data.pre_release_version;
if (telemetry_data.maker.is_initialized ())
{
ss << "." << *telemetry_data.maker;
}
}
}
}
++vendor_versions[ss.str ()];
++protocol_versions[telemetry_data.protocol_version];
peer_sum += telemetry_data.peer_count;
// 0 has a special meaning (unlimited), don't include it in the average as it will be heavily skewed
if (telemetry_data.bandwidth_cap != 0)
{
bandwidth_sum += telemetry_data.bandwidth_cap;
}
++bandwidth_caps[telemetry_data.bandwidth_cap];
unchecked_sum += telemetry_data.unchecked_count;
uptime_sum += telemetry_data.uptime;
++genesis_blocks[telemetry_data.genesis_block];
}
nano::telemetry_data consolidated_data;
auto size = telemetry_data_responses_a.size ();
consolidated_data.account_count = boost::numeric_cast<decltype (account_count)> (account_sum / size);
consolidated_data.block_count = boost::numeric_cast<decltype (block_count)> (block_sum / size);
consolidated_data.cemented_count = boost::numeric_cast<decltype (cemented_count)> (cemented_sum / size);
consolidated_data.peer_count = boost::numeric_cast<decltype (peer_count)> (peer_sum / size);
consolidated_data.uptime = boost::numeric_cast<decltype (uptime)> (uptime_sum / size);
consolidated_data.unchecked_count = boost::numeric_cast<decltype (unchecked_count)> (unchecked_sum / size);
auto set_mode_or_average = [](auto const & collection, auto & var, auto const & sum, size_t size) {
auto max = std::max_element (collection.begin (), collection.end (), [](auto const & lhs, auto const & rhs) {
return lhs.second < rhs.second;
});
if (max->second > 1)
{
var = max->first;
}
else
{
var = (sum / size).template convert_to<std::remove_reference_t<decltype (var)>> ();
}
};
auto set_mode = [](auto const & collection, auto & var, size_t size) {
auto max = std::max_element (collection.begin (), collection.end (), [](auto const & lhs, auto const & rhs) {
return lhs.second < rhs.second;
});
if (max->second > 1)
{
var = max->first;
}
else
{
// Just pick the first one
var = collection.begin ()->first;
}
};
// Use the mode of protocol version, vendor version and bandwidth cap if there is 2 or more using it
set_mode_or_average (bandwidth_caps, consolidated_data.bandwidth_cap, bandwidth_sum, size);
set_mode (protocol_versions, consolidated_data.protocol_version, size);
set_mode (genesis_blocks, consolidated_data.genesis_block, size);
// Vendor version, needs to be parsed out of the string
std::string version;
set_mode (vendor_versions, version, size);
// May only have major version, but check for optional parameters as well, only output if all are used
std::vector<std::string> version_fragments;
boost::split (version_fragments, version, boost::is_any_of ("."));
assert (!version_fragments.empty () && version_fragments.size () <= 5);
consolidated_data.major_version = boost::lexical_cast<uint8_t> (version_fragments.front ());
if (version_fragments.size () == 5)
{
consolidated_data.minor_version = boost::lexical_cast<uint8_t> (version_fragments[1]);
consolidated_data.patch_version = boost::lexical_cast<uint8_t> (version_fragments[2]);
consolidated_data.pre_release_version = boost::lexical_cast<uint8_t> (version_fragments[3]);
consolidated_data.maker = boost::lexical_cast<uint8_t> (version_fragments[4]);
}
return consolidated_data;
}
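The removed consolidate above (whose behaviour presumably carries over into nano::consolidate_telemetry_data) averages the counters but takes the mode of categorical fields such as bandwidth_cap, protocol_version and genesis_block, falling back to the average (or the first element for non-numeric fields) when no value occurs at least twice. A short worked illustration of the set_mode_or_average rule, with invented counts:

#include <cstdint>
#include <unordered_map>

// Counts as the loop above would build them: value -> occurrences
std::unordered_map<uint64_t, int> bandwidth_caps{ { 100, 2 }, { 300, 1 } };
// max_element finds (100, 2); since 2 > 1 the mode wins, so the
// consolidated bandwidth_cap would be 100.
// With { { 100, 1 }, { 200, 1 }, { 300, 1 } } no value repeats, and the
// fallback bandwidth_sum / size = 600 / 3 = 200 is used instead.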
nano::error nano::telemetry_data::serialize_json (nano::jsonconfig & json) const
{
json.put ("block_count", block_count);
@@ -1369,6 +1245,11 @@ bool nano::telemetry_data::operator== (nano::telemetry_data const & data_a) cons
return (block_count == data_a.block_count && cemented_count == data_a.cemented_count && unchecked_count == data_a.unchecked_count && account_count == data_a.account_count && bandwidth_cap == data_a.bandwidth_cap && uptime == data_a.uptime && peer_count == data_a.peer_count && protocol_version == data_a.protocol_version && genesis_block == data_a.genesis_block && major_version == data_a.major_version && minor_version == data_a.minor_version && patch_version == data_a.patch_version && pre_release_version == data_a.pre_release_version && maker == data_a.maker);
}
bool nano::telemetry_data::operator!= (nano::telemetry_data const & data_a) const
{
return !(*this == data_a);
}
nano::node_id_handshake::node_id_handshake (bool & error_a, nano::stream & stream_a, nano::message_header const & header_a) :
message (header_a),
query (boost::none),
@@ -1484,6 +1365,22 @@ bool nano::parse_port (std::string const & string_a, uint16_t & port_a)
return result;
}
// Can handle both ipv4 & ipv6 addresses (with and without square brackets)
bool nano::parse_address (std::string const & address_text_a, boost::asio::ip::address & address_a)
{
auto address_text = address_text_a;
if (!address_text.empty () && address_text.front () == '[' && address_text.back () == ']')
{
// Chop the square brackets off as make_address doesn't always like them
address_text = address_text.substr (1, address_text.size () - 2);
}
boost::system::error_code address_ec;
address_a = boost::asio::ip::make_address (address_text, address_ec);
return !!address_ec;
}
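A usage sketch for the new parse_address, which follows the surrounding convention of returning true on failure:

#include <boost/asio/ip/address.hpp>

boost::asio::ip::address address;
// Bracketed IPv6 as it may arrive via RPC; the brackets are stripped internally
auto error1 (nano::parse_address ("[::ffff:127.0.0.1]", address)); // false
// Plain IPv4 is handled by the same call
auto error2 (nano::parse_address ("127.0.0.1", address)); // false
// Malformed input leaves the error code set, so true is returned
auto error3 (nano::parse_address ("not-an-address", address)); // true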
bool nano::parse_address_port (std::string const & string, boost::asio::ip::address & address_a, uint16_t & port_a)
{
auto result (false);
@@ -1550,6 +1447,11 @@ bool nano::parse_tcp_endpoint (std::string const & string, nano::tcp_endpoint &
return result;
}
std::chrono::seconds nano::telemetry_cache_cutoffs::network_to_time (network_constants const & network_constants)
{
return std::chrono::seconds{ network_constants.is_live_network () ? live : network_constants.is_beta_network () ? beta : test };
}
nano::node_singleton_memory_pool_purge_guard::node_singleton_memory_pool_purge_guard () :
cleanup_guard ({ nano::block_memory_pool_purge, nano::purge_singleton_pool_memory<nano::vote>, nano::purge_singleton_pool_memory<nano::election> })
{

View file

@@ -14,6 +14,7 @@ namespace nano
{
using endpoint = boost::asio::ip::udp::endpoint;
bool parse_port (std::string const &, uint16_t &);
bool parse_address (std::string const &, boost::asio::ip::address &);
bool parse_address_port (std::string const &, boost::asio::ip::address &, uint16_t &);
using tcp_endpoint = boost::asio::ip::tcp::endpoint;
bool parse_endpoint (std::string const &, nano::endpoint &);
@@ -348,10 +349,10 @@ public:
boost::optional<uint8_t> pre_release_version;
boost::optional<uint8_t> maker; // 0 for NF node
static nano::telemetry_data consolidate (std::vector<nano::telemetry_data> const & telemetry_data_responses);
nano::error serialize_json (nano::jsonconfig & json) const;
nano::error deserialize_json (nano::jsonconfig & json);
bool operator== (nano::telemetry_data const &) const;
bool operator!= (nano::telemetry_data const &) const;
static auto constexpr size_v0 = sizeof (block_count) + sizeof (cemented_count) + sizeof (unchecked_count) + sizeof (account_count) + sizeof (bandwidth_cap) + sizeof (peer_count) + sizeof (protocol_version) + sizeof (uptime) + sizeof (genesis_block) + sizeof (major_version);
static auto constexpr size = size_v0 + sizeof (decltype (minor_version)::value_type) + sizeof (decltype (patch_version)::value_type) + sizeof (decltype (pre_release_version)::value_type) + sizeof (decltype (maker)::value_type);
@@ -451,6 +452,16 @@ public:
virtual ~message_visitor ();
};
class telemetry_cache_cutoffs
{
public:
static std::chrono::seconds constexpr test{ 2 };
static std::chrono::seconds constexpr beta{ 15 };
static std::chrono::seconds constexpr live{ 60 };
static std::chrono::seconds network_to_time (network_constants const & network_constants);
};
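A brief usage note: the same per-network cutoff drives both the client-side cache lifetime and the server-side throttling of telemetry_req seen in bootstrap_server earlier. For example:

nano::network_constants constants;
// Resolves to 60s on the live network, 15s on beta and 2s in tests
auto cutoff (nano::telemetry_cache_cutoffs::network_to_time (constants));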
/** Helper guard which contains all the necessary purge (remove all memory even if used) functions */
class node_singleton_memory_pool_purge_guard
{

View file

@@ -3910,12 +3910,11 @@ void nano::json_handler::telemetry ()
uint16_t port;
if (!nano::parse_port (*port_text, port))
{
boost::system::error_code address_ec;
auto address (boost::asio::ip::make_address_v6 (*address_text, address_ec));
if (!address_ec)
boost::asio::ip::address address;
if (!nano::parse_address (*address_text, address))
{
nano::endpoint endpoint (address, port);
channel = node.network.find_channel (endpoint);
channel = node.network.find_channel (nano::transport::map_endpoint_to_v6 (endpoint));
if (!channel)
{
ec = nano::error_rpc::peer_not_found;
@@ -3943,13 +3942,13 @@ void nano::json_handler::telemetry ()
if (!single_telemetry_metric_a.error)
{
nano::jsonconfig config_l;
auto err = single_telemetry_metric_a.data.serialize_json (config_l);
auto err = single_telemetry_metric_a.telemetry_data_time_pair.data.serialize_json (config_l);
config_l.put ("timestamp", std::chrono::duration_cast<std::chrono::seconds> (single_telemetry_metric_a.telemetry_data_time_pair.system_last_updated.time_since_epoch ()).count ());
auto const & ptree = config_l.get_tree ();
if (!err)
{
rpc_l->response_l.insert (rpc_l->response_l.begin (), ptree.begin (), ptree.end ());
rpc_l->response_l.put ("cached", single_telemetry_metric_a.is_cached);
}
else
{
@@ -3975,14 +3974,17 @@ void nano::json_handler::telemetry ()
// setting "raw" to true returns metrics from all nodes requested.
auto raw = request.get_optional<bool> ("raw");
auto output_raw = raw.value_or (false);
node.telemetry.get_metrics_random_peers_async ([rpc_l, output_raw](auto const & batched_telemetry_metrics_a) {
node.telemetry.get_metrics_peers_async ([rpc_l, output_raw](auto const & batched_telemetry_metrics_a) {
if (output_raw)
{
boost::property_tree::ptree metrics;
for (auto & telemetry_metrics : batched_telemetry_metrics_a.data)
for (auto & telemetry_metrics : batched_telemetry_metrics_a.telemetry_data_time_pairs)
{
nano::jsonconfig config_l;
auto err = telemetry_metrics.serialize_json (config_l);
auto err = telemetry_metrics.second.data.serialize_json (config_l);
config_l.put ("timestamp", std::chrono::duration_cast<std::chrono::seconds> (telemetry_metrics.second.system_last_updated.time_since_epoch ()).count ());
config_l.put ("address", telemetry_metrics.first.address ());
config_l.put ("port", telemetry_metrics.first.port ());
if (!err)
{
metrics.push_back (std::make_pair ("", config_l.get_tree ()));
@@ -3998,8 +4000,15 @@ void nano::json_handler::telemetry ()
else
{
nano::jsonconfig config_l;
auto average_telemetry_metrics = nano::telemetry_data::consolidate (batched_telemetry_metrics_a.data);
auto err = average_telemetry_metrics.serialize_json (config_l);
std::vector<nano::telemetry_data_time_pair> telemetry_data_time_pairs;
telemetry_data_time_pairs.reserve (batched_telemetry_metrics_a.telemetry_data_time_pairs.size ());
std::transform (batched_telemetry_metrics_a.telemetry_data_time_pairs.begin (), batched_telemetry_metrics_a.telemetry_data_time_pairs.end (), std::back_inserter (telemetry_data_time_pairs), [](auto const & telemetry_data_time_pair_a) {
return telemetry_data_time_pair_a.second;
});
auto average_telemetry_metrics = nano::consolidate_telemetry_data_time_pairs (telemetry_data_time_pairs);
auto err = average_telemetry_metrics.data.serialize_json (config_l);
config_l.put ("timestamp", std::chrono::duration_cast<std::chrono::seconds> (average_telemetry_metrics.system_last_updated.time_since_epoch ()).count ());
auto const & ptree = config_l.get_tree ();
if (!err)
@@ -4012,7 +4021,6 @@ void nano::json_handler::telemetry ()
}
}
rpc_l->response_l.put ("cached", batched_telemetry_metrics_a.is_cached);
rpc_l->response_errors ();
});
}
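With these changes every telemetry RPC response carries a timestamp, and raw mode additionally reports the responding peer's endpoint. A hypothetical raw-mode response fragment (all values invented; the field names are taken from the serialization code above, and the top-level "metrics" key is assumed from the metrics ptree):

{
    "metrics": [
        {
            "block_count": "1",
            "cemented_count": "1",
            "account_count": "1",
            "timestamp": "1581349193",
            "address": "::ffff:127.0.0.1",
            "port": "7075"
        }
    ]
}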

View file

@@ -476,7 +476,7 @@ public:
telemetry_ack = nano::telemetry_ack (telemetry_data);
}
channel->send (telemetry_ack);
channel->send (telemetry_ack, nullptr, false);
}
void telemetry_ack (nano::telemetry_ack const & message_a) override
{
@@ -551,11 +551,11 @@ bool nano::network::reachout (nano::endpoint const & endpoint_a, bool allow_loca
return error;
}
std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (size_t count_a)
std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (size_t count_a, uint8_t minimum_version_a, bool include_tcp_temporary_channels_a)
{
std::deque<std::shared_ptr<nano::transport::channel>> result;
tcp_channels.list (result);
udp_channels.list (result);
tcp_channels.list (result, minimum_version_a, include_tcp_temporary_channels_a);
udp_channels.list (result, minimum_version_a);
nano::random_pool_shuffle (result.begin (), result.end ());
if (result.size () > count_a)
{

View file

@@ -130,11 +130,12 @@ public:
bool not_a_peer (nano::endpoint const &, bool);
// Should we reach out to this endpoint with a keepalive message
bool reachout (nano::endpoint const &, bool = false);
std::deque<std::shared_ptr<nano::transport::channel>> list (size_t);
std::deque<std::shared_ptr<nano::transport::channel>> list (size_t, uint8_t = 0, bool = true);
std::deque<std::shared_ptr<nano::transport::channel>> list_non_pr (size_t);
// Desired fanout for a given scale
size_t fanout (float scale = 1.0f) const;
void random_fill (std::array<nano::endpoint, 8> &) const;
// Note: The minimum protocol version is applied after the random selection, so the number of peers returned can be less than expected.
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (size_t, uint8_t = 0, bool = false) const;
// Get the next peer for attempting a tcp bootstrap connection
nano::tcp_endpoint bootstrap_peer (bool = false);

View file

@@ -5,13 +5,16 @@
#include <nano/node/transport/transport.hpp>
#include <nano/secure/buffer.hpp>
#include <boost/algorithm/string.hpp>
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <future>
#include <numeric>
#include <set>
std::chrono::milliseconds constexpr nano::telemetry_impl::cache_cutoff;
std::chrono::seconds constexpr nano::telemetry_impl::alarm_cutoff;
nano::telemetry::telemetry (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a) :
network (network_a),
@@ -19,6 +22,44 @@ alarm (alarm_a),
worker (worker_a),
batch_request (std::make_shared<nano::telemetry_impl> (network, alarm, worker))
{
// Before callbacks are called with the batch request, check if any of the single request data can be appended to give more complete results
batch_request->pre_callback_callback = [this](std::unordered_map<nano::endpoint, telemetry_data_time_pair> & data_a, std::mutex & mutex_a) {
nano::lock_guard<std::mutex> guard (this->mutex);
for (auto & single_request : single_requests)
{
nano::lock_guard<std::mutex> guard (single_request.second.impl->mutex);
if (!single_request.second.impl->cached_telemetry_data.empty ())
{
nano::lock_guard<std::mutex> batch_request_guard (mutex_a);
auto it = this->batch_request->cached_telemetry_data.find (single_request.first);
if (it != this->batch_request->cached_telemetry_data.cend () && single_request.second.last_updated > it->second.last_updated)
{
it->second = single_request.second.impl->cached_telemetry_data.begin ()->second;
}
else
{
data_a.emplace (single_request.first, single_request.second.impl->cached_telemetry_data.begin ()->second);
}
}
}
for (auto & pending : finished_single_requests)
{
nano::lock_guard<std::mutex> batch_request_guard (mutex_a);
auto it = this->batch_request->cached_telemetry_data.find (pending.first);
if (it != this->batch_request->cached_telemetry_data.cend () && pending.second.last_updated > it->second.last_updated)
{
it->second = pending.second;
}
else
{
data_a.emplace (pending.first, pending.second);
}
}
finished_single_requests.clear ();
};
ongoing_req_all_peers ();
}
void nano::telemetry::stop ()
@@ -43,28 +84,61 @@ void nano::telemetry::add (nano::telemetry_data const & telemetry_data_a, nano::
}
}
void nano::telemetry::get_metrics_random_peers_async (std::function<void(telemetry_data_responses const &)> const & callback_a)
void nano::telemetry::ongoing_req_all_peers ()
{
// These peers will only be used if there isn't an already ongoing batch telemetry request round
auto random_peers = network.random_set (network.size_sqrt (), network_params.protocol.telemetry_protocol_version_min, true);
alarm.add (std::chrono::steady_clock::now () + batch_request->cache_cutoff + batch_request->alarm_cutoff, [this, telemetry_impl_w = std::weak_ptr<nano::telemetry_impl> (batch_request)]() {
if (auto batch_telemetry_impl = telemetry_impl_w.lock ())
{
nano::lock_guard<std::mutex> guard (this->mutex);
if (!this->stopped)
{
auto peers = this->network.list (std::numeric_limits<size_t>::max (), network_params.protocol.telemetry_protocol_version_min, false);
// If exists in single_requests don't request because they will just be rejected by other peers until the next round
auto const & single_requests = this->single_requests;
peers.erase (std::remove_if (peers.begin (), peers.end (), [&single_requests](auto const & channel_a) {
return single_requests.count (channel_a->get_endpoint ()) > 0;
}),
peers.cend ());
if (!peers.empty ())
{
batch_telemetry_impl->get_metrics_async (peers, [](nano::telemetry_data_responses const &) {
// Intentionally empty, just using to refresh the cache
});
}
this->ongoing_req_all_peers ();
}
}
});
}
void nano::telemetry::get_metrics_peers_async (std::function<void(telemetry_data_responses const &)> const & callback_a)
{
auto peers = network.list (std::numeric_limits<size_t>::max (), network_params.protocol.telemetry_protocol_version_min, false);
nano::lock_guard<std::mutex> guard (mutex);
if (!stopped && !random_peers.empty ())
if (!stopped && !peers.empty ())
{
batch_request->get_metrics_async (random_peers, [callback_a](nano::telemetry_data_responses const & telemetry_data_responses) {
// If exists in single_requests, don't request because they will just be rejected by other nodes, instead add it as additional values
peers.erase (std::remove_if (peers.begin (), peers.end (), [& single_requests = this->single_requests](auto const & channel_a) {
return single_requests.count (channel_a->get_endpoint ()) > 0;
}),
peers.cend ());
batch_request->get_metrics_async (peers, [callback_a](nano::telemetry_data_responses const & telemetry_data_responses) {
callback_a (telemetry_data_responses);
});
}
else
{
const auto all_received = false;
callback_a (nano::telemetry_data_responses{ {}, false, all_received });
callback_a (nano::telemetry_data_responses{ {}, all_received });
}
}
nano::telemetry_data_responses nano::telemetry::get_metrics_random_peers ()
nano::telemetry_data_responses nano::telemetry::get_metrics_peers ()
{
std::promise<telemetry_data_responses> promise;
get_metrics_random_peers_async ([&promise](telemetry_data_responses const & telemetry_data_responses_a) {
get_metrics_peers_async ([&promise](telemetry_data_responses const & telemetry_data_responses_a) {
promise.set_value (telemetry_data_responses_a);
});
@@ -74,38 +148,27 @@ nano::telemetry_data_responses nano::telemetry::get_metrics_random_peers ()
// After a request is made to a single peer we want to remove it from the container after the peer has not been requested for a while (cache_cutoff).
void nano::telemetry::ongoing_single_request_cleanup (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data const & single_request_data_a)
{
// This class is just
class ongoing_func_wrapper
{
public:
std::function<void()> ongoing_func;
};
auto wrapper = std::make_shared<ongoing_func_wrapper> ();
// Keep calling ongoing_func while the peer is still being called
const auto & last_updated = single_request_data_a.last_updated;
wrapper->ongoing_func = [this, telemetry_impl_w = std::weak_ptr<nano::telemetry_impl> (single_request_data_a.impl), &last_updated, &endpoint_a, wrapper]() {
alarm.add (std::chrono::steady_clock::now () + single_request_data_a.impl->cache_cutoff, [this, telemetry_impl_w = std::weak_ptr<nano::telemetry_impl> (single_request_data_a.impl), &single_request_data_a, &endpoint_a]() {
if (auto telemetry_impl = telemetry_impl_w.lock ())
{
nano::lock_guard<std::mutex> guard (this->mutex);
if (std::chrono::steady_clock::now () - telemetry_impl->cache_cutoff > last_updated && telemetry_impl->callbacks.empty ())
if (std::chrono::steady_clock::now () - telemetry_impl->cache_cutoff > single_request_data_a.last_updated && telemetry_impl->callbacks.empty ())
{
// This will be picked up by the batch request next round
this->finished_single_requests[endpoint_a] = telemetry_impl->cached_telemetry_data.begin ()->second;
this->single_requests.erase (endpoint_a);
}
else
{
// Request is still active, so call again
this->alarm.add (std::chrono::steady_clock::now () + telemetry_impl->cache_cutoff, wrapper->ongoing_func);
this->ongoing_single_request_cleanup (endpoint_a, single_request_data_a);
}
}
};
alarm.add (std::chrono::steady_clock::now () + single_request_data_a.impl->cache_cutoff, wrapper->ongoing_func);
});
}
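The cleanup above re-arms itself through the alarm until the request has gone quiet. A minimal sketch of that self-rescheduling pattern, with a detached-thread stand-in for nano::alarm; the weak_ptr guard (the same shape as the telemetry_impl_w lock above) stops the loop once the owner is destroyed. All names here are illustrative:

#include <chrono>
#include <functional>
#include <iostream>
#include <memory>
#include <thread>

// Stand-in for nano::alarm: run the callback once after a delay
void alarm_add (std::chrono::milliseconds delay_a, std::function<void()> callback_a)
{
	std::thread ([delay_a, callback_a]() {
		std::this_thread::sleep_for (delay_a);
		callback_a ();
	})
	.detach ();
}

class request_state : public std::enable_shared_from_this<request_state>
{
public:
	int rounds_remaining{ 3 }; // pretend the request stays active for 3 checks
	void ongoing_cleanup ()
	{
		alarm_add (std::chrono::milliseconds (50), [this_w = std::weak_ptr<request_state> (shared_from_this ())]() {
			if (auto this_l = this_w.lock ()) // same guard shape as telemetry_impl_w.lock ()
			{
				if (--this_l->rounds_remaining > 0)
				{
					std::cout << "request still active, re-arming\n";
					this_l->ongoing_cleanup (); // re-schedule, as ongoing_single_request_cleanup does
				}
				else
				{
					std::cout << "request went quiet, cleaning up\n";
				}
			}
		});
	}
};

int main ()
{
	auto state (std::make_shared<request_state> ());
	state->ongoing_cleanup ();
	std::this_thread::sleep_for (std::chrono::milliseconds (300));
}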
void nano::telemetry::update_cleanup_data (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data & single_request_data_a, bool is_new_a)
{
auto telemetry_impl = single_request_data_a.impl;
if (is_new_a)
{
// Clean this request up when it isn't being used anymore
@@ -120,10 +183,16 @@ void nano::telemetry::update_cleanup_data (nano::endpoint const & endpoint_a, na
void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr<nano::transport::channel> const & channel_a, std::function<void(telemetry_data_response const &)> const & callback_a)
{
auto invoke_callback_with_error = [&callback_a]() {
auto const is_cached = false;
auto const error = true;
callback_a ({ nano::telemetry_data (), is_cached, error });
auto invoke_callback_with_error = [&callback_a, &worker = this->worker, channel_a]() {
nano::endpoint endpoint;
if (channel_a)
{
endpoint = channel_a->get_endpoint ();
}
worker.push_task ([callback_a, endpoint]() {
auto const error = true;
callback_a ({ nano::telemetry_data_time_pair{}, endpoint, error });
});
};
nano::lock_guard<std::mutex> guard (mutex);
@@ -131,20 +200,47 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr<nano::trans
{
if (channel_a && (channel_a->get_network_version () >= network_params.protocol.telemetry_protocol_version_min))
{
auto pair = single_requests.emplace (channel_a->get_endpoint (), single_request_data{ std::make_shared<nano::telemetry_impl> (network, alarm, worker), std::chrono::steady_clock::now () });
update_cleanup_data (pair.first->first, pair.first->second, pair.second);
auto add_callback_async = [& worker = this->worker, &callback_a](telemetry_data_time_pair const & telemetry_data_time_pair_a, nano::endpoint const & endpoint_a) {
telemetry_data_response telemetry_data_response_l{ telemetry_data_time_pair_a, endpoint_a, false };
worker.push_task ([telemetry_data_response_l, callback_a]() {
callback_a (telemetry_data_response_l);
});
};
pair.first->second.impl->get_metrics_async ({ channel_a }, [callback_a](telemetry_data_responses const & telemetry_data_responses_a) {
// First check if the batched metrics have processed this endpoint.
{
nano::lock_guard<std::mutex> guard (batch_request->mutex);
auto it = batch_request->cached_telemetry_data.find (channel_a->get_endpoint ());
if (it != batch_request->cached_telemetry_data.cend ())
{
add_callback_async (it->second, it->first);
return;
}
}
// Next, check single requests which have finished and are awaiting the batched requests
auto it = finished_single_requests.find (channel_a->get_endpoint ());
if (it != finished_single_requests.cend ())
{
add_callback_async (it->second, it->first);
return;
}
auto pair = single_requests.emplace (channel_a->get_endpoint (), single_request_data{ std::make_shared<nano::telemetry_impl> (network, alarm, worker), std::chrono::steady_clock::now () });
auto & single_request_data_it = pair.first;
update_cleanup_data (single_request_data_it->first, single_request_data_it->second, pair.second);
single_request_data_it->second.impl->get_metrics_async ({ channel_a }, [callback_a, channel_a](telemetry_data_responses const & telemetry_data_responses_a) {
// There should only be 1 response, so if this hasn't been received then conclude it is an error.
auto const error = !telemetry_data_responses_a.all_received;
if (!error)
{
assert (telemetry_data_responses_a.data.size () == 1);
callback_a ({ telemetry_data_responses_a.data.front (), telemetry_data_responses_a.is_cached, error });
assert (telemetry_data_responses_a.telemetry_data_time_pairs.size () == 1);
auto it = telemetry_data_responses_a.telemetry_data_time_pairs.begin ();
callback_a ({ it->second, it->first, error });
}
else
{
callback_a ({ nano::telemetry_data (), telemetry_data_responses_a.is_cached, error });
callback_a ({ nano::telemetry_data_time_pair{}, channel_a->get_endpoint (), error });
}
});
}
@@ -183,6 +279,12 @@ size_t nano::telemetry::telemetry_data_size ()
return total;
}
size_t nano::telemetry::finished_single_requests_size ()
{
nano::lock_guard<std::mutex> guard (mutex);
return finished_single_requests.size ();
}
nano::telemetry_impl::telemetry_impl (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a) :
network (network_a),
alarm (alarm_a),
@@ -190,25 +292,30 @@ worker (worker_a)
{
}
void nano::telemetry_impl::flush_callbacks (nano::unique_lock<std::mutex> & lk_a, bool cached_a)
void nano::telemetry_impl::flush_callbacks_async ()
{
// Invoke all callbacks; it's possible that during the mutex unlock other callbacks were added,
// so check again and invoke those too
assert (lk_a.owns_lock ());
invoking = true;
while (!callbacks.empty ())
{
lk_a.unlock ();
invoke_callbacks (cached_a);
lk_a.lock ();
}
invoking = false;
// Post to worker so that it's truly async and not on the calling thread (same problem as std::async otherwise)
worker.push_task ([this_w = std::weak_ptr<nano::telemetry_impl> (shared_from_this ())]() {
if (auto this_l = this_w.lock ())
{
nano::unique_lock<std::mutex> lk (this_l->mutex);
// Invoke all callbacks; it's possible that during the mutex unlock other callbacks were added,
// so check again and invoke those too
this_l->invoking = true;
while (!this_l->callbacks.empty ())
{
lk.unlock ();
this_l->invoke_callbacks ();
lk.lock ();
}
this_l->invoking = false;
}
});
}
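flush_callbacks_async hands the flush to the worker so callbacks never run on the calling thread, and the flush loop itself releases the mutex around each batch so new callbacks can still be queued concurrently. A stripped-down, standalone sketch of that unlock-invoke-relock loop (the worker posting is omitted):

#include <functional>
#include <iostream>
#include <mutex>
#include <vector>

std::mutex mutex;
std::vector<std::function<void()>> callbacks;

// Invoke everything queued so far, releasing the lock around each batch so
// new callbacks can still be appended concurrently; loop until none remain
void flush_callbacks ()
{
	std::unique_lock<std::mutex> lk (mutex);
	while (!callbacks.empty ())
	{
		auto batch = std::move (callbacks);
		callbacks.clear ();
		lk.unlock ();
		for (auto & callback : batch)
		{
			callback ();
		}
		lk.lock (); // re-check: more callbacks may have arrived meanwhile
	}
}

int main ()
{
	{
		std::lock_guard<std::mutex> guard (mutex);
		callbacks.emplace_back ([]() { std::cout << "first\n"; });
		callbacks.emplace_back ([]() { std::cout << "second\n"; });
	}
	flush_callbacks ();
}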
void nano::telemetry_impl::get_metrics_async (std::unordered_set<std::shared_ptr<nano::transport::channel>> const & channels_a, std::function<void(telemetry_data_responses const &)> const & callback_a)
void nano::telemetry_impl::get_metrics_async (std::deque<std::shared_ptr<nano::transport::channel>> const & channels_a, std::function<void(telemetry_data_responses const &)> const & callback_a)
{
{
assert (!channels_a.empty ());
nano::unique_lock<std::mutex> lk (mutex);
callbacks.push_back (callback_a);
if (callbacks.size () > 1 || invoking)
@@ -218,21 +325,13 @@ void nano::telemetry_impl::get_metrics_async (std::unordered_set<std::shared_ptr
}
// Check if we can just return cached results
if (std::chrono::steady_clock::now () < (last_time + cache_cutoff))
if (channels_a.empty () || std::chrono::steady_clock::now () <= (last_time + cache_cutoff))
{
// Post to worker so that it's truly async and not on the calling thread (same problem as std::async otherwise)
worker.push_task ([this_w = std::weak_ptr<nano::telemetry_impl> (shared_from_this ())]() {
if (auto this_l = this_w.lock ())
{
nano::unique_lock<std::mutex> lk (this_l->mutex);
const auto is_cached = true;
this_l->flush_callbacks (lk, is_cached);
}
});
flush_callbacks_async ();
return;
}
all_received = true;
failed.clear ();
assert (required_responses.empty ());
std::transform (channels_a.begin (), channels_a.end (), std::inserter (required_responses, required_responses.end ()), [](auto const & channel) {
return channel->get_endpoint ();
@@ -253,14 +352,15 @@ void nano::telemetry_impl::add (nano::telemetry_data const & telemetry_data_a, n
if (!is_empty_a)
{
current_telemetry_data_responses.push_back (telemetry_data_a);
current_telemetry_data_responses[endpoint_a] = { telemetry_data_a, std::chrono::steady_clock::now (), std::chrono::system_clock::now () };
}
channel_processed (lk, endpoint_a);
}
void nano::telemetry_impl::invoke_callbacks (bool cached_a)
void nano::telemetry_impl::invoke_callbacks ()
{
decltype (callbacks) callbacks_l;
bool all_received;
decltype (cached_telemetry_data) cached_telemetry_data_l;
{
// Copy callbacks so that they can be called outside of holding the lock
@@ -269,12 +369,18 @@ void nano::telemetry_impl::invoke_callbacks (bool cached_a)
cached_telemetry_data_l = cached_telemetry_data;
current_telemetry_data_responses.clear ();
callbacks.clear ();
all_received = failed.empty ();
}
if (pre_callback_callback)
{
pre_callback_callback (cached_telemetry_data_l, mutex);
}
// Need to account for nodes which disable telemetry data in responses
bool all_received_l = !cached_telemetry_data_l.empty () && all_received;
for (auto & callback : callbacks_l)
{
callback ({ cached_telemetry_data_l, cached_a, all_received_l });
callback ({ cached_telemetry_data_l, all_received_l });
}
}
@@ -288,12 +394,11 @@ void nano::telemetry_impl::channel_processed (nano::unique_lock<std::mutex> & lk
cached_telemetry_data = current_telemetry_data_responses;
last_time = std::chrono::steady_clock::now ();
auto const is_cached = false;
flush_callbacks (lk_a, is_cached);
flush_callbacks_async ();
}
}
void nano::telemetry_impl::fire_request_messages (std::unordered_set<std::shared_ptr<nano::transport::channel>> const & channels)
void nano::telemetry_impl::fire_request_messages (std::deque<std::shared_ptr<nano::transport::channel>> const & channels)
{
uint64_t round_l;
{
@@ -309,6 +414,7 @@ void nano::telemetry_impl::fire_request_messages (std::unordered_set<std::shared
assert (channel->get_network_version () >= network_params.protocol.telemetry_protocol_version_min);
std::weak_ptr<nano::telemetry_impl> this_w (shared_from_this ());
// clang-format off
channel->send (message, [this_w, endpoint = channel->get_endpoint ()](boost::system::error_code const & ec, size_t size_a) {
if (auto this_l = this_w.lock ())
{
@@ -316,20 +422,22 @@ void nano::telemetry_impl::fire_request_messages (std::unordered_set<std::shared
{
// Error sending the telemetry_req message
nano::unique_lock<std::mutex> lk (this_l->mutex);
this_l->all_received = false;
this_l->failed.push_back (endpoint);
this_l->channel_processed (lk, endpoint);
}
}
});
},
false);
// clang-format on
// If no response is seen after a certain period of time, remove the endpoint from the list of expected responses, but only if it is still part of the same round.
alarm.add (std::chrono::steady_clock::now () + cache_cutoff, [this_w, endpoint = channel->get_endpoint (), round_l]() {
alarm.add (std::chrono::steady_clock::now () + alarm_cutoff, [this_w, endpoint = channel->get_endpoint (), round_l]() {
if (auto this_l = this_w.lock ())
{
nano::unique_lock<std::mutex> lk (this_l->mutex);
if (this_l->round == round_l && this_l->required_responses.find (endpoint) != this_l->required_responses.cend ())
{
this_l->all_received = false;
this_l->failed.push_back (endpoint);
this_l->channel_processed (lk, endpoint);
}
}
@@ -343,6 +451,16 @@ size_t nano::telemetry_impl::telemetry_data_size ()
return current_telemetry_data_responses.size ();
}
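fire_request_messages above snapshots the round counter before arming the per-endpoint timeout, so a late alarm from an earlier round cannot evict entries belonging to a newer one. A compact, single-threaded sketch of that round guard; names are illustrative:

#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_set>

uint64_t round_counter{ 0 };
std::unordered_set<std::string> required_responses;

// A timeout only acts if the round it was armed in is still the current round
void on_timeout (uint64_t round_armed_a, std::string const & endpoint_a)
{
	if (round_counter == round_armed_a && required_responses.count (endpoint_a) > 0)
	{
		required_responses.erase (endpoint_a);
		std::cout << endpoint_a << " timed out in round " << round_armed_a << '\n';
	}
	else
	{
		std::cout << "stale timeout for " << endpoint_a << " ignored\n";
	}
}

int main ()
{
	++round_counter; // round 1 begins
	required_responses.insert ("10.0.0.1:7075");
	auto round_armed = round_counter;

	++round_counter; // a new round starts before the alarm fires
	on_timeout (round_armed, "10.0.0.1:7075"); // ignored: the round has moved on
}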
bool nano::telemetry_data_time_pair::operator== (telemetry_data_time_pair const & telemetry_data_time_pair_a) const
{
return data == telemetry_data_time_pair_a.data && last_updated == telemetry_data_time_pair_a.last_updated;
}
bool nano::telemetry_data_time_pair::operator!= (telemetry_data_time_pair const & telemetry_data_time_pair_a) const
{
return !(*this == telemetry_data_time_pair_a);
}
std::unique_ptr<nano::container_info_component> nano::collect_container_info (telemetry & telemetry, const std::string & name)
{
size_t single_requests_count;
@@ -357,6 +475,7 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (te
composite->add_component (collect_container_info (*telemetry.batch_request, "batch_request"));
}
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "single_requests", single_requests_count, sizeof (decltype (telemetry.single_requests)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "finished_single_requests", telemetry.finished_single_requests_size (), sizeof (decltype (telemetry.finished_single_requests)::value_type) }));
return composite;
}
@@ -381,3 +500,172 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (te
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "required_responses", required_responses_count, sizeof (decltype (telemetry_impl.required_responses)::value_type) }));
return composite;
}
nano::telemetry_data nano::consolidate_telemetry_data (std::vector<nano::telemetry_data> const & telemetry_datas)
{
std::vector<nano::telemetry_data_time_pair> telemetry_data_time_pairs;
telemetry_data_time_pairs.reserve (telemetry_datas.size ());
std::transform (telemetry_datas.begin (), telemetry_datas.end (), std::back_inserter (telemetry_data_time_pairs), [](nano::telemetry_data const & telemetry_data_a) {
// Don't care about the timestamps here
return nano::telemetry_data_time_pair{ telemetry_data_a, {}, {} };
});
return consolidate_telemetry_data_time_pairs (telemetry_data_time_pairs).data;
}
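The heavy lifting happens in consolidate_telemetry_data_time_pairs below. Its core numeric step is a 10% trimmed average over ordered multisets; a standalone version of just that step, using uint64_t in place of nano::uint128_t for self-containment:

#include <cstdint>
#include <iostream>
#include <iterator>
#include <numeric>
#include <set>

// Trimmed average: drop size / 10 values from each end, then take the mean of the rest
uint64_t trimmed_average (std::multiset<uint64_t> counts)
{
	auto num_either_side_to_remove = counts.size () / 10;
	counts.erase (counts.begin (), std::next (counts.begin (), num_either_side_to_remove));
	counts.erase (std::next (counts.rbegin (), num_either_side_to_remove).base (), counts.end ());
	auto sum = std::accumulate (counts.begin (), counts.end (), uint64_t{ 0 });
	return sum / counts.size ();
}

int main ()
{
	// 10 well-behaved values plus one outlier at each end; size 12 trims 1 per side
	std::multiset<uint64_t> counts{ 1, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100000 };
	std::cout << trimmed_average (counts) << '\n'; // prints 100: both outliers discarded
}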
nano::telemetry_data_time_pair nano::consolidate_telemetry_data_time_pairs (std::vector<nano::telemetry_data_time_pair> const & telemetry_data_time_pairs_a)
{
if (telemetry_data_time_pairs_a.empty ())
{
return {};
}
else if (telemetry_data_time_pairs_a.size () == 1)
{
// Only 1 element in the collection, so just return it.
return telemetry_data_time_pairs_a.front ();
}
std::unordered_map<uint8_t, int> protocol_versions;
std::unordered_map<std::string, int> vendor_versions;
std::unordered_map<uint64_t, int> bandwidth_caps;
std::unordered_map<nano::block_hash, int> genesis_blocks;
// Use a trimmed average which excludes the upper and lower 10% of the results
std::multiset<uint64_t> account_counts;
std::multiset<uint64_t> block_counts;
std::multiset<uint64_t> cemented_counts;
std::multiset<uint32_t> peer_counts;
std::multiset<uint64_t> unchecked_counts;
std::multiset<uint64_t> uptime_counts;
std::multiset<uint64_t> bandwidth_counts;
std::multiset<long long> timestamp_counts;
for (auto const & telemetry_data_time_pair : telemetry_data_time_pairs_a)
{
auto & telemetry_data = telemetry_data_time_pair.data;
account_counts.insert (telemetry_data.account_count);
block_counts.insert (telemetry_data.block_count);
cemented_counts.insert (telemetry_data.cemented_count);
std::ostringstream ss;
ss << telemetry_data.major_version;
if (telemetry_data.minor_version.is_initialized ())
{
ss << "." << *telemetry_data.minor_version;
if (telemetry_data.patch_version.is_initialized ())
{
ss << "." << *telemetry_data.patch_version;
if (telemetry_data.pre_release_version.is_initialized ())
{
ss << "." << *telemetry_data.pre_release_version;
if (telemetry_data.maker.is_initialized ())
{
ss << "." << *telemetry_data.maker;
}
}
}
}
++vendor_versions[ss.str ()];
++protocol_versions[telemetry_data.protocol_version];
peer_counts.insert (telemetry_data.peer_count);
unchecked_counts.insert (telemetry_data.unchecked_count);
uptime_counts.insert (telemetry_data.uptime);
// 0 has a special meaning (unlimited), so don't include it in the average as it would heavily skew the result
if (telemetry_data.bandwidth_cap != 0)
{
bandwidth_counts.insert (telemetry_data.bandwidth_cap);
}
++bandwidth_caps[telemetry_data.bandwidth_cap];
++genesis_blocks[telemetry_data.genesis_block];
timestamp_counts.insert (std::chrono::time_point_cast<std::chrono::milliseconds> (telemetry_data_time_pair.system_last_updated).time_since_epoch ().count ());
}
// Remove 10% of the results from the lower and upper bounds to catch any outliers. At least 10 responses are needed before any are removed.
auto num_either_side_to_remove = telemetry_data_time_pairs_a.size () / 10;
auto strip_outliers_and_sum = [num_either_side_to_remove](auto & counts) {
counts.erase (counts.begin (), std::next (counts.begin (), num_either_side_to_remove));
counts.erase (std::next (counts.rbegin (), num_either_side_to_remove).base (), counts.end ());
return std::accumulate (counts.begin (), counts.end (), nano::uint128_t (0), [](nano::uint128_t total, auto count) {
return total += count;
});
};
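// Illustrative arithmetic: 22 responses give 22 / 10 = 2 values trimmed from each
// end of every multiset; with fewer than 10 responses nothing is trimmed (n / 10 == 0).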
auto account_sum = strip_outliers_and_sum (account_counts);
auto block_sum = strip_outliers_and_sum (block_counts);
auto cemented_sum = strip_outliers_and_sum (cemented_counts);
auto peer_sum = strip_outliers_and_sum (peer_counts);
auto unchecked_sum = strip_outliers_and_sum (unchecked_counts);
auto uptime_sum = strip_outliers_and_sum (uptime_counts);
auto bandwidth_sum = strip_outliers_and_sum (bandwidth_counts);
nano::telemetry_data consolidated_data;
auto size = telemetry_data_time_pairs_a.size () - num_either_side_to_remove * 2;
consolidated_data.account_count = boost::numeric_cast<decltype (consolidated_data.account_count)> (account_sum / size);
consolidated_data.block_count = boost::numeric_cast<decltype (consolidated_data.block_count)> (block_sum / size);
consolidated_data.cemented_count = boost::numeric_cast<decltype (consolidated_data.cemented_count)> (cemented_sum / size);
consolidated_data.peer_count = boost::numeric_cast<decltype (consolidated_data.peer_count)> (peer_sum / size);
consolidated_data.uptime = boost::numeric_cast<decltype (consolidated_data.uptime)> (uptime_sum / size);
consolidated_data.unchecked_count = boost::numeric_cast<decltype (consolidated_data.unchecked_count)> (unchecked_sum / size);
auto set_mode_or_average = [](auto const & collection, auto & var, auto const & sum, size_t size) {
auto max = std::max_element (collection.begin (), collection.end (), [](auto const & lhs, auto const & rhs) {
return lhs.second < rhs.second;
});
if (max->second > 1)
{
var = max->first;
}
else
{
var = (sum / size).template convert_to<std::remove_reference_t<decltype (var)>> ();
}
};
auto set_mode = [](auto const & collection, auto & var, size_t size) {
auto max = std::max_element (collection.begin (), collection.end (), [](auto const & lhs, auto const & rhs) {
return lhs.second < rhs.second;
});
if (max->second > 1)
{
var = max->first;
}
else
{
// Just pick the first one
var = collection.begin ()->first;
}
};
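// Example: bandwidth caps of { 100, 100, 8 } pick the mode 100 (count 2 > 1); if every
// value is unique, set_mode_or_average falls back to the average and set_mode to the first entry.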
// Use the mode of protocol version and vendor version. Also use it for bandwidth cap if there are 2 or more of the same cap.
set_mode_or_average (bandwidth_caps, consolidated_data.bandwidth_cap, bandwidth_sum, size);
set_mode (protocol_versions, consolidated_data.protocol_version, size);
set_mode (genesis_blocks, consolidated_data.genesis_block, size);
// Vendor version needs to be parsed out of the string
std::string version;
set_mode (vendor_versions, version, size);
// May only have the major version; check for the optional fields as well, and only output them if all are present
std::vector<std::string> version_fragments;
boost::split (version_fragments, version, boost::is_any_of ("."));
assert (!version_fragments.empty () && version_fragments.size () <= 5);
consolidated_data.major_version = boost::lexical_cast<uint8_t> (version_fragments.front ());
if (version_fragments.size () == 5)
{
consolidated_data.minor_version = boost::lexical_cast<uint8_t> (version_fragments[1]);
consolidated_data.patch_version = boost::lexical_cast<uint8_t> (version_fragments[2]);
consolidated_data.pre_release_version = boost::lexical_cast<uint8_t> (version_fragments[3]);
consolidated_data.maker = boost::lexical_cast<uint8_t> (version_fragments[4]);
}
// Consolidate timestamps
auto timestamp_sum = strip_outliers_and_sum (timestamp_counts);
auto consolidated_timestamp = boost::numeric_cast<long long> (timestamp_sum / size);
return telemetry_data_time_pair{ consolidated_data, std::chrono::steady_clock::time_point{}, std::chrono::system_clock::time_point (std::chrono::milliseconds (consolidated_timestamp)) };
}
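The vendor version travels through the frequency map as a dotted string (major.minor.patch.pre_release.maker, e.g. "20.1.5.2.1"), which the code above rebuilds with boost::split and boost::lexical_cast. A dependency-free sketch of the same round trip, using std::getline in place of boost::split:

#include <iostream>
#include <sstream>
#include <string>
#include <vector>

int main ()
{
	// Build the dotted vendor string, as the consolidation loop above does
	std::ostringstream ss;
	ss << 20 << "." << 1 << "." << 5 << "." << 2 << "." << 1;
	std::string version (ss.str ()); // "20.1.5.2.1"

	// Split it back into fragments, mirroring the boost::split call
	std::vector<std::string> version_fragments;
	std::istringstream is (version);
	for (std::string fragment; std::getline (is, fragment, '.');)
	{
		version_fragments.push_back (fragment);
	}

	// Only a full 5-part string carries the optional fields
	if (version_fragments.size () == 5)
	{
		std::cout << "major=" << version_fragments[0] << " maker=" << version_fragments[4] << '\n';
	}
}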

View file

@@ -18,15 +18,25 @@ namespace transport
class channel;
}
class telemetry_data_time_pair
{
public:
nano::telemetry_data data;
std::chrono::steady_clock::time_point last_updated;
std::chrono::system_clock::time_point system_last_updated;
bool operator== (telemetry_data_time_pair const &) const;
bool operator!= (telemetry_data_time_pair const &) const;
};
/*
* Holds a response from a telemetry request
*/
class telemetry_data_response
{
public:
nano::telemetry_data data;
bool is_cached;
bool error;
nano::telemetry_data_time_pair telemetry_data_time_pair;
nano::endpoint endpoint;
bool error{ true };
};
/*
@@ -35,9 +45,8 @@ public:
class telemetry_data_responses
{
public:
std::vector<nano::telemetry_data> data;
bool is_cached;
bool all_received;
std::unordered_map<nano::endpoint, telemetry_data_time_pair> telemetry_data_time_pairs;
bool all_received{ false };
};
/*
@@ -52,46 +61,58 @@ public:
private:
// Class only available to the telemetry class
void get_metrics_async (std::unordered_set<std::shared_ptr<nano::transport::channel>> const & channels_a, std::function<void(telemetry_data_responses const &)> const & callback_a);
void get_metrics_async (std::deque<std::shared_ptr<nano::transport::channel>> const & channels_a, std::function<void(telemetry_data_responses const &)> const & callback_a);
void add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a, bool is_empty_a);
size_t telemetry_data_size ();
nano::network_params network_params;
// Anything older than this requires requesting metrics from other nodes
static std::chrono::milliseconds constexpr cache_cutoff{ 3000 };
// Anything older than this requires requesting metrics from other nodes.
std::chrono::seconds const cache_cutoff{ nano::telemetry_cache_cutoffs::network_to_time (network_params.network) };
static std::chrono::seconds constexpr alarm_cutoff{ 3 };
// All data in this chunk is protected by this mutex
std::mutex mutex;
std::vector<std::function<void(telemetry_data_responses const &)>> callbacks;
std::chrono::steady_clock::time_point last_time = std::chrono::steady_clock::now () - cache_cutoff;
/* The responses received during this request round */
std::vector<nano::telemetry_data> current_telemetry_data_responses;
std::unordered_map<nano::endpoint, telemetry_data_time_pair> current_telemetry_data_responses;
/* The metrics for the last request round */
std::vector<nano::telemetry_data> cached_telemetry_data;
std::unordered_map<nano::endpoint, telemetry_data_time_pair> cached_telemetry_data;
std::unordered_set<nano::endpoint> required_responses;
uint64_t round{ 0 };
/* Currently executing callbacks */
bool invoking{ false };
std::atomic<bool> all_received{ true };
std::vector<nano::endpoint> failed;
nano::network & network;
nano::alarm & alarm;
nano::worker & worker;
void invoke_callbacks (bool cached_a);
std::function<void(std::unordered_map<nano::endpoint, telemetry_data_time_pair> & data_a, std::mutex &)> pre_callback_callback;
void invoke_callbacks ();
void channel_processed (nano::unique_lock<std::mutex> & lk_a, nano::endpoint const & endpoint_a);
void flush_callbacks (nano::unique_lock<std::mutex> & lk_a, bool cached_a);
void fire_request_messages (std::unordered_set<std::shared_ptr<nano::transport::channel>> const & channels);
void flush_callbacks_async ();
void fire_request_messages (std::deque<std::shared_ptr<nano::transport::channel>> const & channels);
friend std::unique_ptr<container_info_component> collect_container_info (telemetry_impl &, const std::string &);
friend nano::telemetry;
friend class node_telemetry_single_request_Test;
friend class node_telemetry_basic_Test;
friend class node_telemetry_ongoing_requests_Test;
};
std::unique_ptr<nano::container_info_component> collect_container_info (telemetry_impl & telemetry_impl, const std::string & name);
/*
 * This class has two main operations:
 * Requesting metrics from specific single peers (single_requests)
 * - If the peer is in the batched request, the value from that is used; otherwise a telemetry_req message is sent (non-droppable)
 * Requesting metrics from all peers (batch_request)
 * - This is polled every minute.
 * - If a single request is currently underway, no request is made, because other peers will just reject it within the hotzone time
 * - Results are proactively added when callbacks are called inside pre_callback_callback
 */
class telemetry
{
public:
@@ -105,14 +126,14 @@ public:
void add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a, bool is_empty_a);
/*
* Collects metrics from square root number of peers and invokes the callback when complete.
* Collects metrics from all known peers and invokes the callback when complete.
*/
void get_metrics_random_peers_async (std::function<void(telemetry_data_responses const &)> const & callback_a);
void get_metrics_peers_async (std::function<void(telemetry_data_responses const &)> const & callback_a);
/*
* A blocking version of get_metrics_random_peers_async ().
* A blocking version of get_metrics_peers_async ().
*/
telemetry_data_responses get_metrics_random_peers ();
telemetry_data_responses get_metrics_peers ();
/*
* This makes a telemetry request to the specific channel
@@ -129,6 +150,11 @@ public:
*/
size_t telemetry_data_size ();
/*
* Return the number of finished_single_requests elements
*/
size_t finished_single_requests_size ();
/*
* Stop the telemetry processor
*/
@@ -153,14 +179,20 @@ private:
std::shared_ptr<telemetry_impl> batch_request;
/* Any requests to specific individual peers are maintained here */
std::unordered_map<nano::endpoint, single_request_data> single_requests;
/* This holds data from single_requests after the cache is removed */
std::unordered_map<nano::endpoint, telemetry_data_time_pair> finished_single_requests;
bool stopped{ false };
void update_cleanup_data (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data & single_request_data_a, bool is_new_a);
void ongoing_single_request_cleanup (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data const & single_request_data_a);
void ongoing_req_all_peers ();
friend class node_telemetry_multiple_single_request_clearing_Test;
friend std::unique_ptr<container_info_component> collect_container_info (telemetry &, const std::string &);
};
std::unique_ptr<nano::container_info_component> collect_container_info (telemetry & telemetry, const std::string & name);
nano::telemetry_data consolidate_telemetry_data (std::vector<telemetry_data> const & telemetry_data);
nano::telemetry_data_time_pair consolidate_telemetry_data_time_pairs (std::vector<telemetry_data_time_pair> const & telemetry_data_time_pairs);
}
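get_metrics_peers is a thin blocking shim over get_metrics_peers_async: it parks the result in a std::promise and blocks on the future. The same bridge works for any callback-style API; a self-contained sketch, where run_async is an illustrative stand-in for the telemetry request:

#include <functional>
#include <future>
#include <iostream>
#include <string>
#include <thread>

// Illustrative stand-in for an async API that reports its result via callback
void run_async (std::function<void(std::string const &)> const & callback_a)
{
	std::thread ([callback_a]() {
		callback_a ("telemetry consolidated");
	})
	.detach ();
}

// Blocking wrapper: the same shape as nano::telemetry::get_metrics_peers
std::string run_blocking ()
{
	std::promise<std::string> promise;
	run_async ([&promise](std::string const & result_a) {
		promise.set_value (result_a);
	});
	return promise.get_future ().get ();
}

int main ()
{
	std::cout << run_blocking () << '\n';
}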

View file

@@ -449,13 +449,14 @@ void nano::transport::tcp_channels::ongoing_keepalive ()
});
}
void nano::transport::tcp_channels::list (std::deque<std::shared_ptr<nano::transport::channel>> & deque_a)
void nano::transport::tcp_channels::list (std::deque<std::shared_ptr<nano::transport::channel>> & deque_a, uint8_t minimum_version_a, bool include_temporary_channels_a)
{
nano::lock_guard<std::mutex> lock (mutex);
for (auto const & channel : channels.get<random_access_tag> ())
{
deque_a.push_back (channel.channel);
}
// clang-format off
nano::transform_if (channels.get<random_access_tag> ().begin (), channels.get<random_access_tag> ().end (), std::back_inserter (deque_a),
[include_temporary_channels_a, minimum_version_a](auto & channel_a) { return channel_a.channel->get_network_version () >= minimum_version_a && (include_temporary_channels_a || !channel_a.channel->temporary); },
[](const auto & channel) { return channel.channel; });
// clang-format on
}
void nano::transport::tcp_channels::modify (std::shared_ptr<nano::transport::channel_tcp> channel_a, std::function<void(std::shared_ptr<nano::transport::channel_tcp>)> modify_callback_a)

View file

@@ -78,6 +78,7 @@ namespace transport
class tcp_channels final
{
friend class nano::transport::channel_tcp;
friend class node_telemetry_simultaneous_single_and_random_requests_Test;
public:
tcp_channels (nano::node &);
@@ -102,7 +103,7 @@ namespace transport
std::unique_ptr<container_info_component> collect_container_info (std::string const &);
void purge (std::chrono::steady_clock::time_point const &);
void ongoing_keepalive ();
void list (std::deque<std::shared_ptr<nano::transport::channel>> &);
void list (std::deque<std::shared_ptr<nano::transport::channel>> &, uint8_t = 0, bool = true);
void modify (std::shared_ptr<nano::transport::channel_tcp>, std::function<void(std::shared_ptr<nano::transport::channel_tcp>)>);
void update (nano::tcp_endpoint const &);
// Connection start

View file

@@ -434,7 +434,25 @@ public:
}
void telemetry_req (nano::telemetry_req const & message_a) override
{
message (message_a);
auto find_channel (node.network.udp_channels.channel (endpoint));
if (find_channel)
{
auto is_very_first_message = find_channel->get_last_telemetry_req () == std::chrono::steady_clock::time_point{};
auto cache_exceeded = std::chrono::steady_clock::now () >= find_channel->get_last_telemetry_req () + nano::telemetry_cache_cutoffs::network_to_time (node.network_params.network);
if (is_very_first_message || cache_exceeded)
{
node.network.udp_channels.modify (find_channel, [](std::shared_ptr<nano::transport::channel_udp> channel_a) {
channel_a->set_last_telemetry_req (std::chrono::steady_clock::now ());
});
message (message_a);
}
else
{
node.network.udp_channels.modify (find_channel, [](std::shared_ptr<nano::transport::channel_udp> channel_a) {
channel_a->set_last_packet_received (std::chrono::steady_clock::now ());
});
}
}
}
void telemetry_ack (nano::telemetry_ack const & message_a) override
{
@@ -688,13 +706,14 @@ void nano::transport::udp_channels::ongoing_keepalive ()
});
}
void nano::transport::udp_channels::list (std::deque<std::shared_ptr<nano::transport::channel>> & deque_a)
void nano::transport::udp_channels::list (std::deque<std::shared_ptr<nano::transport::channel>> & deque_a, uint8_t minimum_version_a)
{
nano::lock_guard<std::mutex> lock (mutex);
for (auto const & channel : channels.get<random_access_tag> ())
{
deque_a.push_back (channel.channel);
}
// clang-format off
nano::transform_if (channels.begin (), channels.end (), std::back_inserter (deque_a),
[minimum_version_a](auto & channel_a) { return channel_a.channel->get_network_version () >= minimum_version_a; },
[](const auto & channel) { return channel.channel; });
// clang-format on
}
void nano::transport::udp_channels::modify (std::shared_ptr<nano::transport::channel_udp> channel_a, std::function<void(std::shared_ptr<nano::transport::channel_udp>)> modify_callback_a)

View file

@@ -54,9 +54,22 @@ namespace transport
return nano::transport::transport_type::udp;
}
std::chrono::steady_clock::time_point get_last_telemetry_req ()
{
nano::lock_guard<std::mutex> lk (channel_mutex);
return last_telemetry_req;
}
void set_last_telemetry_req (std::chrono::steady_clock::time_point const time_a)
{
nano::lock_guard<std::mutex> lk (channel_mutex);
last_telemetry_req = time_a;
}
private:
nano::endpoint endpoint;
nano::transport::udp_channels & channels;
std::chrono::steady_clock::time_point last_telemetry_req{ std::chrono::steady_clock::time_point () };
};
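last_telemetry_req is what backs the DDoS protection in the telemetry_req handler: a request arriving inside the cache window is dropped rather than answered. A minimal sketch of that per-channel check, with an illustrative cutoff rather than the per-network value from nano::telemetry_cache_cutoffs:

#include <chrono>
#include <iostream>

class channel_stub
{
public:
	std::chrono::steady_clock::time_point last_telemetry_req{}; // epoch means never requested
};

// Returns true if the request should be serviced, false if it should be dropped
bool should_process_telemetry_req (channel_stub & channel_a, std::chrono::seconds cutoff_a)
{
	auto now = std::chrono::steady_clock::now ();
	auto is_very_first_message = channel_a.last_telemetry_req == std::chrono::steady_clock::time_point{};
	auto cache_exceeded = now >= channel_a.last_telemetry_req + cutoff_a;
	if (is_very_first_message || cache_exceeded)
	{
		channel_a.last_telemetry_req = now;
		return true;
	}
	return false;
}

int main ()
{
	channel_stub channel;
	std::chrono::seconds cutoff (3); // illustrative, not the node's per-network value
	std::cout << should_process_telemetry_req (channel, cutoff) << '\n'; // 1: first request
	std::cout << should_process_telemetry_req (channel, cutoff) << '\n'; // 0: inside the cache window
}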
class udp_channels final
{
@@ -90,7 +103,7 @@ namespace transport
std::unique_ptr<container_info_component> collect_container_info (std::string const &);
void purge (std::chrono::steady_clock::time_point const &);
void ongoing_keepalive ();
void list (std::deque<std::shared_ptr<nano::transport::channel>> &);
void list (std::deque<std::shared_ptr<nano::transport::channel>> &, uint8_t = 0);
void modify (std::shared_ptr<nano::transport::channel_udp>, std::function<void(std::shared_ptr<nano::transport::channel_udp>)>);
nano::node & node;
@@ -134,6 +147,10 @@ namespace transport
{
return channel->get_last_bootstrap_attempt ();
}
std::chrono::steady_clock::time_point last_telemetry_req () const
{
return channel->get_last_telemetry_req ();
}
boost::asio::ip::address ip_address () const
{
return endpoint ().address ();

View file

@@ -7825,7 +7825,6 @@ namespace
void compare_default_test_result_data (test_response & response, nano::node const & node_server_a)
{
ASSERT_EQ (200, response.status);
ASSERT_FALSE (response.json.get<bool> ("cached"));
ASSERT_EQ (1, response.json.get<uint64_t> ("block_count"));
ASSERT_EQ (1, response.json.get<uint64_t> ("cemented_count"));
ASSERT_EQ (0, response.json.get<uint64_t> ("unchecked_count"));
@@ -7834,12 +7833,13 @@ void compare_default_test_result_data (test_response & response, nano::node cons
ASSERT_EQ (1, response.json.get<uint32_t> ("peer_count"));
ASSERT_EQ (node_server_a.network_params.protocol.protocol_version, response.json.get<uint8_t> ("protocol_version"));
ASSERT_GE (100, response.json.get<uint64_t> ("uptime"));
ASSERT_EQ (nano::genesis ().hash ().to_string (), response.json.get<std::string> ("genesis_block"));
ASSERT_EQ (node_server_a.network_params.ledger.genesis_hash.to_string (), response.json.get<std::string> ("genesis_block"));
ASSERT_EQ (nano::get_major_node_version (), response.json.get<uint8_t> ("major_version"));
ASSERT_EQ (nano::get_minor_node_version (), response.json.get<uint8_t> ("minor_version"));
ASSERT_EQ (nano::get_patch_node_version (), response.json.get<uint8_t> ("patch_version"));
ASSERT_EQ (nano::get_pre_release_node_version (), response.json.get<uint8_t> ("pre_release_version"));
ASSERT_EQ (0, response.json.get<uint8_t> ("maker"));
ASSERT_GE (std::chrono::duration_cast<std::chrono::seconds> (std::chrono::system_clock::now ().time_since_epoch ()).count (), response.json.get<uint64_t> ("timestamp"));
}
}
@@ -7941,7 +7941,7 @@ TEST (rpc, node_telemetry_single)
}
}
TEST (rpc, node_telemetry_random)
TEST (rpc, node_telemetry_all)
{
nano::system system (1);
auto & node1 = *add_ipc_enabled_node (system);
@@ -7974,22 +7974,7 @@ TEST (rpc, node_telemetry_random)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (200, response.status);
ASSERT_FALSE (response.json.get<bool> ("cached"));
ASSERT_EQ (1, response.json.get<uint64_t> ("block_count"));
ASSERT_EQ (1, response.json.get<uint64_t> ("cemented_count"));
ASSERT_EQ (0, response.json.get<uint64_t> ("unchecked_count"));
ASSERT_EQ (1, response.json.get<uint64_t> ("account_count"));
ASSERT_EQ (node->config.bandwidth_limit, response.json.get<uint64_t> ("bandwidth_cap"));
ASSERT_EQ (1, response.json.get<uint32_t> ("peer_count"));
ASSERT_EQ (node->network_params.protocol.protocol_version, response.json.get<uint8_t> ("protocol_version"));
ASSERT_GE (100, response.json.get<uint64_t> ("uptime"));
ASSERT_EQ (nano::genesis ().hash ().to_string (), response.json.get<std::string> ("genesis_block"));
ASSERT_EQ (nano::get_major_node_version (), response.json.get<uint8_t> ("major_version"));
ASSERT_EQ (nano::get_minor_node_version (), response.json.get<uint8_t> ("minor_version"));
ASSERT_EQ (nano::get_patch_node_version (), response.json.get<uint8_t> ("patch_version"));
ASSERT_EQ (nano::get_pre_release_node_version (), response.json.get<uint8_t> ("pre_release_version"));
ASSERT_EQ (0, response.json.get<uint8_t> ("maker"));
compare_default_test_result_data (response, *node);
}
request.put ("raw", "true");
@@ -8002,30 +7987,54 @@ TEST (rpc, node_telemetry_random)
ASSERT_EQ (200, response.status);
// This may fail if the response has taken longer than the cache cutoff time.
ASSERT_TRUE (response.json.get<bool> ("cached"));
auto & all_metrics = response.json.get_child ("metrics");
std::vector<std::tuple<uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint32_t, uint8_t, uint64_t, std::string, uint8_t, uint8_t, uint8_t, uint8_t, uint8_t>> raw_metrics_json_l;
class telemetry_response_data
{
public:
uint64_t block_count;
uint64_t cemented_count;
uint64_t unchecked_count;
uint64_t account_count;
uint64_t bandwidth_cap;
uint32_t peer_count;
uint8_t protocol_version;
uint64_t uptime;
std::string genesis_block;
uint8_t major_version;
uint8_t minor_version;
uint8_t patch_version;
uint8_t pre_release_version;
uint8_t maker;
uint64_t timestamp;
std::string address;
uint16_t port;
};
std::vector<telemetry_response_data> raw_metrics_json_l;
for (auto & metrics_pair : all_metrics)
{
auto & metrics = metrics_pair.second;
raw_metrics_json_l.emplace_back (metrics.get<uint64_t> ("block_count"), metrics.get<uint64_t> ("cemented_count"), metrics.get<uint64_t> ("unchecked_count"), metrics.get<uint64_t> ("account_count"), metrics.get<uint64_t> ("bandwidth_cap"), metrics.get<uint64_t> ("peer_count"), metrics.get<uint8_t> ("protocol_version"), metrics.get<uint64_t> ("uptime"), metrics.get<std::string> ("genesis_block"), metrics.get<uint8_t> ("major_version"), metrics.get<uint8_t> ("minor_version"), metrics.get<uint8_t> ("patch_version"), metrics.get<uint8_t> ("pre_release_version"), metrics.get<uint8_t> ("maker"));
raw_metrics_json_l.push_back ({ metrics.get<uint64_t> ("block_count"), metrics.get<uint64_t> ("cemented_count"), metrics.get<uint64_t> ("unchecked_count"), metrics.get<uint64_t> ("account_count"), metrics.get<uint64_t> ("bandwidth_cap"), metrics.get<uint32_t> ("peer_count"), metrics.get<uint8_t> ("protocol_version"), metrics.get<uint64_t> ("uptime"), metrics.get<std::string> ("genesis_block"), metrics.get<uint8_t> ("major_version"), metrics.get<uint8_t> ("minor_version"), metrics.get<uint8_t> ("patch_version"), metrics.get<uint8_t> ("pre_release_version"), metrics.get<uint8_t> ("maker"), metrics.get<uint64_t> ("timestamp"), metrics.get<std::string> ("address"), metrics.get<uint16_t> ("port") });
}
ASSERT_EQ (1, raw_metrics_json_l.size ());
auto const & metrics = raw_metrics_json_l.front ();
ASSERT_EQ (1, std::get<0> (metrics));
ASSERT_EQ (1, std::get<1> (metrics));
ASSERT_EQ (0, std::get<2> (metrics));
ASSERT_EQ (1, std::get<3> (metrics));
ASSERT_EQ (node->config.bandwidth_limit, std::get<4> (metrics));
ASSERT_EQ (1, std::get<5> (metrics));
ASSERT_EQ (node->network_params.protocol.protocol_version, std::get<6> (metrics));
ASSERT_GE (100, std::get<7> (metrics));
ASSERT_EQ (nano::genesis ().hash ().to_string (), std::get<8> (metrics));
ASSERT_EQ (nano::get_major_node_version (), std::get<9> (metrics));
ASSERT_EQ (nano::get_minor_node_version (), std::get<10> (metrics));
ASSERT_EQ (nano::get_patch_node_version (), std::get<11> (metrics));
ASSERT_EQ (nano::get_pre_release_node_version (), std::get<12> (metrics));
ASSERT_EQ (0, std::get<13> (metrics));
ASSERT_EQ (1, metrics.block_count);
ASSERT_EQ (1, metrics.cemented_count);
ASSERT_EQ (0, metrics.unchecked_count);
ASSERT_EQ (1, metrics.account_count);
ASSERT_EQ (node->config.bandwidth_limit, metrics.bandwidth_cap);
ASSERT_EQ (1, metrics.peer_count);
ASSERT_EQ (node->network_params.protocol.protocol_version, metrics.protocol_version);
ASSERT_GE (100, metrics.uptime);
ASSERT_EQ (node1.network_params.ledger.genesis_hash.to_string (), metrics.genesis_block);
ASSERT_EQ (nano::get_major_node_version (), metrics.major_version);
ASSERT_EQ (nano::get_minor_node_version (), metrics.minor_version);
ASSERT_EQ (nano::get_patch_node_version (), metrics.patch_version);
ASSERT_EQ (nano::get_pre_release_node_version (), metrics.pre_release_version);
ASSERT_EQ (0, metrics.maker);
ASSERT_GE (std::chrono::duration_cast<std::chrono::seconds> (std::chrono::system_clock::now ().time_since_epoch ()).count (), metrics.timestamp);
ASSERT_EQ (node->network.endpoint ().address ().to_string (), metrics.address);
ASSERT_EQ (node->network.endpoint ().port (), metrics.port);
}

View file

@@ -7,6 +7,8 @@
#include <gtest/gtest.h>
#include <numeric>
using namespace std::chrono_literals;
TEST (system, generate_mass_activity)
@@ -791,3 +793,240 @@ TEST (confirmation_height, prioritize_frontiers_overwrite)
}
}
}
namespace
{
void wait_peer_connections (nano::system & system_a)
{
system_a.deadline_set (10s);
auto peer_count = 0;
auto num_nodes = system_a.nodes.size ();
while (peer_count != num_nodes * (num_nodes - 1))
{
ASSERT_NO_ERROR (system_a.poll ());
peer_count = std::accumulate (system_a.nodes.cbegin (), system_a.nodes.cend (), 0, [](auto total, auto const & node) {
auto transaction = node->store.tx_begin_read ();
return total += node->store.peer_count (transaction);
});
}
}
class data
{
public:
std::atomic<bool> awaiting_cache{ false };
std::atomic<bool> keep_requesting_metrics{ true };
std::shared_ptr<nano::node> node;
std::chrono::steady_clock::time_point orig_time;
std::atomic_flag orig_time_set = ATOMIC_FLAG_INIT;
};
class shared_data
{
public:
std::atomic<bool> done{ false };
std::atomic<uint64_t> count{ 0 };
std::promise<void> promise;
std::shared_future<void> shared_future{ promise.get_future () };
};
template <typename T>
void callback_process (shared_data & shared_data_a, data & data, T & all_node_data_a, std::chrono::steady_clock::time_point last_updated)
{
if (!data.orig_time_set.test_and_set ())
{
data.orig_time = last_updated;
}
if (data.awaiting_cache && data.orig_time != last_updated)
{
data.keep_requesting_metrics = false;
}
if (data.orig_time != last_updated)
{
data.awaiting_cache = true;
data.orig_time = last_updated;
}
if (--shared_data_a.count == 0 && std::all_of (all_node_data_a.begin (), all_node_data_a.end (), [](auto const & data) { return !data.keep_requesting_metrics; }))
{
shared_data_a.done = true;
shared_data_a.promise.set_value ();
}
};
}
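callback_process above coordinates the in-flight callbacks with an atomic counter plus a promise fulfilled exactly once; every thread then blocks on the shared_future. A reduced sketch of that latch pattern:

#include <atomic>
#include <cstdint>
#include <future>
#include <iostream>
#include <thread>
#include <vector>

int main ()
{
	std::atomic<uint64_t> count{ 4 };
	std::promise<void> promise;
	std::shared_future<void> shared_future (promise.get_future ());

	std::vector<std::thread> threads;
	for (auto i = 0; i < 4; ++i)
	{
		threads.emplace_back ([&]() {
			// The last task to finish releases everyone blocked on the future
			if (--count == 0)
			{
				promise.set_value ();
			}
			shared_future.wait ();
		});
	}
	for (auto & thread : threads)
	{
		thread.join ();
	}
	std::cout << "all tasks completed\n";
}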
namespace nano
{
TEST (node_telemetry, ongoing_requests)
{
nano::system system (2);
auto node_client = system.nodes.front ();
auto node_server = system.nodes.back ();
wait_peer_connections (system);
ASSERT_EQ (0, node_client->telemetry.telemetry_data_size ());
ASSERT_EQ (0, node_server->telemetry.telemetry_data_size ());
ASSERT_EQ (0, node_client->stats.count (nano::stat::type::bootstrap, nano::stat::detail::telemetry_ack, nano::stat::dir::in));
ASSERT_EQ (0, node_client->stats.count (nano::stat::type::bootstrap, nano::stat::detail::telemetry_req, nano::stat::dir::out));
system.deadline_set (20s);
while (node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in) != 1 || node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out) != 1)
{
ASSERT_NO_ERROR (system.poll ());
}
// Wait until the next ongoing request is fired, and add a 1s buffer for the actual processing
auto time = std::chrono::steady_clock::now ();
while (std::chrono::steady_clock::now () < (time + nano::telemetry_cache_cutoffs::test + nano::telemetry_impl::alarm_cutoff + 1s))
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in));
ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out));
ASSERT_EQ (2, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in));
ASSERT_EQ (2, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in));
ASSERT_EQ (2, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out));
}
}
TEST (node_telemetry, simultaneous_random_requests)
{
const auto num_nodes = 4;
nano::system system (num_nodes);
// Wait until peers are stored, as this is done in the background
wait_peer_connections (system);
std::vector<std::thread> threads;
const auto num_threads = 4;
std::array<data, num_nodes> all_data{};
for (auto i = 0; i < num_nodes; ++i)
{
all_data[i].node = system.nodes[i];
}
shared_data shared_data;
// Create a few threads where each node sends out telemetry request messages to all other nodes continuously, until the cache is reached and subsequently expires.
// The test waits until all telemetry_ack messages have been received.
for (int i = 0; i < num_threads; ++i)
{
threads.emplace_back ([&all_data, &shared_data]() {
while (std::any_of (all_data.cbegin (), all_data.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); }))
{
for (auto & data : all_data)
{
// Keep requesting telemetry metrics until the cache has been saved and then becomes outdated (after a certain period of time) for each node
if (data.keep_requesting_metrics)
{
++shared_data.count;
data.node->telemetry.get_metrics_peers_async ([&shared_data, &data, &all_data](nano::telemetry_data_responses const & responses_a) {
callback_process (shared_data, data, all_data, responses_a.telemetry_data_time_pairs.begin ()->second.last_updated);
});
}
std::this_thread::sleep_for (1ms);
}
}
shared_data.shared_future.wait ();
ASSERT_EQ (shared_data.count, 0);
});
}
system.deadline_set (20s);
while (!shared_data.done)
{
ASSERT_NO_ERROR (system.poll ());
}
for (auto & thread : threads)
{
thread.join ();
}
}
namespace nano
{
namespace transport
{
TEST (node_telemetry, simultaneous_single_and_random_requests)
{
const auto num_nodes = 4;
nano::system system (num_nodes);
wait_peer_connections (system);
std::vector<std::thread> threads;
const auto num_threads = 4;
std::array<data, num_nodes> node_data_single{};
std::array<data, num_nodes> node_data_random{};
for (auto i = 0; i < num_nodes; ++i)
{
node_data_single[i].node = system.nodes[i];
node_data_random[i].node = system.nodes[i];
}
shared_data shared_data_single;
shared_data shared_data_random;
// Create a few threads where each node sends out telemetry request messages to all other nodes continuously, until the cache is reached and subsequently expires.
// The test waits until all telemetry_ack messages have been received.
for (int i = 0; i < num_threads; ++i)
{
threads.emplace_back ([&node_data_single, &node_data_random, &shared_data_single, &shared_data_random]() {
auto func = [](auto & all_node_data_a, shared_data & shared_data_a, bool single_a) {
while (std::any_of (all_node_data_a.cbegin (), all_node_data_a.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); }))
{
for (auto & data : all_node_data_a)
{
// Keep calling get_metrics_async until the cache has been saved and then becomes outdated (after a certain period of time) for each node
if (data.keep_requesting_metrics)
{
++shared_data_a.count;
if (single_a)
{
// Pick first peer to be consistent
auto peer = data.node->network.tcp_channels.channels[0].channel;
data.node->telemetry.get_metrics_single_peer_async (peer, [&shared_data_a, &data, &all_node_data_a](nano::telemetry_data_response const & telemetry_data_response_a) {
callback_process (shared_data_a, data, all_node_data_a, telemetry_data_response_a.telemetry_data_time_pair.last_updated);
});
}
else
{
data.node->telemetry.get_metrics_peers_async ([&shared_data_a, &data, &all_node_data_a](nano::telemetry_data_responses const & telemetry_data_responses_a) {
callback_process (shared_data_a, data, all_node_data_a, telemetry_data_responses_a.telemetry_data_time_pairs.begin ()->second.last_updated);
});
}
}
std::this_thread::sleep_for (1ms);
}
}
shared_data_a.shared_future.wait ();
ASSERT_EQ (shared_data_a.count, 0);
};
func (node_data_single, shared_data_single, true);
func (node_data_random, shared_data_random, false);
});
}
system.deadline_set (30s);
while (!shared_data_random.done || !shared_data_single.done)
{
ASSERT_NO_ERROR (system.poll ());
}
for (auto & thread : threads)
{
thread.join ();
}
}
}
}