diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index 52bd342f..b482db09 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -19,11 +19,12 @@ add_executable (core_test ledger.cpp locks.cpp logger.cpp - network.cpp - node.cpp message.cpp message_parser.cpp memory_pool.cpp + network.cpp + node.cpp + node_telemetry.cpp processor_service.cpp peer_container.cpp request_aggregator.cpp diff --git a/nano/core_test/message_parser.cpp b/nano/core_test/message_parser.cpp index a377ef75..f67b2fd6 100644 --- a/nano/core_test/message_parser.cpp +++ b/nano/core_test/message_parser.cpp @@ -25,33 +25,37 @@ public: } void bulk_pull (nano::bulk_pull const &) override { - ++bulk_pull_count; + ASSERT_FALSE (true); } void bulk_pull_account (nano::bulk_pull_account const &) override { - ++bulk_pull_account_count; + ASSERT_FALSE (true); } void bulk_push (nano::bulk_push const &) override { - ++bulk_push_count; + ASSERT_FALSE (true); } void frontier_req (nano::frontier_req const &) override { - ++frontier_req_count; + ASSERT_FALSE (true); } void node_id_handshake (nano::node_id_handshake const &) override { - ++node_id_handshake_count; + ASSERT_FALSE (true); } + void telemetry_req (nano::telemetry_req const &) override + { + ASSERT_FALSE (true); + } + void telemetry_ack (nano::telemetry_ack const &) override + { + ASSERT_FALSE (true); + } + uint64_t keepalive_count{ 0 }; uint64_t publish_count{ 0 }; uint64_t confirm_req_count{ 0 }; uint64_t confirm_ack_count{ 0 }; - uint64_t bulk_pull_count{ 0 }; - uint64_t bulk_pull_account_count{ 0 }; - uint64_t bulk_push_count{ 0 }; - uint64_t frontier_req_count{ 0 }; - uint64_t node_id_handshake_count{ 0 }; }; } diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 140e23d5..adc00483 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -3433,7 +3433,7 @@ TEST (node, bidirectional_tcp) // Test block confirmation from node 2 system.wallet (1)->insert_adhoc 
(nano::test_genesis_key.prv); confirmed = false; - system.deadline_set (10s); + system.deadline_set (20s); while (!confirmed) { auto transaction1 (node1->store.tx_begin_read ()); diff --git a/nano/core_test/node_telemetry.cpp b/nano/core_test/node_telemetry.cpp new file mode 100644 index 00000000..1c06daf5 --- /dev/null +++ b/nano/core_test/node_telemetry.cpp @@ -0,0 +1,753 @@ +#include +#include +#include + +#include + +#include + +using namespace std::chrono_literals; + +namespace +{ +void wait_peer_connections (nano::system & system_a); +void compare_default_test_result_data (nano::telemetry_data & telemetry_data_a, nano::node const & node_server_a); +} + +TEST (node_telemetry, consolidate_data) +{ + nano::telemetry_data data; + data.account_count = 2; + data.block_count = 1; + data.cemented_count = 1; + data.vendor_version = 20; + data.protocol_version_number = 12; + data.peer_count = 2; + data.bandwidth_cap = 100; + data.unchecked_count = 3; + data.uptime = 6; + data.genesis_block = nano::block_hash (3); + + nano::telemetry_data data1; + data1.account_count = 5; + data1.block_count = 7; + data1.cemented_count = 4; + data1.vendor_version = 10; + data1.protocol_version_number = 11; + data1.peer_count = 5; + data1.bandwidth_cap = 0; + data1.unchecked_count = 1; + data1.uptime = 10; + data1.genesis_block = nano::block_hash (4); + + nano::telemetry_data data2; + data2.account_count = 3; + data2.block_count = 3; + data2.cemented_count = 2; + data2.vendor_version = 20; + data2.protocol_version_number = 11; + data2.peer_count = 4; + data2.bandwidth_cap = 0; + data2.unchecked_count = 2; + data2.uptime = 3; + data2.genesis_block = nano::block_hash (4); + + std::vector all_data{ data, data1, data2 }; + + auto consolidated_telemetry_data = nano::telemetry_data::consolidate (all_data); + ASSERT_EQ (consolidated_telemetry_data.account_count, 3); + ASSERT_EQ (consolidated_telemetry_data.block_count, 3); + ASSERT_EQ (consolidated_telemetry_data.cemented_count, 2); + ASSERT_EQ 
(consolidated_telemetry_data.vendor_version, 20); + ASSERT_EQ (consolidated_telemetry_data.protocol_version_number, 11); + ASSERT_EQ (consolidated_telemetry_data.peer_count, 3); + ASSERT_EQ (consolidated_telemetry_data.bandwidth_cap, 0); + ASSERT_EQ (consolidated_telemetry_data.unchecked_count, 2); + ASSERT_EQ (consolidated_telemetry_data.uptime, 6); + ASSERT_EQ (consolidated_telemetry_data.genesis_block, nano::block_hash (4)); + + // Modify the metrics which may be either the mode or averages to ensure all are tested. + all_data[2].bandwidth_cap = 53; + all_data[2].protocol_version_number = 13; + all_data[2].vendor_version = 13; + all_data[2].genesis_block = nano::block_hash (3); + + auto consolidated_telemetry_data1 = nano::telemetry_data::consolidate (all_data); + ASSERT_TRUE (consolidated_telemetry_data1.vendor_version == 10 || consolidated_telemetry_data1.vendor_version == 13 || consolidated_telemetry_data1.vendor_version == 20); + ASSERT_TRUE (consolidated_telemetry_data1.protocol_version_number == 11 || consolidated_telemetry_data1.protocol_version_number == 12 || consolidated_telemetry_data1.protocol_version_number == 13); + ASSERT_EQ (consolidated_telemetry_data1.bandwidth_cap, 51); + ASSERT_EQ (consolidated_telemetry_data1.genesis_block, nano::block_hash (3)); + + // Test equality operator + ASSERT_FALSE (consolidated_telemetry_data == consolidated_telemetry_data1); + ASSERT_EQ (consolidated_telemetry_data, consolidated_telemetry_data); +} + +TEST (node_telemetry, no_peers) +{ + nano::system system (1); + + std::atomic done{ false }; + system.nodes[0]->telemetry.get_metrics_random_peers_async ([&done](nano::telemetry_data_responses const & responses_a) { + ASSERT_TRUE (responses_a.data.empty ()); + ASSERT_FALSE (responses_a.all_received); + ASSERT_FALSE (responses_a.is_cached); + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } +} + +namespace nano +{ +TEST (node_telemetry, basic) +{ + 
nano::system system (2); + + auto node_client = system.nodes.front (); + auto node_server = system.nodes.back (); + + wait_peer_connections (system); + + // Request telemetry metrics + std::vector all_telemetry_data; + { + std::atomic done{ false }; + node_client->telemetry.get_metrics_random_peers_async ([&done, &all_telemetry_data](nano::telemetry_data_responses const & responses_a) { + ASSERT_FALSE (responses_a.is_cached); + ASSERT_TRUE (responses_a.all_received); + all_telemetry_data = responses_a.data; + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + } + + // Check the metrics are correct + ASSERT_EQ (all_telemetry_data.size (), 1); + auto & telemetry_data = all_telemetry_data.front (); + compare_default_test_result_data (telemetry_data, *node_server); + + // Call again straight away. It should use the cache + { + std::atomic done{ false }; + node_client->telemetry.get_metrics_random_peers_async ([&done, &telemetry_data](nano::telemetry_data_responses const & responses_a) { + ASSERT_EQ (telemetry_data, responses_a.data.front ()); + ASSERT_TRUE (responses_a.is_cached); + ASSERT_TRUE (responses_a.all_received); + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + } + + // Wait the cache period and check cache is not used + std::this_thread::sleep_for (nano::telemetry_impl::cache_cutoff); + + std::atomic done{ false }; + node_client->telemetry.get_metrics_random_peers_async ([&done, &telemetry_data](nano::telemetry_data_responses const & responses_a) { + ASSERT_FALSE (responses_a.is_cached); + ASSERT_TRUE (responses_a.all_received); + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } +} +} + +TEST (node_telemetry, many_nodes) +{ + nano::system system; + // The telemetry responses can timeout if using a large number of nodes under sanitizers, so lower the number. 
+ const auto num_nodes = (is_sanitizer_build || nano::running_within_valgrind ()) ? 4 : 10; + for (auto i = 0; i < num_nodes; ++i) + { + nano::node_config node_config (nano::get_available_port (), system.logging); + // Make a metric completely different for each node so we can check afterwards that there are no duplicates + node_config.bandwidth_limit = 100000 + i; + system.add_node (node_config); + } + + wait_peer_connections (system); + + // Give all nodes a non-default number of blocks + nano::keypair key; + nano::genesis genesis; + nano::state_block send (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Mxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.hash ())); + for (auto node : system.nodes) + { + auto transaction (node->store.tx_begin_write ()); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send).code); + } + + // This is the node which will request metrics from all other nodes + auto node_client = system.nodes.front (); + + std::atomic done{ false }; + std::vector all_telemetry_data; + node_client->telemetry.get_metrics_random_peers_async ([&done, &all_telemetry_data](nano::telemetry_data_responses const & responses_a) { + ASSERT_FALSE (responses_a.is_cached); + ASSERT_TRUE (responses_a.all_received); + all_telemetry_data = responses_a.data; + done = true; + }); + + system.deadline_set (20s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + + // Check the metrics + nano::network_params params; + for (auto & data : all_telemetry_data) + { + ASSERT_EQ (data.unchecked_count, 0); + ASSERT_EQ (data.cemented_count, 1); + ASSERT_LE (data.peer_count, 9); + ASSERT_EQ (data.account_count, 1); + ASSERT_TRUE (data.block_count == 2); + ASSERT_EQ (data.protocol_version_number, params.protocol.telemetry_protocol_version_min); + ASSERT_GE (data.bandwidth_cap, 100000); + ASSERT_LT (data.bandwidth_cap, 100000 + 
system.nodes.size ()); + ASSERT_EQ (data.vendor_version, nano::get_major_node_version ()); + ASSERT_LT (data.uptime, 100); + ASSERT_EQ (data.genesis_block, genesis.hash ()); + } + + // We gave some nodes different bandwidth caps, confirm they are not all the same + auto all_bandwidth_limits_same = std::all_of (all_telemetry_data.begin () + 1, all_telemetry_data.end (), [bandwidth_cap = all_telemetry_data[0].bandwidth_cap](auto & telemetry) { + return telemetry.bandwidth_cap == bandwidth_cap; + }); + ASSERT_FALSE (all_bandwidth_limits_same); +} + +TEST (node_telemetry, receive_from_non_listening_channel) +{ + nano::system system; + auto node = system.add_node (); + nano::telemetry_ack message (nano::telemetry_data{}); + node->network.process_message (message, node->network.udp_channels.create (node->network.endpoint ())); + // We have not sent a telemetry_req message to this endpoint, so shouldn't count telemetry_ack received from it. + ASSERT_EQ (node->telemetry.telemetry_data_size (), 0); +} + +TEST (node_telemetry, over_udp) +{ + nano::system system; + nano::node_flags node_flags; + node_flags.disable_tcp_realtime = true; + auto node_client = system.add_node (node_flags); + auto node_server = system.add_node (node_flags); + + wait_peer_connections (system); + + std::atomic done{ false }; + std::vector all_telemetry_data; + node_client->telemetry.get_metrics_random_peers_async ([&done, &all_telemetry_data](nano::telemetry_data_responses const & responses_a) { + ASSERT_FALSE (responses_a.is_cached); + ASSERT_TRUE (responses_a.all_received); + all_telemetry_data = responses_a.data; + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + + ASSERT_EQ (all_telemetry_data.size (), 1); + compare_default_test_result_data (all_telemetry_data.front (), *node_server); + + // Check channels are indeed udp + ASSERT_EQ (1, node_client->network.size ()); + auto list1 (node_client->network.list (2)); + ASSERT_EQ 
(node_server->network.endpoint (), list1[0]->get_endpoint ()); + ASSERT_EQ (nano::transport::transport_type::udp, list1[0]->get_type ()); + ASSERT_EQ (1, node_server->network.size ()); + auto list2 (node_server->network.list (2)); + ASSERT_EQ (node_client->network.endpoint (), list2[0]->get_endpoint ()); + ASSERT_EQ (nano::transport::transport_type::udp, list2[0]->get_type ()); +} + +TEST (node_telemetry, simultaneous_random_requests) +{ + const auto num_nodes = 4; + nano::system system (num_nodes); + + // Wait until peers are stored as they are done in the background + wait_peer_connections (system); + + std::vector threads; + const auto num_threads = 4; + + std::atomic done{ false }; + class Data + { + public: + std::atomic awaiting_cache{ false }; + std::atomic keep_requesting_metrics{ true }; + std::shared_ptr node; + }; + + std::array all_data{}; + for (auto i = 0; i < num_nodes; ++i) + { + all_data[i].node = system.nodes[i]; + } + + std::atomic count{ 0 }; + std::promise promise; + std::shared_future shared_future (promise.get_future ()); + + // Create a few threads where each node sends out telemetry request messages to all other nodes continuously, until the cache it reached and subsequently expired. + // The test waits until all telemetry_ack messages have been received. 
+ for (int i = 0; i < num_threads; ++i) + { + threads.emplace_back ([&all_data, &done, &count, &promise, &shared_future]() { + while (std::any_of (all_data.cbegin (), all_data.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); })) + { + for (auto & data : all_data) + { + // Keep calling get_metrics_async until the cache has been saved and then become outdated (after a certain period of time) for each node + if (data.keep_requesting_metrics) + { + ++count; + + data.node->telemetry.get_metrics_random_peers_async ([&promise, &done, &data, &all_data, &count](nano::telemetry_data_responses const & responses_a) { + if (data.awaiting_cache && !responses_a.is_cached) + { + data.keep_requesting_metrics = false; + } + if (responses_a.is_cached) + { + data.awaiting_cache = true; + } + if (--count == 0 && std::all_of (all_data.begin (), all_data.end (), [](auto const & data) { return !data.keep_requesting_metrics; })) + { + done = true; + promise.set_value (); + } + }); + } + std::this_thread::sleep_for (1ms); + } + } + + ASSERT_EQ (count, 0); + shared_future.wait (); + }); + } + + system.deadline_set (20s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + + for (auto & thread : threads) + { + thread.join (); + } +} + +namespace nano +{ +TEST (node_telemetry, single_request) +{ + nano::system system (2); + + auto node_client = system.nodes.front (); + auto node_server = system.nodes.back (); + + wait_peer_connections (system); + + // Request telemetry metrics + auto channel = node_client->network.find_channel (node_server->network.endpoint ()); + nano::telemetry_data telemetry_data; + { + std::atomic done{ false }; + + node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) { + ASSERT_FALSE (response_a.is_cached); + ASSERT_FALSE (response_a.error); + telemetry_data = response_a.data; + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + 
ASSERT_NO_ERROR (system.poll ()); + } + } + + // Check the metrics are correct + compare_default_test_result_data (telemetry_data, *node_server); + + // Call again straight away. It should use the cache + { + std::atomic done{ false }; + node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) { + ASSERT_EQ (telemetry_data, response_a.data); + ASSERT_TRUE (response_a.is_cached); + ASSERT_FALSE (response_a.error); + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + } + + // Wait the cache period and check cache is not used + std::this_thread::sleep_for (nano::telemetry_impl::cache_cutoff); + + std::atomic done{ false }; + node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) { + ASSERT_FALSE (response_a.is_cached); + ASSERT_FALSE (response_a.error); + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } +} +} + +TEST (node_telemetry, single_request_invalid_channel) +{ + nano::system system (2); + + auto node_client = system.nodes.front (); + auto node_server = system.nodes.back (); + + std::atomic done{ false }; + node_client->telemetry.get_metrics_single_peer_async (nullptr, [&done](nano::telemetry_data_response const & response_a) { + ASSERT_TRUE (response_a.error); + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } +} + +TEST (node_telemetry, simultaneous_single_and_random_requests) +{ + const auto num_nodes = 4; + nano::system system (num_nodes); + + wait_peer_connections (system); + + std::vector threads; + const auto num_threads = 4; + + class data + { + public: + std::atomic awaiting_cache{ false }; + std::atomic keep_requesting_metrics{ true }; + std::shared_ptr node; + }; + + std::array node_data_single{}; + std::array 
node_data_random{}; + for (auto i = 0; i < num_nodes; ++i) + { + node_data_single[i].node = system.nodes[i]; + node_data_random[i].node = system.nodes[i]; + } + + class shared_data + { + public: + std::atomic done{ false }; + std::atomic count{ 0 }; + std::promise promise; + std::shared_future shared_future{ promise.get_future () }; + }; + + shared_data shared_data_single; + shared_data shared_data_random; + + // Create a few threads where each node sends out telemetry request messages to all other nodes continuously, until the cache it reached and subsequently expired. + // The test waits until all telemetry_ack messages have been received. + for (int i = 0; i < num_threads; ++i) + { + threads.emplace_back ([&node_data_single, &node_data_random, &shared_data_single, &shared_data_random]() { + auto func = [](auto & all_node_data_a, shared_data & shared_data_a) { + while (std::any_of (all_node_data_a.cbegin (), all_node_data_a.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); })) + { + for (auto & data : all_node_data_a) + { + // Keep calling get_metrics_async until the cache has been saved and then become outdated (after a certain period of time) for each node + if (data.keep_requesting_metrics) + { + ++shared_data_a.count; + + data.node->telemetry.get_metrics_random_peers_async ([& shared_data = shared_data_a, &data, &all_node_data = all_node_data_a](nano::telemetry_data_responses const & responses_a) { + if (data.awaiting_cache && !responses_a.is_cached) + { + data.keep_requesting_metrics = false; + } + if (responses_a.is_cached) + { + data.awaiting_cache = true; + } + if (--shared_data.count == 0 && std::all_of (all_node_data.begin (), all_node_data.end (), [](auto const & data) { return !data.keep_requesting_metrics; })) + { + shared_data.done = true; + shared_data.promise.set_value (); + } + }); + } + std::this_thread::sleep_for (1ms); + } + } + + ASSERT_EQ (shared_data_a.count, 0); + shared_data_a.shared_future.wait (); + }; + + 
func (node_data_single, shared_data_single); + func (node_data_random, shared_data_random); + }); + } + + system.deadline_set (20s); + while (!shared_data_single.done || !shared_data_random.done) + { + ASSERT_NO_ERROR (system.poll ()); + } + + for (auto & thread : threads) + { + thread.join (); + } +} + +TEST (node_telemetry, blocking_single_and_random) +{ + nano::system system (2); + + auto node_client = system.nodes.front (); + auto node_server = system.nodes.back (); + + wait_peer_connections (system); + + // Request telemetry metrics + std::atomic done{ false }; + std::function call_system_poll; + std::promise promise; + call_system_poll = [&call_system_poll, &worker = node_client->worker, &done, &system, &promise]() { + if (!done) + { + ASSERT_NO_ERROR (system.poll ()); + worker.push_task (call_system_poll); + } + else + { + promise.set_value (); + } + }; + + // Keep pushing system.polls in another thread (worker), because we will be blocking this thread and unable to do so. + system.deadline_set (10s); + node_client->worker.push_task (call_system_poll); + + // Blocking version of get_random_metrics_async + auto telemetry_data_responses = node_client->telemetry.get_metrics_random_peers (); + ASSERT_FALSE (telemetry_data_responses.is_cached); + ASSERT_TRUE (telemetry_data_responses.all_received); + compare_default_test_result_data (telemetry_data_responses.data.front (), *node_server); + + // Now try single request metric + auto telemetry_data_response = node_client->telemetry.get_metrics_single_peer (node_client->network.find_channel (node_server->network.endpoint ())); + ASSERT_FALSE (telemetry_data_response.is_cached); + ASSERT_FALSE (telemetry_data_response.error); + compare_default_test_result_data (telemetry_data_response.data, *node_server); + + done = true; + promise.get_future ().wait (); +} + +namespace nano +{ +TEST (node_telemetry, multiple_single_request_clearing) +{ + nano::system system (2); + + auto node_client = system.nodes.front (); + auto 
node_server = system.nodes.back (); + + nano::node_config node_config (nano::get_available_port (), system.logging); + node_config.bandwidth_limit = 100000; + auto node_server1 = system.add_node (node_config); + + wait_peer_connections (system); + + // Request telemetry metrics + auto channel = node_client->network.find_channel (node_server->network.endpoint ()); + + std::atomic done{ false }; + node_client->telemetry.get_metrics_single_peer_async (channel, [&done](nano::telemetry_data_response const & response_a) { + ASSERT_FALSE (response_a.error); + ASSERT_FALSE (response_a.is_cached); + done = true; + }); + + ASSERT_EQ (1, node_client->telemetry.single_requests.size ()); + auto last_updated = node_client->telemetry.single_requests.begin ()->second.last_updated; + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + + done = false; + // Make another request to keep + system.deadline_set (10s); + node_client->telemetry.get_metrics_single_peer_async (channel, [&done](nano::telemetry_data_response const & response_a) { + ASSERT_FALSE (response_a.error); + ASSERT_TRUE (response_a.is_cached); + done = true; + }); + + ASSERT_LT (last_updated, node_client->telemetry.single_requests.begin ()->second.last_updated); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + + done = false; + auto channel1 = node_client->network.find_channel (node_server1->network.endpoint ()); + node_client->telemetry.get_metrics_single_peer_async (channel1, [&done](nano::telemetry_data_response const & response_a) { + ASSERT_FALSE (response_a.error); + ASSERT_FALSE (response_a.is_cached); + done = true; + }); + + system.deadline_set (10s); + + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + + done = false; + node_client->telemetry.get_metrics_single_peer_async (channel1, [&done](nano::telemetry_data_response const & response_a) { + ASSERT_FALSE (response_a.error); + ASSERT_TRUE (response_a.is_cached); + 
done = true; + }); + + // single_requests should be removed as no more calls are being made + system.deadline_set (10s); + nano::unique_lock lk (node_client->telemetry.mutex); + while (!node_client->telemetry.single_requests.empty () || !done) + { + lk.unlock (); + ASSERT_NO_ERROR (system.poll ()); + lk.lock (); + } +} +} + +TEST (node_telemetry, disconnects) +{ + nano::system system (2); + + auto node_client = system.nodes.front (); + auto node_server = system.nodes.back (); + + wait_peer_connections (system); + + // Try and request metrics from a node which is turned off but a channel is not closed yet + auto channel = node_client->network.find_channel (node_server->network.endpoint ()); + node_server->stop (); + ASSERT_TRUE (channel); + + std::atomic done{ false }; + node_client->telemetry.get_metrics_random_peers_async ([&done](nano::telemetry_data_responses const & responses_a) { + ASSERT_FALSE (responses_a.all_received); + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + + done = false; + node_client->telemetry.get_metrics_single_peer_async (channel, [&done](nano::telemetry_data_response const & response_a) { + ASSERT_TRUE (response_a.error); + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } +} + +namespace +{ +void wait_peer_connections (nano::system & system_a) +{ + system_a.deadline_set (10s); + auto peer_count = 0; + auto num_nodes = system_a.nodes.size (); + while (peer_count != num_nodes * (num_nodes - 1)) + { + ASSERT_NO_ERROR (system_a.poll ()); + peer_count = std::accumulate (system_a.nodes.cbegin (), system_a.nodes.cend (), 0, [](auto total, auto const & node) { + auto transaction = node->store.tx_begin_read (); + return total += node->store.peer_count (transaction); + }); + } +} + +void compare_default_test_result_data (nano::telemetry_data & telemetry_data_a, nano::node const & node_server_a) +{ + ASSERT_EQ 
(telemetry_data_a.block_count, 1); + ASSERT_EQ (telemetry_data_a.cemented_count, 1); + ASSERT_EQ (telemetry_data_a.bandwidth_cap, node_server_a.config.bandwidth_limit); + ASSERT_EQ (telemetry_data_a.peer_count, 1); + ASSERT_EQ (telemetry_data_a.protocol_version_number, node_server_a.network_params.protocol.telemetry_protocol_version_min); + ASSERT_EQ (telemetry_data_a.unchecked_count, 0); + ASSERT_EQ (telemetry_data_a.account_count, 1); + ASSERT_EQ (telemetry_data_a.vendor_version, nano::get_major_node_version ()); + ASSERT_LT (telemetry_data_a.uptime, 100); + ASSERT_EQ (telemetry_data_a.genesis_block, nano::genesis ().hash ()); +} +} diff --git a/nano/core_test/toml.cpp b/nano/core_test/toml.cpp index d9c98be9..eba26404 100644 --- a/nano/core_test/toml.cpp +++ b/nano/core_test/toml.cpp @@ -199,6 +199,7 @@ TEST (toml, daemon_config_deserialize_defaults) ASSERT_EQ (conf.node.logging.network_publish_logging_value, defaults.node.logging.network_publish_logging_value); ASSERT_EQ (conf.node.logging.network_timeout_logging_value, defaults.node.logging.network_timeout_logging_value); ASSERT_EQ (conf.node.logging.node_lifetime_tracing_value, defaults.node.logging.node_lifetime_tracing_value); + ASSERT_EQ (conf.node.logging.network_telemetry_logging_value, defaults.node.logging.network_telemetry_logging_value); ASSERT_EQ (conf.node.logging.rotation_size, defaults.node.logging.rotation_size); ASSERT_EQ (conf.node.logging.single_line_record_value, defaults.node.logging.single_line_record_value); ASSERT_EQ (conf.node.logging.timing_logging_value, defaults.node.logging.timing_logging_value); @@ -453,6 +454,7 @@ TEST (toml, daemon_config_deserialize_no_defaults) network_keepalive = true network_message = true network_node_id_handshake = true + network_telemetry_logging = true network_packet = true network_publish = true network_timeout = true @@ -583,6 +585,7 @@ TEST (toml, daemon_config_deserialize_no_defaults) ASSERT_NE (conf.node.logging.network_keepalive_logging_value, 
defaults.node.logging.network_keepalive_logging_value); ASSERT_NE (conf.node.logging.network_message_logging_value, defaults.node.logging.network_message_logging_value); ASSERT_NE (conf.node.logging.network_node_id_handshake_logging_value, defaults.node.logging.network_node_id_handshake_logging_value); + ASSERT_NE (conf.node.logging.network_telemetry_logging_value, defaults.node.logging.network_telemetry_logging_value); ASSERT_NE (conf.node.logging.network_packet_logging_value, defaults.node.logging.network_packet_logging_value); ASSERT_NE (conf.node.logging.network_publish_logging_value, defaults.node.logging.network_publish_logging_value); ASSERT_NE (conf.node.logging.network_timeout_logging_value, defaults.node.logging.network_timeout_logging_value); diff --git a/nano/lib/CMakeLists.txt b/nano/lib/CMakeLists.txt index faef2b19..6d25e5fb 100644 --- a/nano/lib/CMakeLists.txt +++ b/nano/lib/CMakeLists.txt @@ -78,6 +78,8 @@ if (NANO_STACKTRACE_BACKTRACE) endif () target_compile_definitions(nano_lib + PRIVATE + -DMAJOR_VERSION_STRING=${CPACK_PACKAGE_VERSION_MAJOR} PUBLIC -DACTIVE_NETWORK=${ACTIVE_NETWORK} ) diff --git a/nano/lib/config.cpp b/nano/lib/config.cpp index bd6dc1f8..7fcc7c44 100644 --- a/nano/lib/config.cpp +++ b/nano/lib/config.cpp @@ -1,6 +1,7 @@ #include #include +#include #include @@ -8,6 +9,11 @@ namespace nano { const char * network_constants::active_network_err_msg = "Invalid network. 
Valid values are live, beta and test."; +uint8_t get_major_node_version () +{ + return boost::numeric_cast (boost::lexical_cast (NANO_MAJOR_VERSION_STRING)); +} + void force_nano_test_network () { nano::network_constants::set_active_network (nano::nano_networks::nano_test_network); diff --git a/nano/lib/config.hpp b/nano/lib/config.hpp index 59c7dedd..9ef0e5e9 100644 --- a/nano/lib/config.hpp +++ b/nano/lib/config.hpp @@ -20,6 +20,7 @@ namespace filesystem * Returns build version information */ const char * const NANO_VERSION_STRING = xstr (TAG_VERSION_STRING); +const char * const NANO_MAJOR_VERSION_STRING = xstr (MAJOR_VERSION_STRING); const char * const BUILD_INFO = xstr (GIT_COMMIT_HASH BOOST_COMPILER) " \"BOOST " xstr (BOOST_VERSION) "\" BUILT " xstr (__DATE__); @@ -36,6 +37,8 @@ const bool is_sanitizer_build = false; namespace nano { +uint8_t get_major_node_version (); + /** * Network variants with different genesis blocks and network parameters * @warning Enum values are used in integral comparisons; do not change. 
diff --git a/nano/lib/errors.cpp b/nano/lib/errors.cpp index 6f0296bb..2ff739fb 100644 --- a/nano/lib/errors.cpp +++ b/nano/lib/errors.cpp @@ -200,6 +200,10 @@ std::string nano::error_rpc_messages::message (int ev) const return "Account has non-zero balance"; case nano::error_rpc::payment_unable_create_account: return "Unable to create transaction account"; + case nano::error_rpc::peer_not_found: + return "Peer not found"; + case nano::error_rpc::requires_port_and_address: + return "Both port and address required"; case nano::error_rpc::rpc_control_disabled: return "RPC control is disabled"; case nano::error_rpc::sign_hash_disabled: diff --git a/nano/lib/errors.hpp b/nano/lib/errors.hpp index be710b2e..32535c02 100644 --- a/nano/lib/errors.hpp +++ b/nano/lib/errors.hpp @@ -112,6 +112,8 @@ enum class error_rpc invalid_timestamp, payment_account_balance, payment_unable_create_account, + peer_not_found, + requires_port_and_address, rpc_control_disabled, sign_hash_disabled, source_not_found diff --git a/nano/lib/stats.cpp b/nano/lib/stats.cpp index 07b2077f..4556717d 100644 --- a/nano/lib/stats.cpp +++ b/nano/lib/stats.cpp @@ -556,6 +556,12 @@ std::string nano::stat::detail_to_string (uint32_t key) case nano::stat::detail::send: res = "send"; break; + case nano::stat::detail::telemetry_req: + res = "telemetry_req"; + break; + case nano::stat::detail::telemetry_ack: + res = "telemetry_ack"; + break; case nano::stat::detail::state_block: res = "state_block"; break; @@ -634,6 +640,12 @@ std::string nano::stat::detail_to_string (uint32_t key) case nano::stat::detail::invalid_node_id_handshake_message: res = "invalid_node_id_handshake_message"; break; + case nano::stat::detail::invalid_telemetry_req_message: + res = "invalid_telemetry_req_message"; + break; + case nano::stat::detail::invalid_telemetry_ack_message: + res = "invalid_telemetry_ack_message"; + break; case nano::stat::detail::outdated_version: res = "outdated_version"; break; diff --git a/nano/lib/stats.hpp 
b/nano/lib/stats.hpp index 304b2adf..3b768833 100644 --- a/nano/lib/stats.hpp +++ b/nano/lib/stats.hpp @@ -233,6 +233,8 @@ public: confirm_req, confirm_ack, node_id_handshake, + telemetry_req, + telemetry_ack, // bootstrap, callback initiate, @@ -278,6 +280,8 @@ public: invalid_confirm_req_message, invalid_confirm_ack_message, invalid_node_id_handshake_message, + invalid_telemetry_req_message, + invalid_telemetry_ack_message, outdated_version, // tcp diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 776adc80..4c1e035a 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -76,6 +76,8 @@ add_library (node logging.cpp network.hpp network.cpp + telemetry.hpp + telemetry.cpp nodeconfig.hpp nodeconfig.cpp node_observers.hpp diff --git a/nano/node/bootstrap/bootstrap_server.cpp b/nano/node/bootstrap/bootstrap_server.cpp index aa9e3281..dd005ee7 100644 --- a/nano/node/bootstrap/bootstrap_server.cpp +++ b/nano/node/bootstrap/bootstrap_server.cpp @@ -237,6 +237,23 @@ void nano::bootstrap_server::receive_header_action (boost::system::error_code co }); break; } + case nano::message_type::telemetry_req: + { + node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::telemetry_req, nano::stat::dir::in); + if (is_realtime_connection ()) + { + add_request (std::make_unique (header)); + } + receive (); + break; + } + case nano::message_type::telemetry_ack: + { + socket->async_read (receive_buffer, header.payload_length_bytes (), [this_l, header](boost::system::error_code const & ec, size_t size_a) { + this_l->receive_telemetry_ack_action (ec, size_a, header); + }); + break; + } default: { if (node->config.logging.network_logging ()) @@ -356,6 +373,31 @@ void nano::bootstrap_server::receive_keepalive_action (boost::system::error_code } } +void nano::bootstrap_server::receive_telemetry_ack_action (boost::system::error_code const & ec, size_t size_a, nano::message_header const & header_a) +{ + if (!ec) + { + auto error (false); + 
nano::bufferstream stream (receive_buffer->data (), size_a); + auto request (std::make_unique (error, stream, header_a)); + if (!error) + { + if (is_realtime_connection ()) + { + add_request (std::unique_ptr (request.release ())); + } + receive (); + } + } + else + { + if (node->config.logging.network_telemetry_logging ()) + { + node->logger.try_log (boost::str (boost::format ("Error receiving telemetry ack: %1%") % ec.message ())); + } + } +} + void nano::bootstrap_server::receive_publish_action (boost::system::error_code const & ec, size_t size_a, nano::message_header const & header_a) { if (!ec) @@ -523,7 +565,6 @@ public: connection (connection_a) { } - virtual ~request_response_visitor () = default; void keepalive (nano::keepalive const & message_a) override { connection->finish_request_async (); @@ -576,6 +617,22 @@ public: auto response (std::make_shared (connection, std::unique_ptr (static_cast (connection->requests.front ().release ())))); response->send_next (); } + void telemetry_req (nano::telemetry_req const & message_a) override + { + connection->finish_request_async (); + auto connection_l (connection->shared_from_this ()); + connection->node->background ([connection_l, message_a]() { + connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type); + }); + } + void telemetry_ack (nano::telemetry_ack const & message_a) override + { + connection->finish_request_async (); + auto connection_l (connection->shared_from_this ()); + connection->node->background ([connection_l, message_a]() { + connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type); + }); + } void node_id_handshake (nano::node_id_handshake const & message_a) override { if (connection->node->config.logging.network_node_id_handshake_logging ()) diff --git 
a/nano/node/bootstrap/bootstrap_server.hpp b/nano/node/bootstrap/bootstrap_server.hpp index 61b705fe..63dfd9b0 100644 --- a/nano/node/bootstrap/bootstrap_server.hpp +++ b/nano/node/bootstrap/bootstrap_server.hpp @@ -57,6 +57,7 @@ public: void receive_confirm_req_action (boost::system::error_code const &, size_t, nano::message_header const &); void receive_confirm_ack_action (boost::system::error_code const &, size_t, nano::message_header const &); void receive_node_id_handshake_action (boost::system::error_code const &, size_t, nano::message_header const &); + void receive_telemetry_ack_action (boost::system::error_code const & ec, size_t size_a, nano::message_header const & header_a); void add_request (std::unique_ptr); void finish_request (); void finish_request_async (); diff --git a/nano/node/common.cpp b/nano/node/common.cpp index c5184a54..6b9d2258 100644 --- a/nano/node/common.cpp +++ b/nano/node/common.cpp @@ -12,6 +12,7 @@ std::bitset<16> constexpr nano::message_header::block_type_mask; std::bitset<16> constexpr nano::message_header::count_mask; + namespace { nano::protocol_constants const & get_protocol_constants () @@ -197,8 +198,9 @@ size_t nano::message_header::payload_length_bytes () const return nano::bulk_pull::size + (bulk_pull_is_count_present () ? 
nano::bulk_pull::extended_parameters_size : 0); } case nano::message_type::bulk_push: + case nano::message_type::telemetry_req: { - // bulk_push doesn't have a payload + // These don't have a payload return 0; } case nano::message_type::frontier_req: @@ -229,6 +231,10 @@ size_t nano::message_header::payload_length_bytes () const { return nano::node_id_handshake::size (*this); } + case nano::message_type::telemetry_ack: + { + return nano::telemetry_ack::size (*this); + } default: { assert (false); @@ -280,6 +286,14 @@ std::string nano::message_parser::status_string () { return "invalid_node_id_handshake_message"; } + case nano::message_parser::parse_status::invalid_telemetry_req_message: + { + return "invalid_telemetry_req_message"; + } + case nano::message_parser::parse_status::invalid_telemetry_ack_message: + { + return "invalid_telemetry_ack_message"; + } case nano::message_parser::parse_status::outdated_version: { return "outdated_version"; @@ -353,6 +367,16 @@ void nano::message_parser::deserialize_buffer (uint8_t const * buffer_a, size_t deserialize_node_id_handshake (stream, header); break; } + case nano::message_type::telemetry_req: + { + deserialize_telemetry_req (stream, header); + break; + } + case nano::message_type::telemetry_ack: + { + deserialize_telemetry_ack (stream, header); + break; + } default: { status = parse_status::invalid_message_type; @@ -467,6 +491,34 @@ void nano::message_parser::deserialize_node_id_handshake (nano::stream & stream_ } } +void nano::message_parser::deserialize_telemetry_req (nano::stream & stream_a, nano::message_header const & header_a) +{ + nano::telemetry_req incoming (header_a); + if (at_end (stream_a)) + { + visitor.telemetry_req (incoming); + } + else + { + status = parse_status::invalid_telemetry_req_message; + } +} + +void nano::message_parser::deserialize_telemetry_ack (nano::stream & stream_a, nano::message_header const & header_a) +{ + bool error_l (false); + nano::telemetry_ack incoming (error_l, stream_a, 
header_a); + // Intentionally not checking if at the end of stream, because these messages support backwards/forwards compatibility + if (!error_l) + { + visitor.telemetry_ack (incoming); + } + else + { + status = parse_status::invalid_telemetry_ack_message; + } +} + bool nano::message_parser::at_end (nano::stream & stream_a) { uint8_t junk; @@ -993,6 +1045,234 @@ void nano::bulk_push::visit (nano::message_visitor & visitor_a) const visitor_a.bulk_push (*this); } +nano::telemetry_req::telemetry_req () : +message (nano::message_type::telemetry_req) +{ +} + +nano::telemetry_req::telemetry_req (nano::message_header const & header_a) : +message (header_a) +{ +} + +bool nano::telemetry_req::deserialize (nano::stream & stream_a) +{ + assert (header.type == nano::message_type::telemetry_req); + return false; +} + +void nano::telemetry_req::serialize (nano::stream & stream_a) const +{ + header.serialize (stream_a); +} + +void nano::telemetry_req::visit (nano::message_visitor & visitor_a) const +{ + visitor_a.telemetry_req (*this); +} + +nano::telemetry_ack::telemetry_ack (bool & error_a, nano::stream & stream_a, nano::message_header const & message_header) : +message (message_header) +{ + if (!error_a) + { + error_a = deserialize (stream_a); + } +} + +nano::telemetry_ack::telemetry_ack (nano::telemetry_data const & telemetry_data_a) : +message (nano::message_type::telemetry_ack), +data (telemetry_data_a) +{ + header.extensions = telemetry_data::size; +} + +void nano::telemetry_ack::serialize (nano::stream & stream_a) const +{ + header.serialize (stream_a); + write (stream_a, data.block_count); + write (stream_a, data.cemented_count); + write (stream_a, data.unchecked_count); + write (stream_a, data.account_count); + write (stream_a, data.bandwidth_cap); + write (stream_a, data.peer_count); + write (stream_a, data.protocol_version_number); + write (stream_a, data.vendor_version); + write (stream_a, data.uptime); + write (stream_a, data.genesis_block.bytes); +} + +bool 
nano::telemetry_ack::deserialize (nano::stream & stream_a) +{ + auto error (false); + try + { + read (stream_a, data.block_count); + read (stream_a, data.cemented_count); + read (stream_a, data.unchecked_count); + read (stream_a, data.account_count); + read (stream_a, data.bandwidth_cap); + read (stream_a, data.peer_count); + read (stream_a, data.protocol_version_number); + read (stream_a, data.vendor_version); + read (stream_a, data.uptime); + read (stream_a, data.genesis_block.bytes); + } + catch (std::runtime_error const &) + { + error = true; + } + + return error; +} + +void nano::telemetry_ack::visit (nano::message_visitor & visitor_a) const +{ + visitor_a.telemetry_ack (*this); +} + +uint16_t nano::telemetry_ack::size (nano::message_header const & message_header_a) +{ + return static_cast (message_header_a.extensions.to_ulong ()); +} + +nano::telemetry_data nano::telemetry_data::consolidate (std::vector const & telemetry_data_responses_a) +{ + if (telemetry_data_responses_a.empty ()) + { + return {}; + } + else if (telemetry_data_responses_a.size () == 1) + { + // Only 1 element in the collection, so just return it. 
+ return telemetry_data_responses_a.front (); + } + + nano::uint128_t account_sum{ 0 }; + nano::uint128_t block_sum{ 0 }; + nano::uint128_t cemented_sum{ 0 }; + nano::uint128_t peer_sum{ 0 }; + nano::uint128_t unchecked_sum{ 0 }; + nano::uint128_t uptime_sum{ 0 }; + nano::uint128_t bandwidth_sum{ 0 }; + + std::unordered_map protocol_versions; + std::unordered_map vendor_versions; + std::unordered_map bandwidth_caps; + std::unordered_map genesis_blocks; + + nano::uint128_t account_average{ 0 }; + + for (auto const & telemetry_data : telemetry_data_responses_a) + { + account_sum += telemetry_data.account_count; + block_sum += telemetry_data.block_count; + cemented_sum += telemetry_data.cemented_count; + ++vendor_versions[telemetry_data.vendor_version]; + ++protocol_versions[telemetry_data.protocol_version_number]; + peer_sum += telemetry_data.peer_count; + + // 0 has a special meaning (unlimited), don't include it in the average as it will be heavily skewed + if (telemetry_data.bandwidth_cap != 0) + { + bandwidth_sum += telemetry_data.bandwidth_cap; + } + ++bandwidth_caps[telemetry_data.bandwidth_cap]; + unchecked_sum += telemetry_data.unchecked_count; + uptime_sum += telemetry_data.uptime; + ++genesis_blocks[telemetry_data.genesis_block]; + } + + nano::telemetry_data consolidated_data; + auto size = telemetry_data_responses_a.size (); + consolidated_data.account_count = boost::numeric_cast (account_sum / size); + consolidated_data.block_count = boost::numeric_cast (block_sum / size); + consolidated_data.cemented_count = boost::numeric_cast (cemented_sum / size); + consolidated_data.peer_count = boost::numeric_cast (peer_sum / size); + consolidated_data.uptime = boost::numeric_cast (uptime_sum / size); + consolidated_data.unchecked_count = boost::numeric_cast (unchecked_sum / size); + + auto set_mode_or_average = [](auto const & collection, auto & var, auto const & sum, size_t size) { + auto max = std::max_element (collection.begin (), collection.end (), [](auto 
const & lhs, auto const & rhs) { + return lhs.second < rhs.second; + }); + if (max->second > 1) + { + var = max->first; + } + else + { + var = (sum / size).template convert_to> (); + } + }; + + auto set_mode = [](auto const & collection, auto & var, size_t size) { + auto max = std::max_element (collection.begin (), collection.end (), [](auto const & lhs, auto const & rhs) { + return lhs.second < rhs.second; + }); + if (max->second > 1) + { + var = max->first; + } + else + { + // Just pick the first one + var = collection.begin ()->first; + } + }; + + // Use the mode of protocol version, vendor version and bandwidth cap if there is 2 or more using it + set_mode_or_average (bandwidth_caps, consolidated_data.bandwidth_cap, bandwidth_sum, size); + set_mode (protocol_versions, consolidated_data.protocol_version_number, size); + set_mode (vendor_versions, consolidated_data.vendor_version, size); + set_mode (genesis_blocks, consolidated_data.genesis_block, size); + + return consolidated_data; +} + +nano::error nano::telemetry_data::serialize_json (nano::jsonconfig & json) const +{ + json.put ("block_count", block_count); + json.put ("cemented_count", cemented_count); + json.put ("unchecked_count", unchecked_count); + json.put ("account_count", account_count); + json.put ("bandwidth_cap", bandwidth_cap); + json.put ("peer_count", peer_count); + json.put ("protocol_version_number", protocol_version_number); + json.put ("vendor_version", vendor_version); + json.put ("uptime", uptime); + json.put ("genesis_block", genesis_block.to_string ()); + return json.get_error (); +} + +nano::error nano::telemetry_data::deserialize_json (nano::jsonconfig & json) +{ + json.get ("block_count", block_count); + json.get ("cemented_count", cemented_count); + json.get ("unchecked_count", unchecked_count); + json.get ("account_count", account_count); + json.get ("bandwidth_cap", bandwidth_cap); + json.get ("peer_count", peer_count); + json.get ("protocol_version_number", 
protocol_version_number); + json.get ("vendor_version", vendor_version); + json.get ("uptime", uptime); + std::string genesis_block_l; + json.get ("genesis_block", genesis_block_l); + if (!json.get_error ()) + { + if (genesis_block.decode_hex (genesis_block_l)) + { + json.get_error ().set ("Could not deserialize genesis block"); + } + } + return json.get_error (); +} + +bool nano::telemetry_data::operator== (nano::telemetry_data const & data_a) const +{ + return (block_count == data_a.block_count && cemented_count == data_a.cemented_count && unchecked_count == data_a.unchecked_count && account_count == data_a.account_count && bandwidth_cap == data_a.bandwidth_cap && uptime == data_a.uptime && peer_count == data_a.peer_count && protocol_version_number == data_a.protocol_version_number && vendor_version == data_a.vendor_version && genesis_block == data_a.genesis_block); +} + nano::node_id_handshake::node_id_handshake (bool & error_a, nano::stream & stream_a, nano::message_header const & header_a) : message (header_a), query (boost::none), diff --git a/nano/node/common.hpp b/nano/node/common.hpp index e3f4b48f..6021803f 100644 --- a/nano/node/common.hpp +++ b/nano/node/common.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -169,8 +170,11 @@ enum class message_type : uint8_t frontier_req = 0x8, /* deleted 0x9 */ node_id_handshake = 0x0a, - bulk_pull_account = 0x0b + bulk_pull_account = 0x0b, + telemetry_req = 0x0c, + telemetry_ack = 0x0d }; + enum class bulk_pull_account_flags : uint8_t { pending_hash_and_amount = 0x0, @@ -206,8 +210,8 @@ public: /** Size of the payload in bytes. For some messages, the payload size is based on header flags. 
*/ size_t payload_length_bytes () const; - static std::bitset<16> constexpr block_type_mask = std::bitset<16> (0x0f00); - static std::bitset<16> constexpr count_mask = std::bitset<16> (0xf000); + static std::bitset<16> constexpr block_type_mask{ 0x0f00 }; + static std::bitset<16> constexpr count_mask{ 0xf000 }; }; class message { @@ -237,6 +241,8 @@ public: invalid_confirm_req_message, invalid_confirm_ack_message, invalid_node_id_handshake_message, + invalid_telemetry_req_message, + invalid_telemetry_ack_message, outdated_version, invalid_magic, invalid_network @@ -248,6 +254,8 @@ public: void deserialize_confirm_req (nano::stream &, nano::message_header const &); void deserialize_confirm_ack (nano::stream &, nano::message_header const &); void deserialize_node_id_handshake (nano::stream &, nano::message_header const &); + void deserialize_telemetry_req (nano::stream &, nano::message_header const &); + void deserialize_telemetry_ack (nano::stream &, nano::message_header const &); bool at_end (nano::stream &); nano::block_uniquer & block_uniquer; nano::vote_uniquer & vote_uniquer; @@ -321,6 +329,49 @@ public: uint32_t count; static size_t constexpr size = sizeof (start) + sizeof (age) + sizeof (count); }; + +class telemetry_data +{ +public: + uint64_t block_count{ 0 }; + uint64_t cemented_count{ 0 }; + uint64_t unchecked_count{ 0 }; + uint64_t account_count{ 0 }; + uint64_t bandwidth_cap{ 0 }; + uint64_t uptime{ 0 }; + uint32_t peer_count{ 0 }; + uint8_t protocol_version_number{ 0 }; + uint8_t vendor_version{ 0 }; + nano::block_hash genesis_block{ 0 }; + + static nano::telemetry_data consolidate (std::vector const & telemetry_data_responses); + nano::error serialize_json (nano::jsonconfig & json) const; + nano::error deserialize_json (nano::jsonconfig & json); + bool operator== (nano::telemetry_data const &) const; + + static auto constexpr size = sizeof (block_count) + sizeof (cemented_count) + sizeof (unchecked_count) + sizeof (account_count) + sizeof 
(bandwidth_cap) + sizeof (peer_count) + sizeof (protocol_version_number) + sizeof (vendor_version) + sizeof (uptime) + sizeof (genesis_block); +}; +class telemetry_req final : public message +{ +public: + telemetry_req (); + explicit telemetry_req (nano::message_header const &); + void serialize (nano::stream &) const override; + bool deserialize (nano::stream &); + void visit (nano::message_visitor &) const override; +}; +class telemetry_ack final : public message +{ +public: + telemetry_ack (bool &, nano::stream &, nano::message_header const &); + explicit telemetry_ack (telemetry_data const &); + void serialize (nano::stream &) const override; + void visit (nano::message_visitor &) const override; + bool deserialize (nano::stream &); + static uint16_t size (nano::message_header const &); + nano::telemetry_data data; +}; + class bulk_pull final : public message { public: @@ -387,6 +438,8 @@ public: virtual void bulk_push (nano::bulk_push const &) = 0; virtual void frontier_req (nano::frontier_req const &) = 0; virtual void node_id_handshake (nano::node_id_handshake const &) = 0; + virtual void telemetry_req (nano::telemetry_req const &) = 0; + virtual void telemetry_ack (nano::telemetry_ack const &) = 0; virtual ~message_visitor (); }; diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 6963338b..4a0150d0 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -3894,6 +3895,130 @@ void nano::json_handler::stop () } } +void nano::json_handler::telemetry () +{ + auto rpc_l (shared_from_this ()); + + auto address_text (request.get_optional ("address")); + auto port_text (request.get_optional ("port")); + + if (address_text.is_initialized () || port_text.is_initialized ()) + { + // Check both are specified + std::shared_ptr channel; + if (address_text.is_initialized () && port_text.is_initialized ()) + { + uint16_t port; + if (!nano::parse_port 
(*port_text, port)) + { + boost::system::error_code address_ec; + auto address (boost::asio::ip::make_address_v6 (*address_text, address_ec)); + if (!address_ec) + { + nano::endpoint endpoint (address, port); + channel = node.network.find_channel (endpoint); + if (!channel) + { + ec = nano::error_rpc::peer_not_found; + } + } + else + { + ec = nano::error_common::invalid_ip_address; + } + } + else + { + ec = nano::error_common::invalid_port; + } + } + else + { + ec = nano::error_rpc::requires_port_and_address; + } + + if (!ec) + { + assert (channel); + node.telemetry.get_metrics_single_peer_async (channel, [rpc_l](auto const & single_telemetry_metric_a) { + if (!single_telemetry_metric_a.error) + { + nano::jsonconfig config_l; + auto err = single_telemetry_metric_a.data.serialize_json (config_l); + auto const & ptree = config_l.get_tree (); + + if (!err) + { + rpc_l->response_l.insert (rpc_l->response_l.begin (), ptree.begin (), ptree.end ()); + rpc_l->response_l.put ("cached", single_telemetry_metric_a.is_cached); + } + else + { + rpc_l->ec = nano::error_rpc::generic; + } + } + else + { + rpc_l->ec = nano::error_rpc::generic; + } + + rpc_l->response_errors (); + }); + } + else + { + response_errors (); + } + } + else + { + // By default, consolidated (average or mode) telemetry metrics are returned, + // setting "raw" to true returns metrics from all nodes requested. 
+ auto raw = request.get_optional ("raw"); + auto output_raw = raw.value_or (false); + node.telemetry.get_metrics_random_peers_async ([rpc_l, output_raw](auto const & batched_telemetry_metrics_a) { + if (output_raw) + { + boost::property_tree::ptree metrics; + for (auto & telemetry_metrics : batched_telemetry_metrics_a.data) + { + nano::jsonconfig config_l; + auto err = telemetry_metrics.serialize_json (config_l); + if (!err) + { + metrics.push_back (std::make_pair ("", config_l.get_tree ())); + } + else + { + rpc_l->ec = nano::error_rpc::generic; + } + } + + rpc_l->response_l.put_child ("metrics", metrics); + } + else + { + nano::jsonconfig config_l; + auto average_telemetry_metrics = nano::telemetry_data::consolidate (batched_telemetry_metrics_a.data); + auto err = average_telemetry_metrics.serialize_json (config_l); + auto const & ptree = config_l.get_tree (); + + if (!err) + { + rpc_l->response_l.insert (rpc_l->response_l.begin (), ptree.begin (), ptree.end ()); + } + else + { + rpc_l->ec = nano::error_rpc::generic; + } + } + + rpc_l->response_l.put ("cached", batched_telemetry_metrics_a.is_cached); + rpc_l->response_errors (); + }); + } +} + void nano::json_handler::unchecked () { const bool json_block_l = request.get ("json_block", false); @@ -5034,6 +5159,7 @@ ipc_json_handler_no_arg_func_map create_ipc_json_handler_no_arg_func_map () no_arg_funcs.emplace ("stats", &nano::json_handler::stats); no_arg_funcs.emplace ("stats_clear", &nano::json_handler::stats_clear); no_arg_funcs.emplace ("stop", &nano::json_handler::stop); + no_arg_funcs.emplace ("node_telemetry", &nano::json_handler::telemetry); no_arg_funcs.emplace ("unchecked", &nano::json_handler::unchecked); no_arg_funcs.emplace ("unchecked_clear", &nano::json_handler::unchecked_clear); no_arg_funcs.emplace ("unchecked_get", &nano::json_handler::unchecked_get); diff --git a/nano/node/json_handler.hpp b/nano/node/json_handler.hpp index c9ee197c..ff8face8 100644 --- a/nano/node/json_handler.hpp +++ 
b/nano/node/json_handler.hpp @@ -97,6 +97,7 @@ public: void stats (); void stats_clear (); void stop (); + void telemetry (); void unchecked (); void unchecked_clear (); void unchecked_get (); diff --git a/nano/node/logging.cpp b/nano/node/logging.cpp index fb28b99a..17f756f4 100644 --- a/nano/node/logging.cpp +++ b/nano/node/logging.cpp @@ -94,6 +94,7 @@ nano::error nano::logging::serialize_toml (nano::tomlconfig & toml) const toml.put ("network_packet", network_packet_logging_value, "Log network packet activity.\ntype:bool"); toml.put ("network_keepalive", network_keepalive_logging_value, "Log keepalive related messages.\ntype:bool"); toml.put ("network_node_id_handshake", network_node_id_handshake_logging_value, "Log node-id handshake related messages.\ntype:bool"); + toml.put ("network_telemetry", network_telemetry_logging_value, "Log telemetry related messages.\ntype:bool"); toml.put ("node_lifetime_tracing", node_lifetime_tracing_value, "Log node startup and shutdown messages.\ntype:bool"); toml.put ("insufficient_work", insufficient_work_logging_value, "Log if insufficient work is detected.\ntype:bool"); toml.put ("log_ipc", log_ipc_value, "Log IPC related activity.\ntype:bool"); @@ -124,6 +125,7 @@ nano::error nano::logging::deserialize_toml (nano::tomlconfig & toml) toml.get ("network_packet", network_packet_logging_value); toml.get ("network_keepalive", network_keepalive_logging_value); toml.get ("network_node_id_handshake", network_node_id_handshake_logging_value); + toml.get ("network_telemetry_logging", network_telemetry_logging_value); toml.get ("node_lifetime_tracing", node_lifetime_tracing_value); toml.get ("insufficient_work", insufficient_work_logging_value); toml.get ("log_ipc", log_ipc_value); @@ -157,6 +159,7 @@ nano::error nano::logging::serialize_json (nano::jsonconfig & json) const json.put ("network_packet", network_packet_logging_value); json.put ("network_keepalive", network_keepalive_logging_value); json.put ("network_node_id_handshake", 
network_node_id_handshake_logging_value); + json.put ("network_telemetry_logging", network_telemetry_logging_value); json.put ("node_lifetime_tracing", node_lifetime_tracing_value); json.put ("insufficient_work", insufficient_work_logging_value); json.put ("log_ipc", log_ipc_value); @@ -309,6 +312,11 @@ bool nano::logging::network_node_id_handshake_logging () const return network_logging () && network_node_id_handshake_logging_value; } +bool nano::logging::network_telemetry_logging () const +{ + return network_logging () && network_telemetry_logging_value; +} + bool nano::logging::node_lifetime_tracing () const { return node_lifetime_tracing_value; diff --git a/nano/node/logging.hpp b/nano/node/logging.hpp index 2da20c4f..61dcec5e 100644 --- a/nano/node/logging.hpp +++ b/nano/node/logging.hpp @@ -52,6 +52,7 @@ public: bool network_packet_logging () const; bool network_keepalive_logging () const; bool network_node_id_handshake_logging () const; + bool network_telemetry_logging () const; bool node_lifetime_tracing () const; bool insufficient_work_logging () const; bool upnp_details_logging () const; @@ -75,6 +76,7 @@ public: bool network_packet_logging_value{ false }; bool network_keepalive_logging_value{ false }; bool network_node_id_handshake_logging_value{ false }; + bool network_telemetry_logging_value{ false }; bool node_lifetime_tracing_value{ false }; bool insufficient_work_logging_value{ true }; bool log_ipc_value{ true }; diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 18150a9d..4b82b31b 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include @@ -506,6 +507,42 @@ public: { node.stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in); } + void telemetry_req (nano::telemetry_req const & message_a) override + { + if (node.config.logging.network_telemetry_logging ()) + { + node.logger.try_log (boost::str (boost::format 
("Telemetry_req message from %1%") % channel->to_string ())); + } + node.stats.inc (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in); + + nano::telemetry_data telemetry_data; + telemetry_data.block_count = node.ledger.cache.block_count; + telemetry_data.cemented_count = node.ledger.cache.cemented_count; + telemetry_data.bandwidth_cap = node.config.bandwidth_limit; + telemetry_data.protocol_version_number = node.network_params.protocol.protocol_version; + telemetry_data.vendor_version = nano::get_major_node_version (); + telemetry_data.uptime = std::chrono::duration_cast (std::chrono::steady_clock::now () - node.startup_time).count (); + telemetry_data.unchecked_count = node.ledger.cache.unchecked_count; + telemetry_data.genesis_block = nano::genesis ().hash (); + telemetry_data.peer_count = node.network.size (); + + { + auto transaction = node.store.tx_begin_read (); + telemetry_data.account_count = node.store.account_count (transaction); + } + + nano::telemetry_ack telemetry_ack (telemetry_data); + channel->send (telemetry_ack); + } + void telemetry_ack (nano::telemetry_ack const & message_a) override + { + if (node.config.logging.network_telemetry_logging ()) + { + node.logger.try_log (boost::str (boost::format ("Received telemetry_ack message from %1%") % channel->to_string ())); + } + node.stats.inc (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in); + node.telemetry.add (message_a.data, channel->get_endpoint ()); + } nano::node & node; std::shared_ptr channel; }; @@ -590,10 +627,10 @@ std::deque> nano::network::list_fanout return result; } -std::unordered_set> nano::network::random_set (size_t count_a) const +std::unordered_set> nano::network::random_set (size_t count_a, uint8_t min_version_a) const { - std::unordered_set> result (tcp_channels.random_set (count_a)); - std::unordered_set> udp_random (udp_channels.random_set (count_a)); + std::unordered_set> result (tcp_channels.random_set 
(count_a, min_version_a)); + std::unordered_set> udp_random (udp_channels.random_set (count_a, min_version_a)); for (auto i (udp_random.begin ()), n (udp_random.end ()); i != n && result.size () < count_a * 1.5; ++i) { result.insert (*i); diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 0bfd0ed6..b6729a37 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -139,7 +139,7 @@ public: // A list of random peers sized for the configured rebroadcast fanout std::deque> list_fanout (); void random_fill (std::array &) const; - std::unordered_set> random_set (size_t) const; + std::unordered_set> random_set (size_t, uint8_t = 0) const; // Get the next peer for attempting a tcp bootstrap connection nano::tcp_endpoint bootstrap_peer (bool = false); nano::endpoint endpoint (); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index a9a44ea7..8419811f 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -125,6 +126,7 @@ gap_cache (*this), ledger (store, stats, flags_a.generate_cache), checker (config.signature_checker_threads), network (*this, config.peering_port), +telemetry (network, alarm, worker), bootstrap_initiator (*this), bootstrap (config.peering_port, *this), application_path (application_path_a), @@ -133,10 +135,12 @@ vote_processor (checker, active, store, observers, stats, config, logger, online rep_crawler (*this), warmed_up (0), block_processor (*this, write_database_queue), +// clang-format off block_processor_thread ([this]() { nano::thread_role::set (nano::thread_role::name::block_processing); this->block_processor.process_blocks (); }), +// clang-format on online_reps (ledger, network_params, config.online_weight_minimum.number ()), votes_cache (wallets), vote_uniquer (block_uniquer), @@ -579,6 +583,7 @@ std::unique_ptr nano::collect_container_info (no composite->add_component (collect_container_info (node.bootstrap_initiator, 
"bootstrap_initiator")); composite->add_component (collect_container_info (node.bootstrap, "bootstrap")); composite->add_component (collect_container_info (node.network, "network")); + composite->add_component (collect_container_info (node.telemetry, "telemetry")); composite->add_component (collect_container_info (node.observers, "observers")); composite->add_component (collect_container_info (node.wallets, "wallets")); composite->add_component (collect_container_info (node.vote_processor, "vote_processor")); @@ -691,6 +696,7 @@ void nano::node::stop () confirmation_height_processor.stop (); active.stop (); network.stop (); + telemetry.stop (); if (websocket_server) { websocket_server->stop (); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index b8985a5d..c107dd5e 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -47,7 +48,7 @@ namespace websocket } class node; - +class telemetry; class work_pool; class block_arrival_info final { @@ -166,6 +167,7 @@ public: nano::ledger ledger; nano::signature_checker checker; nano::network network; + nano::telemetry telemetry; nano::bootstrap_initiator bootstrap_initiator; nano::bootstrap_listener bootstrap; boost::filesystem::path application_path; diff --git a/nano/node/telemetry.cpp b/nano/node/telemetry.cpp new file mode 100644 index 00000000..f0875fd2 --- /dev/null +++ b/nano/node/telemetry.cpp @@ -0,0 +1,377 @@ +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +std::chrono::milliseconds constexpr nano::telemetry_impl::cache_cutoff; + +nano::telemetry::telemetry (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a) : +network (network_a), +alarm (alarm_a), +worker (worker_a), +batch_request (std::make_shared (network, alarm, worker)) +{ +} + +void nano::telemetry::stop () +{ + nano::lock_guard guard (mutex); + batch_request = nullptr; + 
single_requests.clear (); + stopped = true; +} + +void nano::telemetry::add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a) +{ + nano::lock_guard guard (mutex); + if (!stopped) + { + batch_request->add (telemetry_data_a, endpoint_a); + + for (auto & request : single_requests) + { + request.second.impl->add (telemetry_data_a, endpoint_a); + } + } +} + +void nano::telemetry::get_metrics_random_peers_async (std::function const & callback_a) +{ + // These peers will only be used if there isn't an already ongoing batch telemetry request round + auto random_peers = network.random_set (network.size_sqrt (), network_params.protocol.telemetry_protocol_version_min); + nano::lock_guard guard (mutex); + if (!stopped && !random_peers.empty ()) + { + batch_request->get_metrics_async (random_peers, [callback_a](nano::telemetry_data_responses const & telemetry_data_responses) { + callback_a (telemetry_data_responses); + }); + } + else + { + const auto all_received = false; + callback_a (nano::telemetry_data_responses{ {}, false, all_received }); + } +} + +nano::telemetry_data_responses nano::telemetry::get_metrics_random_peers () +{ + std::promise promise; + get_metrics_random_peers_async ([&promise](telemetry_data_responses const & telemetry_data_responses_a) { + promise.set_value (telemetry_data_responses_a); + }); + + return promise.get_future ().get (); +} + +// After a request is made to a single peer we want to remove it from the container after the peer has not been requested for a while (cache_cutoff). 
+void nano::telemetry::ongoing_single_request_cleanup (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data const & single_request_data_a) +{ + // This class is just + class ongoing_func_wrapper + { + public: + std::function ongoing_func; + }; + + auto wrapper = std::make_shared (); + // Keep calling ongoing_func while the peer is still being called + const auto & last_updated = single_request_data_a.last_updated; + wrapper->ongoing_func = [this, telemetry_impl_w = std::weak_ptr (single_request_data_a.impl), &last_updated, &endpoint_a, wrapper]() { + if (auto telemetry_impl = telemetry_impl_w.lock ()) + { + nano::lock_guard guard (this->mutex); + if (std::chrono::steady_clock::now () - telemetry_impl->cache_cutoff > last_updated && telemetry_impl->callbacks.empty ()) + { + this->single_requests.erase (endpoint_a); + } + else + { + // Request is still active, so call again + this->alarm.add (std::chrono::steady_clock::now () + telemetry_impl->cache_cutoff, wrapper->ongoing_func); + } + } + }; + + alarm.add (std::chrono::steady_clock::now () + single_request_data_a.impl->cache_cutoff, wrapper->ongoing_func); +} + +void nano::telemetry::update_cleanup_data (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data & single_request_data_a, bool is_new_a) +{ + auto telemetry_impl = single_request_data_a.impl; + if (is_new_a) + { + // Clean this request up when it isn't being used anymore + ongoing_single_request_cleanup (endpoint_a, single_request_data_a); + } + else + { + // Ensure that refreshed flag is reset so we don't delete it before processing + single_request_data_a.last_updated = std::chrono::steady_clock::now (); + } +} + +void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr const & channel_a, std::function const & callback_a) +{ + auto invoke_callback_with_error = [&callback_a]() { + auto const is_cached = false; + auto const error = true; + callback_a ({ nano::telemetry_data (), is_cached, error }); + }; + + 
nano::lock_guard guard (mutex); + if (!stopped) + { + if (channel_a && (channel_a->get_network_version () >= network_params.protocol.telemetry_protocol_version_min)) + { + auto pair = single_requests.emplace (channel_a->get_endpoint (), single_request_data{ std::make_shared (network, alarm, worker), std::chrono::steady_clock::now () }); + update_cleanup_data (pair.first->first, pair.first->second, pair.second); + + pair.first->second.impl->get_metrics_async ({ channel_a }, [callback_a](telemetry_data_responses const & telemetry_data_responses_a) { + // There should only be 1 response, so if this hasn't been received then conclude it is an error. + auto const error = !telemetry_data_responses_a.all_received; + if (!error) + { + assert (telemetry_data_responses_a.data.size () == 1); + callback_a ({ telemetry_data_responses_a.data.front (), telemetry_data_responses_a.is_cached, error }); + } + else + { + callback_a ({ nano::telemetry_data (), telemetry_data_responses_a.is_cached, error }); + } + }); + } + else + { + invoke_callback_with_error (); + } + } + else + { + invoke_callback_with_error (); + } +} + +nano::telemetry_data_response nano::telemetry::get_metrics_single_peer (std::shared_ptr const & channel_a) +{ + std::promise promise; + get_metrics_single_peer_async (channel_a, [&promise](telemetry_data_response const & single_metric_data_a) { + promise.set_value (single_metric_data_a); + }); + + return promise.get_future ().get (); +} + +size_t nano::telemetry::telemetry_data_size () +{ + nano::lock_guard guard (mutex); + auto total = std::accumulate (single_requests.begin (), single_requests.end (), static_cast (0), [](size_t total, auto & single_request) { + return total += single_request.second.impl->telemetry_data_size (); + }); + + if (batch_request) + { + total += batch_request->telemetry_data_size (); + } + return total; +} + +nano::telemetry_impl::telemetry_impl (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a) : +network 
(network_a), +alarm (alarm_a), +worker (worker_a) +{ +} + +void nano::telemetry_impl::flush_callbacks (nano::unique_lock & lk_a, bool cached_a) +{ + // Invoke all callbacks, it's possible that during the mutex unlock other callbacks were added, + // so check again and invoke those too + assert (lk_a.owns_lock ()); + invoking = true; + while (!callbacks.empty ()) + { + lk_a.unlock (); + invoke_callbacks (cached_a); + lk_a.lock (); + } + invoking = false; +} + +void nano::telemetry_impl::get_metrics_async (std::unordered_set> const & channels_a, std::function const & callback_a) +{ + { + assert (!channels_a.empty ()); + nano::unique_lock lk (mutex); + callbacks.push_back (callback_a); + if (callbacks.size () > 1 || invoking) + { + // This means we already have at least one pending result already, so it will handle calls this callback when it completes + return; + } + + // Check if we can just return cached results + if (std::chrono::steady_clock::now () < (last_time + cache_cutoff)) + { + // Post to worker so that it's truly async and not on the calling thread (same problem as std::async otherwise) + worker.push_task ([this_w = std::weak_ptr (shared_from_this ())]() { + if (auto this_l = this_w.lock ()) + { + nano::unique_lock lk (this_l->mutex); + const auto is_cached = true; + this_l->flush_callbacks (lk, is_cached); + } + }); + return; + } + + all_received = true; + assert (required_responses.empty ()); + std::transform (channels_a.begin (), channels_a.end (), std::inserter (required_responses, required_responses.end ()), [](auto const & channel) { + return channel->get_endpoint (); + }); + } + + fire_request_messages (channels_a); +} + +void nano::telemetry_impl::add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a) +{ + nano::unique_lock lk (mutex); + if (required_responses.find (endpoint_a) == required_responses.cend ()) + { + // Not requesting telemetry data from this channel so ignore it + return; + } + + 
current_telemetry_data_responses.push_back (telemetry_data_a); + channel_processed (lk, endpoint_a); +} + +void nano::telemetry_impl::invoke_callbacks (bool cached_a) +{ + decltype (callbacks) callbacks_l; + + { + // Copy callbacks so that they can be called outside of holding the lock + nano::lock_guard guard (mutex); + callbacks_l = callbacks; + current_telemetry_data_responses.clear (); + callbacks.clear (); + } + for (auto & callback : callbacks_l) + { + callback ({ cached_telemetry_data, cached_a, all_received }); + } +} + +void nano::telemetry_impl::channel_processed (nano::unique_lock & lk_a, nano::endpoint const & endpoint_a) +{ + assert (lk_a.owns_lock ()); + auto num_removed = required_responses.erase (endpoint_a); + if (num_removed > 0 && required_responses.empty ()) + { + assert (lk_a.owns_lock ()); + cached_telemetry_data = current_telemetry_data_responses; + + last_time = std::chrono::steady_clock::now (); + auto const is_cached = false; + flush_callbacks (lk_a, is_cached); + } +} + +void nano::telemetry_impl::fire_request_messages (std::unordered_set> const & channels) +{ + uint64_t round_l; + { + nano::lock_guard guard (mutex); + ++round; + round_l = round; + } + + // Fire off a telemetry request to all passed in channels + nano::telemetry_req message; + for (auto & channel : channels) + { + assert (channel->get_network_version () >= network_params.protocol.telemetry_protocol_version_min); + + std::weak_ptr this_w (shared_from_this ()); + channel->send (message, [this_w, endpoint = channel->get_endpoint ()](boost::system::error_code const & ec, size_t size_a) { + if (auto this_l = this_w.lock ()) + { + if (ec) + { + // Error sending the telemetry_req message + nano::unique_lock lk (this_l->mutex); + this_l->all_received = false; + this_l->channel_processed (lk, endpoint); + } + } + }); + + // If no response is seen after a certain period of time, remove it from the list of expected responses. However, only if it is part of the same round. 
+ alarm.add (std::chrono::steady_clock::now () + cache_cutoff, [this_w, endpoint = channel->get_endpoint (), round_l]() { + if (auto this_l = this_w.lock ()) + { + nano::unique_lock lk (this_l->mutex); + if (this_l->round == round_l && this_l->required_responses.find (endpoint) != this_l->required_responses.cend ()) + { + this_l->all_received = false; + this_l->channel_processed (lk, endpoint); + } + } + }); + } +} + +size_t nano::telemetry_impl::telemetry_data_size () +{ + nano::lock_guard guard (mutex); + return current_telemetry_data_responses.size (); +} + +std::unique_ptr nano::collect_container_info (telemetry & telemetry, const std::string & name) +{ + size_t single_requests_count; + { + nano::lock_guard guard (telemetry.mutex); + single_requests_count = telemetry.single_requests.size (); + } + + auto composite = std::make_unique (name); + if (telemetry.batch_request) + { + composite->add_component (collect_container_info (*telemetry.batch_request, "batch_request")); + } + composite->add_component (std::make_unique (container_info{ "single_requests", single_requests_count, sizeof (decltype (telemetry.single_requests)::value_type) })); + return composite; +} + +std::unique_ptr nano::collect_container_info (telemetry_impl & telemetry_impl, const std::string & name) +{ + size_t callback_count; + size_t all_telemetry_data_count; + size_t cached_telemetry_data_count; + size_t required_responses_count; + { + nano::lock_guard guard (telemetry_impl.mutex); + callback_count = telemetry_impl.callbacks.size (); + all_telemetry_data_count = telemetry_impl.current_telemetry_data_responses.size (); + cached_telemetry_data_count = telemetry_impl.cached_telemetry_data.size (); + required_responses_count = telemetry_impl.required_responses.size (); + } + + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "callbacks", callback_count, sizeof (decltype (telemetry_impl.callbacks)::value_type) })); + composite->add_component 
(std::make_unique (container_info{ "current_telemetry_data_responses", all_telemetry_data_count, sizeof (decltype (telemetry_impl.current_telemetry_data_responses)::value_type) })); + composite->add_component (std::make_unique (container_info{ "cached_telemetry_data", cached_telemetry_data_count, sizeof (decltype (telemetry_impl.cached_telemetry_data)::value_type) })); + composite->add_component (std::make_unique (container_info{ "required_responses", required_responses_count, sizeof (decltype (telemetry_impl.required_responses)::value_type) })); + return composite; +} diff --git a/nano/node/telemetry.hpp b/nano/node/telemetry.hpp new file mode 100644 index 00000000..72d4b1bd --- /dev/null +++ b/nano/node/telemetry.hpp @@ -0,0 +1,165 @@ +#pragma once + +#include +#include + +#include +#include +#include + +namespace nano +{ +class network; +class alarm; +class worker; +class telemetry; +namespace transport +{ + class channel; +} + +/* + * Holds a response from a telemetry request + */ +class telemetry_data_response +{ +public: + nano::telemetry_data data; + bool is_cached; + bool error; +}; + +/* + * Holds many responses from telemetry requests + */ +class telemetry_data_responses +{ +public: + std::vector data; + bool is_cached; + bool all_received; +}; + +/* + * This class requests node telemetry metrics and invokes any callbacks + * which have been aggregated. Further calls to get_metrics_async may return cached telemetry metrics + * if they are within cache_cutoff time from the latest request. 
+ */ +class telemetry_impl : public std::enable_shared_from_this +{ +public: + telemetry_impl (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a); + +private: + // Class only available to the telemetry class + void get_metrics_async (std::unordered_set> const & channels_a, std::function const & callback_a); + void add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a); + size_t telemetry_data_size (); + + nano::network_params network_params; + // Anything older than this requires requesting metrics from other nodes + static std::chrono::milliseconds constexpr cache_cutoff{ 3000 }; + + // All data in this chunk is protected by this mutex + std::mutex mutex; + std::vector> callbacks; + std::chrono::steady_clock::time_point last_time = std::chrono::steady_clock::now () - cache_cutoff; + /* The responses received during this request round */ + std::vector current_telemetry_data_responses; + /* The metrics for the last request round */ + std::vector cached_telemetry_data; + std::unordered_set required_responses; + uint64_t round{ 0 }; + /* Currently executing callbacks */ + bool invoking{ false }; + + std::atomic all_received{ true }; + + nano::network & network; + nano::alarm & alarm; + nano::worker & worker; + + void invoke_callbacks (bool cached_a); + void channel_processed (nano::unique_lock & lk_a, nano::endpoint const & endpoint_a); + void flush_callbacks (nano::unique_lock & lk_a, bool cached_a); + void fire_request_messages (std::unordered_set> const & channels); + + friend std::unique_ptr collect_container_info (telemetry_impl &, const std::string &); + friend nano::telemetry; + friend class node_telemetry_single_request_Test; + friend class node_telemetry_basic_Test; +}; + +std::unique_ptr collect_container_info (telemetry_impl & telemetry_impl, const std::string & name); + +class telemetry +{ +public: + telemetry (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a); + + /* + * Add 
telemetry metrics received from this endpoint. + * Should this be unsolicited, it will not be added. + */ + void add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a); + + /* + * Collects metrics from square root number of peers and invokes the callback when complete. + */ + void get_metrics_random_peers_async (std::function const & callback_a); + + /* + * A blocking version of get_metrics_random_peers_async (). + */ + telemetry_data_responses get_metrics_random_peers (); + + /* + * This makes a telemetry request to the specific channel + */ + void get_metrics_single_peer_async (std::shared_ptr const &, std::function const & callback_a); + + /* + * A blocking version of get_metrics_single_peer_async + */ + telemetry_data_response get_metrics_single_peer (std::shared_ptr const &); + + /* + * Return the number of node metrics collected + */ + size_t telemetry_data_size (); + + /* + * Stop the telemetry processor + */ + void stop (); + +private: + nano::network & network; + nano::alarm & alarm; + nano::worker & worker; + + nano::network_params network_params; + + class single_request_data + { + public: + std::shared_ptr impl; + std::chrono::steady_clock::time_point last_updated{ std::chrono::steady_clock::now () }; + }; + + std::mutex mutex; + /* Requests telemetry data from a random selection of peers */ + std::shared_ptr batch_request; + /* Any requests to specific individual peers is maintained here */ + std::unordered_map single_requests; + bool stopped{ false }; + + void update_cleanup_data (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data & single_request_data_a, bool is_new_a); + void ongoing_single_request_cleanup (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data const & single_request_data_a); + + friend class node_telemetry_multiple_single_request_clearing_Test; + friend std::unique_ptr collect_container_info (telemetry &, const std::string &); +}; + +std::unique_ptr 
collect_container_info (telemetry & telemetry, const std::string & name); +} \ No newline at end of file diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 2be9588f..27e51a00 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -152,7 +152,7 @@ std::shared_ptr nano::transport::tcp_channels::fin return result; } -std::unordered_set> nano::transport::tcp_channels::random_set (size_t count_a) const +std::unordered_set> nano::transport::tcp_channels::random_set (size_t count_a, uint8_t min_version) const { std::unordered_set> result; result.reserve (count_a); @@ -167,7 +167,12 @@ std::unordered_set> nano::transport::t for (auto i (0); i < random_cutoff && result.size () < count_a; ++i) { auto index (nano::random_pool::generate_word32 (0, static_cast (peers_size - 1))); - result.insert (channels.get ()[index].channel); + + auto channel = channels.get ()[index].channel; + if (channel->get_network_version () >= min_version && !channel->server) + { + result.insert (channel); + } } } return result; diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index a6efe8ce..50cbf798 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -84,7 +84,7 @@ namespace transport size_t size () const; std::shared_ptr find_channel (nano::tcp_endpoint const &) const; void random_fill (std::array &) const; - std::unordered_set> random_set (size_t) const; + std::unordered_set> random_set (size_t, uint8_t = 0) const; bool store_all (bool = true); std::shared_ptr find_node_id (nano::account const &); // Get the next peer for attempting a tcp connection diff --git a/nano/node/transport/transport.cpp b/nano/node/transport/transport.cpp index 37f1731a..490a7d6b 100644 --- a/nano/node/transport/transport.cpp +++ b/nano/node/transport/transport.cpp @@ -47,6 +47,14 @@ public: { result = nano::stat::detail::node_id_handshake; } + void telemetry_req (nano::telemetry_req const & message_a) override + { + result = 
nano::stat::detail::telemetry_req; + } + void telemetry_ack (nano::telemetry_ack const & message_a) override + { + result = nano::stat::detail::telemetry_ack; + } nano::stat::detail result; }; } diff --git a/nano/node/transport/udp.cpp b/nano/node/transport/udp.cpp index 801dc484..20b382b7 100644 --- a/nano/node/transport/udp.cpp +++ b/nano/node/transport/udp.cpp @@ -147,7 +147,7 @@ std::shared_ptr nano::transport::udp_channels::cha return result; } -std::unordered_set> nano::transport::udp_channels::random_set (size_t count_a) const +std::unordered_set> nano::transport::udp_channels::random_set (size_t count_a, uint8_t min_version) const { std::unordered_set> result; result.reserve (count_a); @@ -162,7 +162,11 @@ std::unordered_set> nano::transport::u for (auto i (0); i < random_cutoff && result.size () < count_a; ++i) { auto index (nano::random_pool::generate_word32 (0, static_cast (peers_size - 1))); - result.insert (channels.get ()[index].channel); + auto channel = channels.get ()[index].channel; + if (channel->get_network_version () >= min_version) + { + result.insert (channel); + } } } return result; @@ -428,6 +432,14 @@ public: { assert (false); } + void telemetry_req (nano::telemetry_req const & message_a) override + { + message (message_a); + } + void telemetry_ack (nano::telemetry_ack const & message_a) override + { + message (message_a); + } void node_id_handshake (nano::node_id_handshake const & message_a) override { if (node.config.logging.network_node_id_handshake_logging ()) @@ -552,6 +564,12 @@ void nano::transport::udp_channels::receive_action (nano::message_buffer * data_ case nano::message_parser::parse_status::invalid_node_id_handshake_message: node.stats.inc (nano::stat::type::udp, nano::stat::detail::invalid_node_id_handshake_message); break; + case nano::message_parser::parse_status::invalid_telemetry_req_message: + node.stats.inc (nano::stat::type::udp, nano::stat::detail::invalid_telemetry_req_message); + break; + case 
nano::message_parser::parse_status::invalid_telemetry_ack_message: + node.stats.inc (nano::stat::type::udp, nano::stat::detail::invalid_telemetry_ack_message); + break; case nano::message_parser::parse_status::outdated_version: node.stats.inc (nano::stat::type::udp, nano::stat::detail::outdated_version); break; diff --git a/nano/node/transport/udp.hpp b/nano/node/transport/udp.hpp index 67a31539..9945546d 100644 --- a/nano/node/transport/udp.hpp +++ b/nano/node/transport/udp.hpp @@ -69,7 +69,7 @@ namespace transport size_t size () const; std::shared_ptr channel (nano::endpoint const &) const; void random_fill (std::array &) const; - std::unordered_set> random_set (size_t) const; + std::unordered_set> random_set (size_t, uint8_t = 0) const; bool store_all (bool = true); std::shared_ptr find_node_id (nano::account const &); void clean_node_id (nano::account const &); diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 5a31b3ac..c5e2f336 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -7831,3 +7831,201 @@ TEST (rpc, receive_work_disabled) ASSERT_EQ (std::error_code (nano::error_common::disabled_work_generation).message (), response.json.get ("error")); } } + +namespace +{ +void compare_default_test_result_data (test_response & response, nano::node const & node_server_a) +{ + ASSERT_EQ (200, response.status); + ASSERT_FALSE (response.json.get ("cached")); + ASSERT_EQ (1, response.json.get ("block_count")); + ASSERT_EQ (1, response.json.get ("cemented_count")); + ASSERT_EQ (0, response.json.get ("unchecked_count")); + ASSERT_EQ (1, response.json.get ("account_count")); + ASSERT_EQ (node_server_a.config.bandwidth_limit, response.json.get ("bandwidth_cap")); + ASSERT_EQ (1, response.json.get ("peer_count")); + ASSERT_EQ (node_server_a.network_params.protocol.protocol_version, response.json.get ("protocol_version_number")); + ASSERT_EQ (nano::get_major_node_version (), response.json.get ("vendor_version")); + ASSERT_GE (100, response.json.get 
("uptime")); + ASSERT_EQ (nano::genesis ().hash ().to_string (), response.json.get ("genesis_block")); +} +} + +TEST (rpc, node_telemetry_single) +{ + nano::system system (1); + auto & node1 = *add_ipc_enabled_node (system); + scoped_io_thread_name_change scoped_thread_name_io; + nano::node_rpc_config node_rpc_config; + nano::ipc::ipc_server ipc_server (node1, node_rpc_config); + nano::rpc_config rpc_config (nano::get_available_port (), true); + rpc_config.rpc_process.ipc_port = node1.config.ipc_config.transport_tcp.port; + nano::ipc_rpc_processor ipc_rpc_processor (system.io_ctx, rpc_config); + nano::rpc rpc (system.io_ctx, rpc_config, ipc_rpc_processor); + rpc.start (); + + // Wait until peers are stored as they are done in the background + auto peers_stored = false; + while (!peers_stored) + { + ASSERT_NO_ERROR (system.poll ()); + + auto transaction = system.nodes.back ()->store.tx_begin_read (); + peers_stored = system.nodes.back ()->store.peer_count (transaction) != 0; + } + + // Missing port + boost::property_tree::ptree request; + auto node = system.nodes.front (); + request.put ("action", "node_telemetry"); + request.put ("address", "not_a_valid_address"); + + { + test_response response (request, rpc.config.port, system.io_ctx); + system.deadline_set (10s); + while (response.status == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_EQ (200, response.status); + ASSERT_EQ (std::error_code (nano::error_rpc::requires_port_and_address).message (), response.json.get ("error")); + } + + // Missing address + request.erase ("address"); + request.put ("port", 65); + + { + test_response response (request, rpc.config.port, system.io_ctx); + system.deadline_set (10s); + while (response.status == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_EQ (200, response.status); + ASSERT_EQ (std::error_code (nano::error_rpc::requires_port_and_address).message (), response.json.get ("error")); + } + + // Try with invalid address + request.put ("address", 
"not_a_valid_address"); + request.put ("port", 65); + + { + test_response response (request, rpc.config.port, system.io_ctx); + system.deadline_set (10s); + while (response.status == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_EQ (200, response.status); + ASSERT_EQ (std::error_code (nano::error_common::invalid_ip_address).message (), response.json.get ("error")); + } + + // Then invalid port + request.put ("address", (boost::format ("%1%") % node->network.endpoint ().address ()).str ()); + request.put ("port", "invalid port"); + { + test_response response (request, rpc.config.port, system.io_ctx); + system.deadline_set (10s); + while (response.status == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_EQ (200, response.status); + ASSERT_EQ (std::error_code (nano::error_common::invalid_port).message (), response.json.get ("error")); + } + + // Use correctly formed address and port + request.put ("port", node->network.endpoint ().port ()); + { + test_response response (request, rpc.config.port, system.io_ctx); + system.deadline_set (10s); + while (response.status == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_EQ (200, response.status); + compare_default_test_result_data (response, *node); + } +} + +TEST (rpc, node_telemetry_random) +{ + nano::system system (1); + auto & node1 = *add_ipc_enabled_node (system); + scoped_io_thread_name_change scoped_thread_name_io; + nano::node_rpc_config node_rpc_config; + nano::ipc::ipc_server ipc_server (node1, node_rpc_config); + nano::rpc_config rpc_config (nano::get_available_port (), true); + rpc_config.rpc_process.ipc_port = node1.config.ipc_config.transport_tcp.port; + nano::ipc_rpc_processor ipc_rpc_processor (system.io_ctx, rpc_config); + nano::rpc rpc (system.io_ctx, rpc_config, ipc_rpc_processor); + rpc.start (); + + // Wait until peers are stored as they are done in the background + auto peers_stored = false; + while (!peers_stored) + { + ASSERT_NO_ERROR (system.poll ()); + + auto transaction 
= system.nodes.back ()->store.tx_begin_read (); + peers_stored = system.nodes.back ()->store.peer_count (transaction) != 0; + } + + boost::property_tree::ptree request; + auto node = system.nodes.front (); + request.put ("action", "node_telemetry"); + { + test_response response (request, rpc.config.port, system.io_ctx); + system.deadline_set (10s); + while (response.status == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_EQ (200, response.status); + ASSERT_FALSE (response.json.get ("cached")); + ASSERT_EQ (1, response.json.get ("block_count")); + ASSERT_EQ (1, response.json.get ("cemented_count")); + ASSERT_EQ (0, response.json.get ("unchecked_count")); + ASSERT_EQ (1, response.json.get ("account_count")); + ASSERT_EQ (node->config.bandwidth_limit, response.json.get ("bandwidth_cap")); + ASSERT_EQ (1, response.json.get ("peer_count")); + ASSERT_EQ (node->network_params.protocol.protocol_version, response.json.get ("protocol_version_number")); + ASSERT_EQ (nano::get_major_node_version (), response.json.get ("vendor_version")); + ASSERT_GE (100, response.json.get ("uptime")); + ASSERT_EQ (nano::genesis ().hash ().to_string (), response.json.get ("genesis_block")); + } + + request.put ("raw", "true"); + test_response response (request, rpc.config.port, system.io_ctx); + system.deadline_set (10s); + while (response.status == 0) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_EQ (200, response.status); + + // This may fail if the response has taken longer than the cache cutoff time. 
+ ASSERT_TRUE (response.json.get ("cached")); + + auto & all_metrics = response.json.get_child ("metrics"); + std::vector> raw_metrics_json_l; + for (auto & metrics_pair : all_metrics) + { + auto & metrics = metrics_pair.second; + raw_metrics_json_l.emplace_back (metrics.get ("block_count"), metrics.get ("cemented_count"), metrics.get ("unchecked_count"), metrics.get ("account_count"), metrics.get ("bandwidth_cap"), metrics.get ("peer_count"), metrics.get ("protocol_version_number"), metrics.get ("vendor_version"), metrics.get ("uptime"), metrics.get ("genesis_block")); + } + + ASSERT_EQ (1, raw_metrics_json_l.size ()); + auto const & metrics = raw_metrics_json_l.front (); + ASSERT_EQ (1, std::get<0> (metrics)); + ASSERT_EQ (1, std::get<1> (metrics)); + ASSERT_EQ (0, std::get<2> (metrics)); + ASSERT_EQ (1, std::get<3> (metrics)); + ASSERT_EQ (node->config.bandwidth_limit, std::get<4> (metrics)); + ASSERT_EQ (1, std::get<5> (metrics)); + ASSERT_EQ (node->network_params.protocol.protocol_version, std::get<6> (metrics)); + ASSERT_EQ (nano::get_major_node_version (), std::get<7> (metrics)); + ASSERT_GE (100, std::get<8> (metrics)); + ASSERT_EQ (nano::genesis ().hash ().to_string (), std::get<9> (metrics)); +} diff --git a/nano/secure/common.cpp b/nano/secure/common.cpp index 6e5d70c8..9adb2921 100644 --- a/nano/secure/common.cpp +++ b/nano/secure/common.cpp @@ -161,7 +161,6 @@ nano::keypair const & nano::test_genesis_key (test_constants.test_genesis_key); nano::account const & nano::nano_test_account (test_constants.nano_test_account); std::string const & nano::nano_test_genesis (test_constants.nano_test_genesis); nano::account const & nano::genesis_account (test_constants.genesis_account); -std::string const & nano::genesis_block (test_constants.genesis_block); nano::uint128_t const & nano::genesis_amount (test_constants.genesis_amount); nano::account const & nano::burn_account (test_constants.burn_account); diff --git a/nano/secure/common.hpp b/nano/secure/common.hpp 
index 3cf07ac1..6fe99d69 100644 --- a/nano/secure/common.hpp +++ b/nano/secure/common.hpp @@ -347,7 +347,7 @@ public: protocol_constants (nano::nano_networks network_a); /** Current protocol version */ - uint8_t protocol_version = 0x11; + uint8_t protocol_version = 0x12; /** Minimum accepted protocol version */ uint8_t protocol_version_min = 0x10; @@ -360,6 +360,9 @@ public: /** Do not start TCP realtime network connections to nodes older than this version */ uint8_t tcp_realtime_protocol_version_min = 0x11; + + /** Do not request telemetry metrics to nodes older than this version */ + uint8_t telemetry_protocol_version_min = 0x12; }; /** Genesis keys and ledger constants for network variants */