From baabcca4260566d02815106f4d625f4771cca3df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20W=C3=B3jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 2 Feb 2023 16:14:26 +0100 Subject: [PATCH] Telemetry refactor (#4026) * Simplify telemetry * Fix tests * Cleanup config * Cleanup local telemetry * Remove unused flag * Fix slow tests * Fix rpc tests * Cleanup `nano::test::compare_telemetry` * Add more testcases * Add ongoing telemetry broadcasts * Cleanup * Fixes * Do not immediately remove telemetry from disconnected peers * Increase telemetry broadcast & request intervals * Update docs * Refactor `peer_exclusion` a bit * Filter and disconnect from peers with mismatched genesis --------- Co-authored-by: clemahieu --- nano/core_test/network.cpp | 65 ++- nano/core_test/telemetry.cpp | 340 +++++----------- nano/core_test/websocket.cpp | 11 +- nano/lib/config.hpp | 15 + nano/lib/stats_enums.hpp | 9 + nano/lib/threading.cpp | 3 + nano/lib/threading.hpp | 1 + nano/lib/utility.hpp | 20 + nano/node/common.cpp | 20 +- nano/node/common.hpp | 15 +- nano/node/json_handler.cpp | 141 +++---- nano/node/network.cpp | 26 +- nano/node/network.hpp | 3 + nano/node/node.cpp | 53 ++- nano/node/node.hpp | 6 +- nano/node/node_observers.hpp | 2 +- nano/node/nodeconfig.hpp | 5 +- nano/node/peer_exclusion.cpp | 101 ++--- nano/node/peer_exclusion.hpp | 26 +- nano/node/telemetry.cpp | 632 ++++++++++------------------- nano/node/telemetry.hpp | 176 ++++---- nano/node/transport/tcp.cpp | 7 - nano/node/transport/tcp_server.cpp | 5 +- nano/node/transport/tcp_server.hpp | 2 +- nano/node/transport/udp.cpp | 2 +- nano/node/websocket.cpp | 4 +- nano/rpc_test/rpc.cpp | 21 +- nano/slow_test/node.cpp | 79 ++-- nano/test_common/telemetry.cpp | 80 ++-- nano/test_common/telemetry.hpp | 22 +- nano/test_common/testutil.cpp | 9 +- nano/test_common/testutil.hpp | 2 +- 32 files changed, 817 insertions(+), 1086 deletions(-) diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 59b99c58..61f78700 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -1105,52 +1105,41 @@ namespace nano { TEST (peer_exclusion, validate) { - nano::peer_exclusion excluded_peers; - size_t fake_peers_count = 10; - auto max_size = excluded_peers.limited_size (fake_peers_count); - for (auto i = 0; i < max_size + 2; ++i) + std::size_t max_size = 10; + + nano::peer_exclusion excluded_peers{ max_size }; + + for (auto i = 0; i < max_size + 1; ++i) { - nano::tcp_endpoint endpoint (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (i)), 0); + nano::tcp_endpoint endpoint{ boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (i)), 0 }; ASSERT_FALSE (excluded_peers.check (endpoint)); - ASSERT_EQ (1, excluded_peers.add (endpoint, fake_peers_count)); + ASSERT_EQ (1, excluded_peers.add (endpoint)); ASSERT_FALSE (excluded_peers.check (endpoint)); } - // The oldest one must have been removed - ASSERT_EQ (max_size + 1, excluded_peers.size ()); - auto & peers_by_endpoint (excluded_peers.peers.get ()); - nano::tcp_endpoint oldest (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0x0)), 0); - ASSERT_EQ (peers_by_endpoint.end (), peers_by_endpoint.find (oldest.address ())); + + // The oldest entry must have been removed, because we just overfilled the container + ASSERT_EQ (max_size, excluded_peers.size ()); + nano::tcp_endpoint oldest{ boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0x0)), 0 }; + ASSERT_EQ (excluded_peers.score (oldest), 0); auto to_seconds = [] (std::chrono::steady_clock::time_point const & timepoint) { return static_cast (std::chrono::duration_cast (timepoint.time_since_epoch ()).count ()); }; - nano::tcp_endpoint first (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0x1)), 0); - ASSERT_NE (peers_by_endpoint.end (), peers_by_endpoint.find (first.address ())); - nano::tcp_endpoint second (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0x2)), 0); - ASSERT_EQ (false, excluded_peers.check (second)); - ASSERT_NEAR (to_seconds (std::chrono::steady_clock::now () + excluded_peers.exclude_time_hours), to_seconds (peers_by_endpoint.find (second.address ())->exclude_until), 2); - ASSERT_EQ (2, excluded_peers.add (second, fake_peers_count)); - ASSERT_EQ (peers_by_endpoint.end (), peers_by_endpoint.find (first.address ())); - ASSERT_NEAR (to_seconds (std::chrono::steady_clock::now () + excluded_peers.exclude_time_hours), to_seconds (peers_by_endpoint.find (second.address ())->exclude_until), 2); - ASSERT_EQ (3, excluded_peers.add (second, fake_peers_count)); - ASSERT_NEAR (to_seconds (std::chrono::steady_clock::now () + excluded_peers.exclude_time_hours * 3 * 2), to_seconds (peers_by_endpoint.find (second.address ())->exclude_until), 2); + + // However, the rest of the entries should be present + nano::tcp_endpoint first{ boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0x1)), 0 }; + ASSERT_NE (excluded_peers.score (first), 0); + + nano::tcp_endpoint second{ boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (0x2)), 0 }; + ASSERT_NE (excluded_peers.score (second), 0); + + // Check exclusion times + ASSERT_NEAR (to_seconds (std::chrono::steady_clock::now () + excluded_peers.exclude_time_hours), to_seconds (excluded_peers.until (second)), 2); + ASSERT_EQ (2, excluded_peers.add (second)); + ASSERT_NEAR (to_seconds (std::chrono::steady_clock::now () + excluded_peers.exclude_time_hours), to_seconds (excluded_peers.until (second)), 2); + ASSERT_EQ (3, excluded_peers.add (second)); + ASSERT_NEAR (to_seconds (std::chrono::steady_clock::now () + excluded_peers.exclude_time_hours * 3 * 2), to_seconds (excluded_peers.until (second)), 2); ASSERT_EQ (max_size, excluded_peers.size ()); - - // Clear many entries if there are a low number of peers - ASSERT_EQ (4, excluded_peers.add (second, 0)); - ASSERT_EQ (1, excluded_peers.size ()); - - auto component (nano::collect_container_info (excluded_peers, "")); - auto composite (dynamic_cast (component.get ())); - ASSERT_NE (nullptr, component); - auto & children (composite->get_children ()); - ASSERT_EQ (1, children.size ()); - auto child_leaf (dynamic_cast (children.front ().get ())); - ASSERT_NE (nullptr, child_leaf); - auto child_info (child_leaf->get_info ()); - ASSERT_EQ ("peers", child_info.name); - ASSERT_EQ (1, child_info.count); - ASSERT_EQ (sizeof (decltype (excluded_peers.peers)::value_type), child_info.sizeof_element); } } @@ -1165,7 +1154,7 @@ TEST (network, tcp_no_connect_excluded_peers) auto endpoint1_tcp (nano::transport::map_endpoint_to_tcp (node1->network.endpoint ())); while (!node0->network.excluded_peers.check (endpoint1_tcp)) { - node0->network.excluded_peers.add (endpoint1_tcp, 1); + node0->network.excluded_peers.add (endpoint1_tcp); } ASSERT_EQ (0, node0->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_excluded)); node1->network.merge_peer (node0->network.endpoint ()); diff --git a/nano/core_test/telemetry.cpp b/nano/core_test/telemetry.cpp index 3839972d..dc297f16 100644 --- a/nano/core_test/telemetry.cpp +++ b/nano/core_test/telemetry.cpp @@ -274,7 +274,7 @@ TEST (telemetry, no_peers) { nano::test::system system (1); - auto responses = system.nodes[0]->telemetry->get_metrics (); + auto responses = system.nodes[0]->telemetry.get_all_telemetries (); ASSERT_TRUE (responses.empty ()); } @@ -282,73 +282,53 @@ TEST (telemetry, basic) { nano::test::system system; nano::node_flags node_flags; - node_flags.disable_ongoing_telemetry_requests = true; - node_flags.disable_initial_telemetry_requests = true; auto node_client = system.add_node (node_flags); + node_flags.disable_ongoing_telemetry_requests = true; auto node_server = system.add_node (node_flags); nano::test::wait_peer_connections (system); // Request telemetry metrics - nano::telemetry_data telemetry_data; - auto server_endpoint = node_server->network.endpoint (); auto channel = node_client->network.find_node_id (node_server->get_node_id ()); ASSERT_NE (nullptr, channel); - { - std::atomic done{ false }; - node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &server_endpoint, &telemetry_data] (nano::telemetry_data_response const & response_a) { - ASSERT_FALSE (response_a.error); - telemetry_data = response_a.telemetry_data; - done = true; - }); - ASSERT_TIMELY (10s, done); - ASSERT_EQ (node_server->get_node_id (), telemetry_data.node_id); - } + std::optional telemetry_data; + ASSERT_TIMELY (5s, telemetry_data = node_client->telemetry.get_telemetry (channel->get_endpoint ())); + ASSERT_EQ (node_server->get_node_id (), telemetry_data->node_id); // Check the metrics are correct - nano::test::compare_default_telemetry_response_data (telemetry_data, node_server->network_params, node_server->config.bandwidth_limit, node_server->default_difficulty (nano::work_version::work_1), node_server->node_id); + ASSERT_TRUE (nano::test::compare_telemetry (*telemetry_data, *node_server)); // Call again straight away. It should use the cache - { - std::atomic done{ false }; - node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &telemetry_data] (nano::telemetry_data_response const & response_a) { - ASSERT_EQ (telemetry_data, response_a.telemetry_data); - ASSERT_FALSE (response_a.error); - done = true; - }); - - ASSERT_TIMELY (10s, done); - } + auto telemetry_data_2 = node_client->telemetry.get_telemetry (channel->get_endpoint ()); + ASSERT_TRUE (telemetry_data_2); + ASSERT_EQ (*telemetry_data, *telemetry_data_2); // Wait the cache period and check cache is not used - std::this_thread::sleep_for (nano::telemetry_cache_cutoffs::dev); + WAIT (3s); - std::atomic done{ false }; - node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &telemetry_data] (nano::telemetry_data_response const & response_a) { - ASSERT_NE (telemetry_data, response_a.telemetry_data); - ASSERT_FALSE (response_a.error); - done = true; - }); - - ASSERT_TIMELY (10s, done); + std::optional telemetry_data_3; + ASSERT_TIMELY (5s, telemetry_data_3 = node_client->telemetry.get_telemetry (channel->get_endpoint ())); + ASSERT_NE (*telemetry_data, *telemetry_data_3); } -TEST (telemetry, receive_from_non_listening_channel) +TEST (telemetry, invalid_endpoint) { - nano::test::system system; - auto node = system.add_node (); - nano::telemetry_ack message{ nano::dev::network_params.network, nano::telemetry_data{} }; + nano::test::system system (2); - auto outer_node = nano::test::add_outer_node (system, nano::test::get_available_port ()); - auto channel = nano::test::establish_tcp (system, *outer_node, node->network.endpoint ()); + auto node_client = system.nodes.front (); + auto node_server = system.nodes.back (); - node->network.inbound (message, channel); - // We have not sent a telemetry_req message to this endpoint, so shouldn't count telemetry_ack received from it. - ASSERT_EQ (node->telemetry->telemetry_data_size (), 0); + node_client->telemetry.trigger (); + + // Give some time for nodes to exchange telemetry + WAIT (1s); + + nano::endpoint endpoint = *nano::parse_endpoint ("240.0.0.0:12345"); + ASSERT_FALSE (node_client->telemetry.get_telemetry (endpoint)); } -TEST (telemetry, over_tcp) +TEST (telemetry, disconnected) { nano::test::system system; nano::node_flags node_flags; @@ -357,107 +337,17 @@ TEST (telemetry, over_tcp) nano::test::wait_peer_connections (system); - std::atomic done{ false }; auto channel = node_client->network.find_node_id (node_server->get_node_id ()); ASSERT_NE (nullptr, channel); - node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &node_server] (nano::telemetry_data_response const & response_a) { - ASSERT_FALSE (response_a.error); - nano::test::compare_default_telemetry_response_data (response_a.telemetry_data, node_server->network_params, node_server->config.bandwidth_limit, node_server->default_difficulty (nano::work_version::work_1), node_server->node_id); - done = true; - }); - ASSERT_TIMELY (10s, done); + // Ensure telemetry is available before disconnecting + ASSERT_TIMELY (5s, node_client->telemetry.get_telemetry (channel->get_endpoint ())); - // Check channels are indeed tcp - ASSERT_EQ (1, node_client->network.size ()); - auto list1 (node_client->network.list (2)); - ASSERT_EQ (node_server->network.endpoint (), list1[0]->get_endpoint ()); - ASSERT_EQ (nano::transport::transport_type::tcp, list1[0]->get_type ()); - ASSERT_EQ (1, node_server->network.size ()); - auto list2 (node_server->network.list (2)); - ASSERT_EQ (node_client->network.endpoint (), list2[0]->get_endpoint ()); - ASSERT_EQ (nano::transport::transport_type::tcp, list2[0]->get_type ()); -} - -TEST (telemetry, invalid_channel) -{ - nano::test::system system (2); - - auto node_client = system.nodes.front (); - auto node_server = system.nodes.back (); - - std::atomic done{ false }; - node_client->telemetry->get_metrics_single_peer_async (nullptr, [&done] (nano::telemetry_data_response const & response_a) { - ASSERT_TRUE (response_a.error); - done = true; - }); - - ASSERT_TIMELY (10s, done); -} - -TEST (telemetry, blocking_request) -{ - nano::test::system system (2); - - auto node_client = system.nodes.front (); - auto node_server = system.nodes.back (); - - nano::test::wait_peer_connections (system); - - // Request telemetry metrics - std::atomic done{ false }; - std::function call_system_poll; - std::promise promise; - call_system_poll = [&call_system_poll, &workers = node_client->workers, &done, &system, &promise] () { - if (!done) - { - ASSERT_NO_ERROR (system.poll ()); - workers.push_task (call_system_poll); - } - else - { - promise.set_value (); - } - }; - - // Keep pushing system.polls in another thread (thread_pool), because we will be blocking this thread and unable to do so. - system.deadline_set (10s); - node_client->workers.push_task (call_system_poll); - - // Now try single request metric - auto channel = node_client->network.find_node_id (node_server->get_node_id ()); - ASSERT_NE (nullptr, channel); - auto telemetry_data_response = node_client->telemetry->get_metrics_single_peer (channel); - ASSERT_FALSE (telemetry_data_response.error); - nano::test::compare_default_telemetry_response_data (telemetry_data_response.telemetry_data, node_server->network_params, node_server->config.bandwidth_limit, node_server->default_difficulty (nano::work_version::work_1), node_server->node_id); - - done = true; - promise.get_future ().wait (); -} - -TEST (telemetry, disconnects) -{ - nano::test::system system; - nano::node_flags node_flags; - node_flags.disable_initial_telemetry_requests = true; - auto node_client = system.add_node (node_flags); - auto node_server = system.add_node (node_flags); - - nano::test::wait_peer_connections (system); - - // Try and request metrics from a node which is turned off but a channel is not closed yet - auto channel = node_client->network.find_node_id (node_server->get_node_id ()); - ASSERT_NE (nullptr, channel); node_server->stop (); ASSERT_TRUE (channel); - std::atomic done{ false }; - node_client->telemetry->get_metrics_single_peer_async (channel, [&done] (nano::telemetry_data_response const & response_a) { - ASSERT_TRUE (response_a.error); - done = true; - }); - - ASSERT_TIMELY (10s, done); + // Ensure telemetry from disconnected peer is removed + ASSERT_TIMELY (5s, !node_client->telemetry.get_telemetry (channel->get_endpoint ())); } TEST (telemetry, dos_tcp) @@ -465,7 +355,6 @@ TEST (telemetry, dos_tcp) // Confirm that telemetry_reqs are not processed nano::test::system system; nano::node_flags node_flags; - node_flags.disable_initial_telemetry_requests = true; node_flags.disable_ongoing_telemetry_requests = true; auto node_client = system.add_node (node_flags); auto node_server = system.add_node (node_flags); @@ -479,7 +368,7 @@ TEST (telemetry, dos_tcp) ASSERT_FALSE (ec); }); - ASSERT_TIMELY (10s, 1 == node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + ASSERT_TIMELY (5s, 1 == node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); auto orig = std::chrono::steady_clock::now (); for (int i = 0; i < 10; ++i) @@ -489,7 +378,7 @@ TEST (telemetry, dos_tcp) }); } - ASSERT_TIMELY (10s, (nano::telemetry_cache_cutoffs::dev + orig) <= std::chrono::steady_clock::now ()); + ASSERT_TIMELY (5s, (nano::dev::network_params.network.telemetry_request_cooldown + orig) <= std::chrono::steady_clock::now ()); // Should process no more telemetry_req messages ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); @@ -506,7 +395,6 @@ TEST (telemetry, disable_metrics) { nano::test::system system; nano::node_flags node_flags; - node_flags.disable_initial_telemetry_requests = true; auto node_client = system.add_node (node_flags); node_flags.disable_providing_telemetry_metrics = true; auto node_server = system.add_node (node_flags); @@ -517,33 +405,26 @@ TEST (telemetry, disable_metrics) auto channel = node_client->network.find_node_id (node_server->get_node_id ()); ASSERT_NE (nullptr, channel); - std::atomic done{ false }; - node_client->telemetry->get_metrics_single_peer_async (channel, [&done] (nano::telemetry_data_response const & response_a) { - ASSERT_TRUE (response_a.error); - done = true; - }); + node_client->telemetry.trigger (); - ASSERT_TIMELY (10s, done); + ASSERT_NEVER (1s, node_client->telemetry.get_telemetry (channel->get_endpoint ())); // It should still be able to receive metrics though - done = false; auto channel1 = node_server->network.find_node_id (node_client->get_node_id ()); ASSERT_NE (nullptr, channel1); - node_server->telemetry->get_metrics_single_peer_async (channel1, [&done, node_client] (nano::telemetry_data_response const & response_a) { - ASSERT_FALSE (response_a.error); - nano::test::compare_default_telemetry_response_data (response_a.telemetry_data, node_client->network_params, node_client->config.bandwidth_limit, node_client->default_difficulty (nano::work_version::work_1), node_client->node_id); - done = true; - }); - ASSERT_TIMELY (10s, done); + std::optional telemetry_data; + ASSERT_TIMELY (5s, telemetry_data = node_server->telemetry.get_telemetry (channel1->get_endpoint ())); + + ASSERT_TRUE (nano::test::compare_telemetry (*telemetry_data, *node_client)); } TEST (telemetry, max_possible_size) { nano::test::system system; nano::node_flags node_flags; - node_flags.disable_initial_telemetry_requests = true; node_flags.disable_ongoing_telemetry_requests = true; + node_flags.disable_providing_telemetry_metrics = true; auto node_client = system.add_node (node_flags); auto node_server = system.add_node (node_flags); @@ -559,97 +440,94 @@ TEST (telemetry, max_possible_size) ASSERT_FALSE (ec); }); - ASSERT_TIMELY (10s, 1 == node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); -} - -namespace nano -{ -// Test disabled because it's failing intermittently. -// PR in which it got disabled: https://github.com/nanocurrency/nano-node/pull/3512 -// Issue for investigating it: https://github.com/nanocurrency/nano-node/issues/3524 -TEST (telemetry, DISABLED_remove_peer_different_genesis) -{ - nano::test::system system (1); - auto node0 (system.nodes[0]); - ASSERT_EQ (0, node0->network.size ()); - // Change genesis block to something else in this test (this is the reference telemetry processing uses). - nano::network_params network_params{ nano::networks::nano_dev_network }; - network_params.ledger.genesis = network_params.ledger.nano_live_genesis; - nano::node_config config{ network_params }; - auto node1 (std::make_shared (system.io_ctx, nano::unique_path (), config, system.work)); - node1->start (); - system.nodes.push_back (node1); - node0->network.merge_peer (node1->network.endpoint ()); - node1->network.merge_peer (node0->network.endpoint ()); - ASSERT_TIMELY (10s, node0->stats.count (nano::stat::type::telemetry, nano::stat::detail::different_genesis_hash) != 0 && node1->stats.count (nano::stat::type::telemetry, nano::stat::detail::different_genesis_hash) != 0); - - ASSERT_TIMELY (1s, 0 == node0->network.size ()); - ASSERT_TIMELY (1s, 0 == node1->network.size ()); - ASSERT_GE (node0->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::out), 1); - ASSERT_GE (node1->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::out), 1); - - nano::lock_guard guard (node0->network.excluded_peers.mutex); - ASSERT_EQ (1, node0->network.excluded_peers.peers.get ().count (node1->network.endpoint ().address ())); - ASSERT_EQ (1, node1->network.excluded_peers.peers.get ().count (node0->network.endpoint ().address ())); -} - -TEST (telemetry, remove_peer_invalid_signature) -{ - nano::test::system system; - nano::node_flags node_flags; - node_flags.disable_initial_telemetry_requests = true; - node_flags.disable_ongoing_telemetry_requests = true; - auto node = system.add_node (node_flags); - auto outer_node = nano::test::add_outer_node (system, nano::test::get_available_port ()); - auto channel = nano::test::establish_tcp (system, *outer_node, node->network.endpoint ()); - channel->set_node_id (node->node_id.pub); - // (Implementation detail) So that messages are not just discarded when requests were not sent. - node->telemetry->recent_or_initial_request_telemetry_data.emplace (channel->get_endpoint (), nano::telemetry_data (), std::chrono::steady_clock::now (), true); - - auto telemetry_data = nano::local_telemetry_data (node->ledger, node->network, node->unchecked, node->config.bandwidth_limit, node->network_params, node->startup_time, node->default_difficulty (nano::work_version::work_1), node->node_id); - // Change anything so that the signed message is incorrect - telemetry_data.block_count = 0; - auto telemetry_ack = nano::telemetry_ack{ nano::dev::network_params.network, telemetry_data }; - node->network.inbound (telemetry_ack, channel); - - ASSERT_TIMELY (10s, node->stats.count (nano::stat::type::telemetry, nano::stat::detail::invalid_signature) > 0); - ASSERT_NO_ERROR (system.poll_until_true (3s, [&node, address = channel->get_endpoint ().address ()] () -> bool { - nano::lock_guard guard (node->network.excluded_peers.mutex); - return node->network.excluded_peers.peers.get ().count (address); - })); -} + ASSERT_TIMELY (5s, 1 == node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); } TEST (telemetry, maker_pruning) { nano::test::system system; nano::node_flags node_flags; - node_flags.disable_ongoing_telemetry_requests = true; - node_flags.disable_initial_telemetry_requests = true; auto node_client = system.add_node (node_flags); node_flags.enable_pruning = true; nano::node_config config; config.enable_voting = false; + node_flags.disable_ongoing_telemetry_requests = true; auto node_server = system.add_node (config, node_flags); nano::test::wait_peer_connections (system); // Request telemetry metrics - nano::telemetry_data telemetry_data; - auto server_endpoint = node_server->network.endpoint (); auto channel = node_client->network.find_node_id (node_server->get_node_id ()); ASSERT_NE (nullptr, channel); - { - std::atomic done{ false }; - node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &server_endpoint, &telemetry_data] (nano::telemetry_data_response const & response_a) { - ASSERT_FALSE (response_a.error); - telemetry_data = response_a.telemetry_data; - done = true; - }); - ASSERT_TIMELY (10s, done); - ASSERT_EQ (node_server->get_node_id (), telemetry_data.node_id); - } + std::optional telemetry_data; + ASSERT_TIMELY (5s, telemetry_data = node_client->telemetry.get_telemetry (channel->get_endpoint ())); + ASSERT_EQ (node_server->get_node_id (), telemetry_data->node_id); - ASSERT_EQ (nano::telemetry_maker::nf_pruned_node, static_cast (telemetry_data.maker)); + // Ensure telemetry response indicates pruned node + ASSERT_EQ (nano::telemetry_maker::nf_pruned_node, static_cast (telemetry_data->maker)); } + +TEST (telemetry, invalid_signature) +{ + nano::test::system system; + auto & node = *system.add_node (); + + auto telemetry = node.local_telemetry (); + telemetry.block_count = 9999; // Change data so signature is no longer valid + + auto message = nano::telemetry_ack{ nano::dev::network_params.network, telemetry }; + node.network.inbound (message, nano::test::fake_channel (node)); + + ASSERT_TIMELY (5s, node.stats.count (nano::stat::type::telemetry, nano::stat::detail::invalid_signature) > 0); + ASSERT_ALWAYS (1s, node.stats.count (nano::stat::type::telemetry, nano::stat::detail::process) == 0) +} + +TEST (telemetry, mismatched_node_id) +{ + nano::test::system system; + auto & node = *system.add_node (); + + auto telemetry = node.local_telemetry (); + + auto message = nano::telemetry_ack{ nano::dev::network_params.network, telemetry }; + node.network.inbound (message, nano::test::fake_channel (node, /* node id */ { 123 })); + + ASSERT_TIMELY (5s, node.stats.count (nano::stat::type::telemetry, nano::stat::detail::node_id_mismatch) > 0); + ASSERT_ALWAYS (1s, node.stats.count (nano::stat::type::telemetry, nano::stat::detail::process) == 0) +} + +TEST (telemetry, ongoing_broadcasts) +{ + nano::test::system system; + nano::node_flags node_flags; + node_flags.disable_ongoing_telemetry_requests = true; + auto & node1 = *system.add_node (node_flags); + auto & node2 = *system.add_node (node_flags); + + ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::telemetry, nano::stat::detail::process) >= 3); + ASSERT_TIMELY (5s, node2.stats.count (nano::stat::type::telemetry, nano::stat::detail::process) >= 3) +} + +TEST (telemetry, mismatched_genesis) +{ + // Only second node will broadcast telemetry + nano::test::system system; + nano::node_flags node_flags; + node_flags.disable_ongoing_telemetry_requests = true; + node_flags.disable_providing_telemetry_metrics = true; + auto & node1 = *system.add_node (node_flags); + + // Set up a node with different genesis + nano::network_params network_params{ nano::networks::nano_dev_network }; + network_params.ledger.genesis = network_params.ledger.nano_live_genesis; + nano::node_config node_config{ network_params }; + node_flags.disable_providing_telemetry_metrics = false; + auto & node2 = *system.add_node (node_config, node_flags); + + ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::telemetry, nano::stat::detail::genesis_mismatch) > 0); + ASSERT_ALWAYS (1s, node1.stats.count (nano::stat::type::telemetry, nano::stat::detail::process) == 0) + + // Ensure node with different genesis gets disconnected + ASSERT_TIMELY (5s, !node1.network.find_node_id (node2.get_node_id ())); +} \ No newline at end of file diff --git a/nano/core_test/websocket.cpp b/nano/core_test/websocket.cpp index 36a71a19..e91b7d53 100644 --- a/nano/core_test/websocket.cpp +++ b/nano/core_test/websocket.cpp @@ -978,8 +978,6 @@ TEST (websocket, telemetry) config.websocket_config.enabled = true; config.websocket_config.port = nano::test::get_available_port (); nano::node_flags node_flags; - node_flags.disable_initial_telemetry_requests = true; - node_flags.disable_ongoing_telemetry_requests = true; auto node1 (system.add_node (config, node_flags)); config.peering_port = nano::test::get_available_port (); config.websocket_config.enabled = true; @@ -1002,9 +1000,9 @@ TEST (websocket, telemetry) ASSERT_TIMELY (10s, done); - node1->telemetry->get_metrics_single_peer_async (node1->network.find_node_id (node2->get_node_id ()), [] (auto const & response_a) { - ASSERT_FALSE (response_a.error); - }); + auto channel = node1->network.find_node_id (node2->get_node_id ()); + ASSERT_NE (channel, nullptr); + ASSERT_TIMELY (5s, node1->telemetry.get_telemetry (channel->get_endpoint ())); ASSERT_TIMELY (10s, future.wait_for (0s) == std::future_status::ready); @@ -1021,7 +1019,8 @@ TEST (websocket, telemetry) nano::jsonconfig telemetry_contents (contents); nano::telemetry_data telemetry_data; telemetry_data.deserialize_json (telemetry_contents, false); - nano::test::compare_default_telemetry_response_data (telemetry_data, node2->network_params, node2->config.bandwidth_limit, node2->default_difficulty (nano::work_version::work_1), node2->node_id); + + ASSERT_TRUE (nano::test::compare_telemetry (telemetry_data, *node2)); ASSERT_EQ (contents.get ("address"), node2->network.endpoint ().address ().to_string ()); ASSERT_EQ (contents.get ("port"), node2->network.endpoint ().port ()); diff --git a/nano/lib/config.hpp b/nano/lib/config.hpp index aac93d52..b4f14df4 100644 --- a/nano/lib/config.hpp +++ b/nano/lib/config.hpp @@ -9,6 +9,8 @@ #include #include +using namespace std::chrono_literals; + namespace boost { namespace filesystem @@ -245,6 +247,10 @@ public: max_peers_per_subnetwork = max_peers_per_ip * 4; peer_dump_interval = std::chrono::seconds (1); vote_broadcast_interval = 500; + telemetry_request_cooldown = 500ms; + telemetry_cache_cutoff = 2000ms; + telemetry_request_interval = 500ms; + telemetry_broadcast_interval = 500ms; } } @@ -287,6 +293,15 @@ public: /** Time to wait before vote rebroadcasts for active elections (milliseconds) */ uint64_t vote_broadcast_interval; + /** We do not reply to telemetry requests made within cooldown period */ + std::chrono::milliseconds telemetry_request_cooldown{ 1000 * 15 }; + /** How often to request telemetry from peers */ + std::chrono::milliseconds telemetry_request_interval{ 1000 * 60 }; + /** How often to broadcast telemetry to peers */ + std::chrono::milliseconds telemetry_broadcast_interval{ 1000 * 60 }; + /** Telemetry data older than this value is considered stale */ + std::chrono::milliseconds telemetry_cache_cutoff{ 1000 * 130 }; // 2 * `telemetry_broadcast_interval` + some margin + /** Returns the network this object contains values for */ nano::networks network () const { diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index f58bcaf8..7db0a30f 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -49,6 +49,11 @@ enum class detail : uint8_t // common loop, total, + process, + update, + insert, + request, + broadcast, // processing queue queue, @@ -223,10 +228,14 @@ enum class detail : uint8_t invalid_signature, different_genesis_hash, node_id_mismatch, + genesis_mismatch, request_within_protection_cache_zone, no_response_received, unsolicited_telemetry_ack, failed_send_telemetry_req, + empty_payload, + cleanup_outdated, + cleanup_dead, // vote generator generator_broadcasts, diff --git a/nano/lib/threading.cpp b/nano/lib/threading.cpp index db004041..c851720e 100644 --- a/nano/lib/threading.cpp +++ b/nano/lib/threading.cpp @@ -105,6 +105,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::bootstrap_server: thread_role_name_string = "Bootstrp serv"; break; + case nano::thread_role::name::telemetry: + thread_role_name_string = "Telemetry"; + break; default: debug_assert (false && "nano::thread_role::get_string unhandled thread role"); } diff --git a/nano/lib/threading.hpp b/nano/lib/threading.hpp index 20b6c7c2..72aabd3f 100644 --- a/nano/lib/threading.hpp +++ b/nano/lib/threading.hpp @@ -47,6 +47,7 @@ namespace thread_role election_hinting, vote_generator_queue, bootstrap_server, + telemetry, }; /* diff --git a/nano/lib/utility.hpp b/nano/lib/utility.hpp index 4547fab5..e758bfc4 100644 --- a/nano/lib/utility.hpp +++ b/nano/lib/utility.hpp @@ -164,6 +164,26 @@ void transform_if (InputIt first, InputIt last, OutputIt dest, Pred pred, Func t } } +/** + * Erase elements from container when predicate returns true + * TODO: Use `std::erase_if` in c++20 + */ +template +void erase_if (Container & container, Pred pred) +{ + for (auto it = container.begin (), end = container.end (); it != end;) + { + if (pred (*it)) + { + it = container.erase (it); + } + else + { + ++it; + } + } +} + /** Safe narrowing cast which silences warnings and asserts on data loss in debug builds. This is optimized away. */ template constexpr TARGET_TYPE narrow_cast (SOURCE_TYPE const & val) diff --git a/nano/node/common.cpp b/nano/node/common.cpp index c242c610..52b7dce8 100644 --- a/nano/node/common.cpp +++ b/nano/node/common.cpp @@ -9,10 +9,6 @@ #include -std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::dev; -std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::beta; -std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::live; - uint64_t nano::ip_address_hash_raw (boost::asio::ip::address const & ip_a, uint16_t port) { debug_assert (ip_a.is_v6 ()); @@ -114,6 +110,16 @@ bool nano::parse_endpoint (std::string const & string, nano::endpoint & endpoint return result; } +std::optional nano::parse_endpoint (const std::string & str) +{ + nano::endpoint endpoint; + if (!parse_endpoint (str, endpoint)) + { + return endpoint; // Success + } + return {}; +} + bool nano::parse_tcp_endpoint (std::string const & string, nano::tcp_endpoint & endpoint_a) { boost::asio::ip::address address; @@ -126,12 +132,6 @@ bool nano::parse_tcp_endpoint (std::string const & string, nano::tcp_endpoint & return result; } -std::chrono::seconds nano::telemetry_cache_cutoffs::network_to_time (network_constants const & network_constants) -{ - return std::chrono::seconds{ (network_constants.is_live_network () || network_constants.is_test_network ()) ? live : network_constants.is_beta_network () ? beta - : dev }; -} - nano::node_singleton_memory_pool_purge_guard::node_singleton_memory_pool_purge_guard () : cleanup_guard ({ nano::block_memory_pool_purge, nano::purge_shared_ptr_singleton_pool_memory, nano::purge_shared_ptr_singleton_pool_memory }) { diff --git a/nano/node/common.hpp b/nano/node/common.hpp index 80bfbfe3..96148268 100644 --- a/nano/node/common.hpp +++ b/nano/node/common.hpp @@ -8,15 +8,18 @@ #include #include +#include namespace nano { using endpoint = boost::asio::ip::udp::endpoint; +using tcp_endpoint = boost::asio::ip::tcp::endpoint; + bool parse_port (std::string const &, uint16_t &); bool parse_address (std::string const &, boost::asio::ip::address &); bool parse_address_port (std::string const &, boost::asio::ip::address &, uint16_t &); -using tcp_endpoint = boost::asio::ip::tcp::endpoint; bool parse_endpoint (std::string const &, nano::endpoint &); +std::optional parse_endpoint (std::string const &); bool parse_tcp_endpoint (std::string const &, nano::tcp_endpoint &); uint64_t ip_address_hash_raw (boost::asio::ip::address const & ip_a, uint16_t port = 0); } @@ -165,16 +168,6 @@ struct hash namespace nano { -class telemetry_cache_cutoffs -{ -public: - static std::chrono::seconds constexpr dev{ 3 }; - static std::chrono::seconds constexpr beta{ 15 }; - static std::chrono::seconds constexpr live{ 60 }; - - static std::chrono::seconds network_to_time (network_constants const & network_constants); -}; - /** Helper guard which contains all the necessary purge (remove all memory even if used) functions */ class node_singleton_memory_pool_purge_guard { diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 2c859b3d..c9016efd 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -3955,15 +3955,13 @@ void nano::json_handler::stop () void nano::json_handler::telemetry () { - auto rpc_l (shared_from_this ()); - auto address_text (request.get_optional ("address")); auto port_text (request.get_optional ("port")); if (address_text.is_initialized () || port_text.is_initialized ()) { // Check both are specified - std::shared_ptr channel; + nano::endpoint endpoint{}; if (address_text.is_initialized () && port_text.is_initialized ()) { uint16_t port; @@ -3972,11 +3970,12 @@ void nano::json_handler::telemetry () boost::asio::ip::address address; if (!nano::parse_address (*address_text, address)) { - nano::endpoint endpoint (address, port); - if (address.is_loopback () && port == rpc_l->node.network.endpoint ().port ()) + endpoint = { address, port }; + + if (address.is_loopback () && port == node.network.endpoint ().port ()) { // Requesting telemetry metrics locally - auto telemetry_data = nano::local_telemetry_data (rpc_l->node.ledger, rpc_l->node.network, rpc_l->node.unchecked, rpc_l->node.config.bandwidth_limit, rpc_l->node.network_params, rpc_l->node.startup_time, rpc_l->node.default_difficulty (nano::work_version::work_1), rpc_l->node.node_id); + auto telemetry_data = node.local_telemetry (); nano::jsonconfig config_l; auto const should_ignore_identification_metrics = false; @@ -3985,20 +3984,12 @@ void nano::json_handler::telemetry () if (!err) { - rpc_l->response_l.insert (rpc_l->response_l.begin (), ptree.begin (), ptree.end ()); + response_l.insert (response_l.begin (), ptree.begin (), ptree.end ()); } - rpc_l->response_errors (); + response_errors (); return; } - else - { - channel = node.network.find_channel (nano::transport::map_endpoint_to_v6 (endpoint)); - if (!channel) - { - ec = nano::error_rpc::peer_not_found; - } - } } else { @@ -4017,38 +4008,30 @@ void nano::json_handler::telemetry () if (!ec) { - debug_assert (channel); - if (node.telemetry) + auto maybe_telemetry = node.telemetry.get_telemetry (nano::transport::map_endpoint_to_v6 (endpoint)); + if (maybe_telemetry) { - node.telemetry->get_metrics_single_peer_async (channel, [rpc_l] (auto const & telemetry_response_a) { - if (!telemetry_response_a.error) - { - nano::jsonconfig config_l; - auto const should_ignore_identification_metrics = false; - auto err = telemetry_response_a.telemetry_data.serialize_json (config_l, should_ignore_identification_metrics); - auto const & ptree = config_l.get_tree (); + auto telemetry = *maybe_telemetry; + nano::jsonconfig config_l; + auto const should_ignore_identification_metrics = false; + auto err = telemetry.serialize_json (config_l, should_ignore_identification_metrics); + auto const & ptree = config_l.get_tree (); - if (!err) - { - rpc_l->response_l.insert (rpc_l->response_l.begin (), ptree.begin (), ptree.end ()); - } - else - { - rpc_l->ec = nano::error_rpc::generic; - } - } - else - { - rpc_l->ec = nano::error_rpc::generic; - } - - rpc_l->response_errors (); - }); + if (!err) + { + response_l.insert (response_l.begin (), ptree.begin (), ptree.end ()); + } + else + { + ec = nano::error_rpc::generic; + } } else { - response_errors (); + ec = nano::error_rpc::peer_not_found; } + + response_errors (); } else { @@ -4061,55 +4044,53 @@ void nano::json_handler::telemetry () // setting "raw" to true returns metrics from all nodes requested. auto raw = request.get_optional ("raw"); auto output_raw = raw.value_or (false); - if (node.telemetry) - { - auto telemetry_responses = node.telemetry->get_metrics (); - if (output_raw) - { - boost::property_tree::ptree metrics; - for (auto & telemetry_metrics : telemetry_responses) - { - nano::jsonconfig config_l; - auto const should_ignore_identification_metrics = false; - auto err = telemetry_metrics.second.serialize_json (config_l, should_ignore_identification_metrics); - config_l.put ("address", telemetry_metrics.first.address ()); - config_l.put ("port", telemetry_metrics.first.port ()); - if (!err) - { - metrics.push_back (std::make_pair ("", config_l.get_tree ())); - } - else - { - ec = nano::error_rpc::generic; - } - } - response_l.put_child ("metrics", metrics); - } - else + auto telemetry_responses = node.telemetry.get_all_telemetries (); + if (output_raw) + { + boost::property_tree::ptree metrics; + for (auto & telemetry_metrics : telemetry_responses) { nano::jsonconfig config_l; - std::vector telemetry_datas; - telemetry_datas.reserve (telemetry_responses.size ()); - std::transform (telemetry_responses.begin (), telemetry_responses.end (), std::back_inserter (telemetry_datas), [] (auto const & endpoint_telemetry_data) { - return endpoint_telemetry_data.second; - }); - - auto average_telemetry_metrics = nano::consolidate_telemetry_data (telemetry_datas); - // Don't add node_id/signature in consolidated metrics - auto const should_ignore_identification_metrics = true; - auto err = average_telemetry_metrics.serialize_json (config_l, should_ignore_identification_metrics); - auto const & ptree = config_l.get_tree (); - + auto const should_ignore_identification_metrics = false; + auto err = telemetry_metrics.second.serialize_json (config_l, should_ignore_identification_metrics); + config_l.put ("address", telemetry_metrics.first.address ()); + config_l.put ("port", telemetry_metrics.first.port ()); if (!err) { - response_l.insert (response_l.begin (), ptree.begin (), ptree.end ()); + metrics.push_back (std::make_pair ("", config_l.get_tree ())); } else { ec = nano::error_rpc::generic; } } + + response_l.put_child ("metrics", metrics); + } + else + { + nano::jsonconfig config_l; + std::vector telemetry_datas; + telemetry_datas.reserve (telemetry_responses.size ()); + std::transform (telemetry_responses.begin (), telemetry_responses.end (), std::back_inserter (telemetry_datas), [] (auto const & endpoint_telemetry_data) { + return endpoint_telemetry_data.second; + }); + + auto average_telemetry_metrics = nano::consolidate_telemetry_data (telemetry_datas); + // Don't add node_id/signature in consolidated metrics + auto const should_ignore_identification_metrics = true; + auto err = average_telemetry_metrics.serialize_json (config_l, should_ignore_identification_metrics); + auto const & ptree = config_l.get_tree (); + + if (!err) + { + response_l.insert (response_l.begin (), ptree.begin (), ptree.end ()); + } + else + { + ec = nano::error_rpc::generic; + } } response_errors (); diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 99887832..e5b697d3 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -7,6 +7,10 @@ #include +/* + * network + */ + nano::network::network (nano::node & node_a, uint16_t port_a) : id (nano::network_constants::active_network), syn_cookies (node_a.network_params.network.max_peers_per_ip), @@ -523,7 +527,7 @@ public: nano::telemetry_ack telemetry_ack{ node.network_params.network }; if (!node.flags.disable_providing_telemetry_metrics) { - auto telemetry_data = nano::local_telemetry_data (node.ledger, node.network, node.unchecked, node.config.bandwidth_limit, node.network_params, node.startup_time, node.default_difficulty (nano::work_version::work_1), node.node_id); + auto telemetry_data = node.local_telemetry (); telemetry_ack = nano::telemetry_ack{ node.network_params.network, telemetry_data }; } channel->send (telemetry_ack, nullptr, nano::buffer_drop_policy::no_socket_drop); @@ -536,10 +540,7 @@ public: node.logger.try_log (boost::str (boost::format ("Received telemetry_ack message from %1%") % channel->to_string ())); } - if (node.telemetry) - { - node.telemetry->set (message_a, *channel); - } + node.telemetry.process (message_a, channel); } void asc_pull_req (nano::asc_pull_req const & message) override @@ -827,6 +828,19 @@ void nano::network::erase (nano::transport::channel const & channel_a) } } +void nano::network::exclude (std::shared_ptr const & channel) +{ + // Add to peer exclusion list + excluded_peers.add (channel->get_tcp_endpoint ()); + + // Disconnect + erase (*channel); +} + +/* + * message_buffer_manager + */ + nano::message_buffer_manager::message_buffer_manager (nano::stats & stats_a, std::size_t size, std::size_t count) : stats (stats_a), free (count), @@ -1056,7 +1070,7 @@ std::unique_ptr nano::collect_container_info (ne composite->add_component (network.tcp_channels.collect_container_info ("tcp_channels")); composite->add_component (network.udp_channels.collect_container_info ("udp_channels")); composite->add_component (network.syn_cookies.collect_container_info ("syn_cookies")); - composite->add_component (collect_container_info (network.excluded_peers, "excluded_peers")); + composite->add_component (network.excluded_peers.collect_container_info ("excluded_peers")); return composite; } diff --git a/nano/node/network.hpp b/nano/node/network.hpp index fcfa32b2..a5196114 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -172,6 +172,9 @@ public: float size_sqrt () const; bool empty () const; void erase (nano::transport::channel const &); + /** Disconnects and adds peer to exclusion list */ + void exclude (std::shared_ptr const & channel); + static std::string to_string (nano::networks); private: diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 9402b942..bc6354f0 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -30,7 +30,7 @@ extern std::size_t nano_bootstrap_weights_beta_size; } /* - * Configs + * configs */ nano::backlog_population::config nano::backlog_population_config (const nano::node_config & config) @@ -44,21 +44,21 @@ nano::backlog_population::config nano::backlog_population_config (const nano::no nano::vote_cache::config nano::nodeconfig_to_vote_cache_config (node_config const & config, node_flags const & flags) { - vote_cache::config cfg; + vote_cache::config cfg{}; cfg.max_size = flags.inactive_votes_cache_size; return cfg; } nano::hinted_scheduler::config nano::nodeconfig_to_hinted_scheduler_config (const nano::node_config & config) { - hinted_scheduler::config cfg; + hinted_scheduler::config cfg{}; cfg.vote_cache_check_interval_ms = config.network_params.network.is_dev_network () ? 100u : 1000u; return cfg; } nano::outbound_bandwidth_limiter::config nano::outbound_bandwidth_limiter_config (const nano::node_config & config) { - outbound_bandwidth_limiter::config cfg; + outbound_bandwidth_limiter::config cfg{}; cfg.standard_limit = config.bandwidth_limit; cfg.standard_burst_ratio = config.bandwidth_limit_burst_ratio; cfg.bootstrap_limit = config.bootstrap_bandwidth_limit; @@ -170,7 +170,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co // otherwise, any value is considered, with `0` having the special meaning of 'let the OS pick a port instead' // network (*this, config.peering_port.has_value () ? *config.peering_port : 0), - telemetry (std::make_shared (network, workers, observers.telemetry, stats, network_params, flags.disable_ongoing_telemetry_requests)), + telemetry{ nano::telemetry::config{ config, flags }, *this, network, observers, network_params, stats }, bootstrap_initiator (*this), bootstrap_server{ store, ledger, network_params.network, stats }, // BEWARE: `bootstrap` takes `network.port` instead of `config.peering_port` because when the user doesn't specify @@ -220,8 +220,6 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co if (!init_error ()) { - telemetry->start (); - // Notify election schedulers when AEC frees election slot active.vacancy_update = [this] () { scheduler.notify (); @@ -564,10 +562,7 @@ std::unique_ptr nano::collect_container_info (no composite->add_component (collect_container_info (node.bootstrap_initiator, "bootstrap_initiator")); composite->add_component (collect_container_info (node.tcp_listener, "tcp_listener")); composite->add_component (collect_container_info (node.network, "network")); - if (node.telemetry) - { - composite->add_component (collect_container_info (*node.telemetry, "telemetry")); - } + composite->add_component (node.telemetry.collect_container_info ("telemetry")); composite->add_component (collect_container_info (node.workers, "workers")); composite->add_component (collect_container_info (node.observers, "observers")); composite->add_component (collect_container_info (node.wallets, "wallets")); @@ -701,6 +696,7 @@ void nano::node::start () hinting.start (); bootstrap_server.start (); websocket.start (); + telemetry.start (); } void nano::node::stop () @@ -728,7 +724,7 @@ void nano::node::stop () final_generator.stop (); confirmation_height_processor.stop (); network.stop (); - telemetry->stop (); + telemetry.stop (); websocket.stop (); bootstrap_server.stop (); bootstrap_initiator.stop (); @@ -1491,6 +1487,35 @@ nano::account nano::node::get_node_id () const return node_id.pub; }; +nano::telemetry_data nano::node::local_telemetry () const +{ + nano::telemetry_data telemetry_data; + telemetry_data.node_id = node_id.pub; + telemetry_data.block_count = ledger.cache.block_count; + telemetry_data.cemented_count = ledger.cache.cemented_count; + telemetry_data.bandwidth_cap = config.bandwidth_limit; + telemetry_data.protocol_version = network_params.network.protocol_version; + telemetry_data.uptime = std::chrono::duration_cast (std::chrono::steady_clock::now () - startup_time).count (); + telemetry_data.unchecked_count = unchecked.count (ledger.store.tx_begin_read ()); + telemetry_data.genesis_block = network_params.ledger.genesis->hash (); + telemetry_data.peer_count = nano::narrow_cast (network.size ()); + telemetry_data.account_count = ledger.cache.account_count; + telemetry_data.major_version = nano::get_major_node_version (); + telemetry_data.minor_version = nano::get_minor_node_version (); + telemetry_data.patch_version = nano::get_patch_node_version (); + telemetry_data.pre_release_version = nano::get_pre_release_node_version (); + telemetry_data.maker = static_cast> (ledger.pruning ? telemetry_maker::nf_pruned_node : telemetry_maker::nf_node); + telemetry_data.timestamp = std::chrono::system_clock::now (); + telemetry_data.active_difficulty = default_difficulty (nano::work_version::work_1); + // Make sure this is the final operation! + telemetry_data.sign (node_id); + return telemetry_data; +} + +/* + * node_wrapper + */ + nano::node_wrapper::node_wrapper (boost::filesystem::path const & path_a, boost::filesystem::path const & config_path_a, nano::node_flags const & node_flags_a) : network_params{ nano::network_constants::active_network }, io_context (std::make_shared ()), @@ -1530,6 +1555,10 @@ nano::node_wrapper::~node_wrapper () node->stop (); } +/* + * inactive_node + */ + nano::inactive_node::inactive_node (boost::filesystem::path const & path_a, boost::filesystem::path const & config_path_a, nano::node_flags const & node_flags_a) : node_wrapper (path_a, config_path_a, node_flags_a), node (node_wrapper.node) diff --git a/nano/node/node.hpp b/nano/node/node.hpp index d91b375d..796ff1a3 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -50,7 +50,6 @@ namespace rocksdb { } // Declare a namespace rocksdb inside nano so all references to the rocksdb library need to be globally scoped e.g. ::rocksdb::Slice class node; -class telemetry; class work_pool; std::unique_ptr collect_container_info (rep_crawler & rep_crawler, std::string const & name); @@ -135,6 +134,9 @@ public: */ void bootstrap_block (nano::block_hash const &); nano::account get_node_id () const; + nano::telemetry_data local_telemetry () const; + +public: nano::write_database_queue write_database_queue; boost::asio::io_context & io_ctx; boost::latch node_initialized_latch; @@ -157,7 +159,7 @@ public: nano::signature_checker checker; nano::outbound_bandwidth_limiter outbound_limiter; nano::network network; - std::shared_ptr telemetry; + nano::telemetry telemetry; nano::bootstrap_initiator bootstrap_initiator; nano::bootstrap_server bootstrap_server; nano::transport::tcp_listener tcp_listener; diff --git a/nano/node/node_observers.hpp b/nano/node/node_observers.hpp index f9b4ea23..d597cb99 100644 --- a/nano/node/node_observers.hpp +++ b/nano/node/node_observers.hpp @@ -21,7 +21,7 @@ public: nano::observer_set> endpoint; nano::observer_set<> disconnect; nano::observer_set work_cancel; - nano::observer_set telemetry; + nano::observer_set const &> telemetry; nano::observer_set socket_connected; nano::observer_set socket_accepted; diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 8cf9d1ca..43a40322 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -36,11 +36,13 @@ class node_config public: node_config (nano::network_params & network_params = nano::dev::network_params); node_config (const std::optional &, nano::logging const &, nano::network_params & network_params = nano::dev::network_params); + nano::error serialize_toml (nano::tomlconfig &) const; nano::error deserialize_toml (nano::tomlconfig &); + bool upgrade_json (unsigned, nano::jsonconfig &); nano::account random_representative () const; - nano::network_params & network_params; + nano::network_params network_params; std::optional peering_port{}; nano::logging logging; std::vector> work_peers; @@ -140,7 +142,6 @@ public: bool disable_unchecked_drop{ true }; bool disable_providing_telemetry_metrics{ false }; bool disable_ongoing_telemetry_requests{ false }; - bool disable_initial_telemetry_requests{ false }; bool disable_block_processor_unchecked_deletion{ false }; bool disable_block_processor_republishing{ false }; bool allow_bootstrap_peers_duplicates{ false }; diff --git a/nano/node/peer_exclusion.cpp b/nano/node/peer_exclusion.cpp index 250bd088..547ff65e 100644 --- a/nano/node/peer_exclusion.cpp +++ b/nano/node/peer_exclusion.cpp @@ -1,28 +1,26 @@ #include -constexpr std::chrono::hours nano::peer_exclusion::exclude_time_hours; -constexpr std::chrono::hours nano::peer_exclusion::exclude_remove_hours; -constexpr std::size_t nano::peer_exclusion::size_max; -constexpr double nano::peer_exclusion::peers_percentage_limit; - -uint64_t nano::peer_exclusion::add (nano::tcp_endpoint const & endpoint_a, std::size_t const network_peers_count_a) +nano::peer_exclusion::peer_exclusion (std::size_t max_size_a) : + max_size{ max_size_a } { - uint64_t result (0); - nano::lock_guard guard (mutex); - // Clean old excluded peers - auto limited = limited_size (network_peers_count_a); - while (peers.size () > 1 && peers.size () > limited) - { - peers.get ().erase (peers.get ().begin ()); - } - debug_assert (peers.size () <= size_max); - auto & peers_by_endpoint (peers.get ()); - auto address = endpoint_a.address (); - auto existing (peers_by_endpoint.find (address)); - if (existing == peers_by_endpoint.end ()) +} + +uint64_t nano::peer_exclusion::add (nano::tcp_endpoint const & endpoint) +{ + uint64_t result = 0; + nano::lock_guard guard{ mutex }; + + if (auto existing = peers.get ().find (endpoint.address ()); existing == peers.get ().end ()) { + // Clean old excluded peers + while (peers.size () > 1 && peers.size () >= max_size) + { + peers.get ().erase (peers.get ().begin ()); + } + debug_assert (peers.size () <= max_size); + // Insert new endpoint - auto inserted (peers.insert (peer_exclusion::item{ std::chrono::steady_clock::steady_clock::now () + exclude_time_hours, address, 1 })); + auto inserted = peers.insert (peer_exclusion::item{ std::chrono::steady_clock::steady_clock::now () + exclude_time_hours, endpoint.address (), 1 }); (void)inserted; debug_assert (inserted.second); result = 1; @@ -30,7 +28,7 @@ uint64_t nano::peer_exclusion::add (nano::tcp_endpoint const & endpoint_a, std:: else { // Update existing endpoint - peers_by_endpoint.modify (existing, [&result] (peer_exclusion::item & item_a) { + peers.get ().modify (existing, [&result] (peer_exclusion::item & item_a) { ++item_a.score; result = item_a.score; if (item_a.score == peer_exclusion::score_limit) @@ -46,50 +44,59 @@ uint64_t nano::peer_exclusion::add (nano::tcp_endpoint const & endpoint_a, std:: return result; } -bool nano::peer_exclusion::check (nano::tcp_endpoint const & endpoint_a) +uint64_t nano::peer_exclusion::score (const nano::tcp_endpoint & endpoint) const { - bool excluded (false); - nano::lock_guard guard (mutex); - auto & peers_by_endpoint (peers.get ()); - auto existing (peers_by_endpoint.find (endpoint_a.address ())); - if (existing != peers_by_endpoint.end () && existing->score >= score_limit) + nano::lock_guard guard{ mutex }; + + if (auto existing = peers.get ().find (endpoint.address ()); existing != peers.get ().end ()) { - if (existing->exclude_until > std::chrono::steady_clock::now ()) + return existing->score; + } + return 0; +} + +std::chrono::steady_clock::time_point nano::peer_exclusion::until (const nano::tcp_endpoint & endpoint) const +{ + nano::lock_guard guard{ mutex }; + + if (auto existing = peers.get ().find (endpoint.address ()); existing != peers.get ().end ()) + { + return existing->exclude_until; + } + return {}; +} + +bool nano::peer_exclusion::check (nano::tcp_endpoint const & endpoint) const +{ + nano::lock_guard guard{ mutex }; + + if (auto existing = peers.get ().find (endpoint.address ()); existing != peers.get ().end ()) + { + if (existing->score >= score_limit && existing->exclude_until > std::chrono::steady_clock::now ()) { - excluded = true; - } - else if (existing->exclude_until + exclude_remove_hours * existing->score < std::chrono::steady_clock::now ()) - { - peers_by_endpoint.erase (existing); + return true; } } - return excluded; + return false; } void nano::peer_exclusion::remove (nano::tcp_endpoint const & endpoint_a) { - nano::lock_guard guard (mutex); + nano::lock_guard guard{ mutex }; peers.get ().erase (endpoint_a.address ()); } -std::size_t nano::peer_exclusion::limited_size (std::size_t const network_peers_count_a) const -{ - return std::min (size_max, static_cast (network_peers_count_a * peers_percentage_limit)); -} - std::size_t nano::peer_exclusion::size () const { - nano::lock_guard guard (mutex); + nano::lock_guard guard{ mutex }; return peers.size (); } -std::unique_ptr nano::collect_container_info (nano::peer_exclusion const & excluded_peers, std::string const & name) +std::unique_ptr nano::peer_exclusion::collect_container_info (std::string const & name) { + nano::lock_guard guard{ mutex }; + auto composite = std::make_unique (name); - - std::size_t excluded_peers_count = excluded_peers.size (); - auto sizeof_excluded_peers_element = sizeof (nano::peer_exclusion::ordered_endpoints::value_type); - composite->add_component (std::make_unique (container_info{ "peers", excluded_peers_count, sizeof_excluded_peers_element })); - + composite->add_component (std::make_unique (container_info{ "peers", peers.size (), sizeof (decltype (peers)::value_type) })); return composite; } diff --git a/nano/node/peer_exclusion.hpp b/nano/node/peer_exclusion.hpp index 303eeb28..d317cb69 100644 --- a/nano/node/peer_exclusion.hpp +++ b/nano/node/peer_exclusion.hpp @@ -19,13 +19,16 @@ class peer_exclusion final uint64_t score; }; +public: + explicit peer_exclusion (std::size_t max_size = 5000); + +private: + std::size_t const max_size; + // clang-format off class tag_endpoint {}; class tag_exclusion {}; - // clang-format on -public: - // clang-format off using ordered_endpoints = boost::multi_index_container, @@ -34,27 +37,22 @@ public: mi::member>>>; // clang-format on -private: ordered_endpoints peers; + mutable nano::mutex mutex; public: - constexpr static std::size_t size_max = 5000; - constexpr static double peers_percentage_limit = 0.5; constexpr static uint64_t score_limit = 2; constexpr static std::chrono::hours exclude_time_hours = std::chrono::hours (1); constexpr static std::chrono::hours exclude_remove_hours = std::chrono::hours (24); - uint64_t add (nano::tcp_endpoint const &, std::size_t const); - bool check (nano::tcp_endpoint const &); + uint64_t add (nano::tcp_endpoint const &); + uint64_t score (nano::tcp_endpoint const &) const; + std::chrono::steady_clock::time_point until (nano::tcp_endpoint const &) const; + bool check (nano::tcp_endpoint const &) const; void remove (nano::tcp_endpoint const &); - std::size_t limited_size (std::size_t const) const; std::size_t size () const; - friend class telemetry_DISABLED_remove_peer_different_genesis_Test; - friend class telemetry_remove_peer_different_genesis_udp_Test; - friend class telemetry_remove_peer_invalid_signature_Test; - friend class peer_exclusion_validate_Test; + std::unique_ptr collect_container_info (std::string const & name); }; -std::unique_ptr collect_container_info (peer_exclusion const & excluded_peers, std::string const & name); } diff --git a/nano/node/telemetry.cpp b/nano/node/telemetry.cpp index 13953c6c..5260fbfa 100644 --- a/nano/node/telemetry.cpp +++ b/nano/node/telemetry.cpp @@ -1,12 +1,11 @@ #include #include #include -#include +#include +#include #include #include -#include #include -#include #include @@ -18,469 +17,283 @@ using namespace std::chrono_literals; -nano::telemetry::telemetry (nano::network & network_a, nano::thread_pool & workers_a, nano::observer_set & observers_a, nano::stats & stats_a, nano::network_params & network_params_a, bool disable_ongoing_requests_a) : - network (network_a), - workers (workers_a), - observers (observers_a), - stats (stats_a), - network_params (network_params_a), - disable_ongoing_requests (disable_ongoing_requests_a) +nano::telemetry::telemetry (const config & config_a, nano::node & node_a, nano::network & network_a, nano::node_observers & observers_a, nano::network_params & network_params_a, nano::stats & stats_a) : + config_m{ config_a }, + node{ node_a }, + network{ network_a }, + observers{ observers_a }, + network_params{ network_params_a }, + stats{ stats_a } { } +nano::telemetry::~telemetry () +{ + // Thread must be stopped before destruction + debug_assert (!thread.joinable ()); +} + void nano::telemetry::start () { - // Cannot be done in the constructor as a shared_from_this () call is made in ongoing_req_all_peers - if (!disable_ongoing_requests) - { - ongoing_req_all_peers (std::chrono::milliseconds (0)); - } + debug_assert (!thread.joinable ()); + + thread = std::thread ([this] () { + nano::thread_role::set (nano::thread_role::name::telemetry); + run (); + }); } void nano::telemetry::stop () { - stopped = true; -} - -void nano::telemetry::set (nano::telemetry_ack const & message_a, nano::transport::channel const & channel_a) -{ - if (!stopped) { - nano::unique_lock lk (mutex); - nano::endpoint endpoint = channel_a.get_endpoint (); - auto it = recent_or_initial_request_telemetry_data.find (endpoint); - if (it == recent_or_initial_request_telemetry_data.cend () || !it->undergoing_request) - { - // Not requesting telemetry data from this peer so ignore it - stats.inc (nano::stat::type::telemetry, nano::stat::detail::unsolicited_telemetry_ack); - return; - } - - recent_or_initial_request_telemetry_data.modify (it, [&message_a] (nano::telemetry_info & telemetry_info_a) { - telemetry_info_a.data = message_a.data; - }); - - // This can also remove the peer - auto error = verify_message (message_a, channel_a); - - if (!error) - { - // Received telemetry data from a peer which hasn't disabled providing telemetry metrics and there's no errors with the data - lk.unlock (); - observers.notify (message_a.data, endpoint); - lk.lock (); - } - channel_processed (endpoint, error); + nano::lock_guard guard{ mutex }; + stopped = true; } + condition.notify_all (); + nano::join_or_pass (thread); } -bool nano::telemetry::verify_message (nano::telemetry_ack const & message_a, nano::transport::channel const & channel_a) +bool nano::telemetry::verify (const nano::telemetry_ack & telemetry, const std::shared_ptr & channel) const { - if (message_a.is_empty_payload ()) + if (telemetry.is_empty_payload ()) + { + stats.inc (nano::stat::type::telemetry, nano::stat::detail::empty_payload); + return false; + } + + // Check if telemetry node id matches channel node id + if (telemetry.data.node_id != channel->get_node_id ()) + { + stats.inc (nano::stat::type::telemetry, nano::stat::detail::node_id_mismatch); + return false; + } + + // Check whether data is signed by node id presented in telemetry message + if (telemetry.data.validate_signature ()) // Returns false when signature OK + { + stats.inc (nano::stat::type::telemetry, nano::stat::detail::invalid_signature); + return false; + } + + if (telemetry.data.genesis_block != network_params.ledger.genesis->hash ()) + { + network.exclude (channel); + + stats.inc (nano::stat::type::telemetry, nano::stat::detail::genesis_mismatch); + return false; + } + + return true; // Telemetry is OK +} + +void nano::telemetry::process (const nano::telemetry_ack & telemetry, const std::shared_ptr & channel) +{ + if (!verify (telemetry, channel)) + { + return; + } + + nano::unique_lock lock{ mutex }; + + const auto endpoint = channel->get_endpoint (); + + if (auto it = telemetries.get ().find (endpoint); it != telemetries.get ().end ()) + { + stats.inc (nano::stat::type::telemetry, nano::stat::detail::update); + + telemetries.get ().modify (it, [&telemetry, &endpoint] (auto & entry) { + debug_assert (entry.endpoint == endpoint); + entry.data = telemetry.data; + entry.last_updated = std::chrono::steady_clock::now (); + }); + } + else + { + stats.inc (nano::stat::type::telemetry, nano::stat::detail::insert); + telemetries.get ().insert ({ endpoint, telemetry.data, std::chrono::steady_clock::now (), channel }); + + if (telemetries.size () > max_size) + { + stats.inc (nano::stat::type::telemetry, nano::stat::detail::overfill); + telemetries.get ().pop_front (); // Erase oldest entry + } + } + + lock.unlock (); + + observers.telemetry.notify (telemetry.data, channel); + + stats.inc (nano::stat::type::telemetry, nano::stat::detail::process); +} + +void nano::telemetry::trigger () +{ + { + nano::lock_guard guard{ mutex }; + triggered = true; + } + condition.notify_all (); +} + +std::size_t nano::telemetry::size () const +{ + nano::lock_guard guard{ mutex }; + return telemetries.size (); +} + +bool nano::telemetry::request_predicate () const +{ + debug_assert (!mutex.try_lock ()); + + if (triggered) { return true; } - - auto remove_channel = false; - // We want to ensure that the node_id of the channel matches that in the message before attempting to - // use the data to remove any peers. - auto node_id_mismatch = (channel_a.get_node_id () != message_a.data.node_id); - if (!node_id_mismatch) + if (config_m.enable_ongoing_requests) { - // The data could be correctly signed but for a different node id - remove_channel = message_a.data.validate_signature (); - if (!remove_channel) - { - // Check for different genesis blocks - remove_channel = (message_a.data.genesis_block != network_params.ledger.genesis->hash ()); - if (remove_channel) - { - stats.inc (nano::stat::type::telemetry, nano::stat::detail::different_genesis_hash); - } - } - else - { - stats.inc (nano::stat::type::telemetry, nano::stat::detail::invalid_signature); - } + return last_request + network_params.network.telemetry_request_interval < std::chrono::steady_clock::now (); } - else + return false; +} + +bool nano::telemetry::broadcast_predicate () const +{ + debug_assert (!mutex.try_lock ()); + + if (config_m.enable_ongoing_broadcasts) { - stats.inc (nano::stat::type::telemetry, nano::stat::detail::node_id_mismatch); + return last_broadcast + network_params.network.telemetry_broadcast_interval < std::chrono::steady_clock::now (); } + return false; +} - if (remove_channel) +void nano::telemetry::run () +{ + nano::unique_lock lock{ mutex }; + while (!stopped) { - // Add to peer exclusion list - network.excluded_peers.add (channel_a.get_tcp_endpoint (), network.size ()); + stats.inc (nano::stat::type::telemetry, nano::stat::detail::loop); - // Disconnect from peer with incorrect telemetry data - network.erase (channel_a); - } + cleanup (); - return remove_channel || node_id_mismatch; -} - -std::chrono::milliseconds nano::telemetry::cache_plus_buffer_cutoff_time () const -{ - // This include the waiting time for the response as well as a buffer (1 second) waiting for the alarm operation to be scheduled and completed - return cache_cutoff + response_time_cutoff + 1s; -} - -bool nano::telemetry::within_cache_plus_buffer_cutoff (telemetry_info const & telemetry_info) const -{ - auto is_within = (telemetry_info.last_response + cache_plus_buffer_cutoff_time ()) >= std::chrono::steady_clock::now (); - return !telemetry_info.awaiting_first_response () && is_within; -} - -bool nano::telemetry::within_cache_cutoff (telemetry_info const & telemetry_info) const -{ - auto is_within = (telemetry_info.last_response + cache_cutoff) >= std::chrono::steady_clock::now (); - return !telemetry_info.awaiting_first_response () && is_within; -} - -void nano::telemetry::ongoing_req_all_peers (std::chrono::milliseconds next_request_interval) -{ - workers.add_timed_task (std::chrono::steady_clock::now () + next_request_interval, [this_w = std::weak_ptr (shared_from_this ())] () { - if (auto this_l = this_w.lock ()) + if (request_predicate ()) { - // Check if there are any peers which are in the peers list which haven't been request, or any which are below or equal to the cache cutoff time - if (!this_l->stopped) - { - class tag_channel - { - }; + triggered = false; + lock.unlock (); - struct channel_wrapper - { - std::shared_ptr channel; - channel_wrapper (std::shared_ptr const & channel_a) : - channel (channel_a) - { - } - nano::endpoint endpoint () const - { - return channel->get_endpoint (); - } - }; + run_requests (); - // clang-format off - namespace mi = boost::multi_index; - boost::multi_index_container, - mi::const_mem_fun>, - mi::hashed_unique, - mi::member, &channel_wrapper::channel>>>> peers; - // clang-format on - - { - // Copy peers to the multi index container so can get better asymptotic complexity in future operations - auto temp_peers = this_l->network.list (std::numeric_limits::max ()); - peers.insert (temp_peers.begin (), temp_peers.end ()); - } - - { - // Cleanup any stale saved telemetry data for non-existent peers - nano::lock_guard guard (this_l->mutex); - for (auto it = this_l->recent_or_initial_request_telemetry_data.begin (); it != this_l->recent_or_initial_request_telemetry_data.end ();) - { - if (!it->undergoing_request && !this_l->within_cache_cutoff (*it) && peers.count (it->endpoint) == 0) - { - it = this_l->recent_or_initial_request_telemetry_data.erase (it); - } - else - { - ++it; - } - } - - // Remove from peers list if it exists and is within the cache cutoff - for (auto peers_it = peers.begin (); peers_it != peers.end ();) - { - auto it = this_l->recent_or_initial_request_telemetry_data.find (peers_it->endpoint ()); - if (it != this_l->recent_or_initial_request_telemetry_data.cend () && this_l->within_cache_cutoff (*it)) - { - peers_it = peers.erase (peers_it); - } - else - { - ++peers_it; - } - } - } - - // Request data from new peers, or ones which are out of date - for (auto const & peer : boost::make_iterator_range (peers)) - { - this_l->get_metrics_single_peer_async (peer.channel, [] (auto const &) { - // Intentionally empty, just using to refresh the cache - }); - } - - // Schedule the next request; Use the default request time unless a telemetry request cache expires sooner - nano::lock_guard guard (this_l->mutex); - long long next_round = std::chrono::duration_cast (this_l->cache_cutoff + this_l->response_time_cutoff).count (); - if (!this_l->recent_or_initial_request_telemetry_data.empty ()) - { - auto range = boost::make_iterator_range (this_l->recent_or_initial_request_telemetry_data.get ()); - for (auto telemetry_info : range) - { - if (!telemetry_info.undergoing_request && peers.count (telemetry_info.endpoint) == 0) - { - auto const last_response = telemetry_info.last_response; - auto now = std::chrono::steady_clock::now (); - if (now > last_response + this_l->cache_cutoff) - { - next_round = std::min (next_round, std::chrono::duration_cast (now - (last_response + this_l->cache_cutoff)).count ()); - } - // We are iterating in sorted order from last_updated, so can break once we have found the first valid one. - break; - } - } - } - - this_l->ongoing_req_all_peers (std::chrono::milliseconds (next_round)); - } + lock.lock (); + last_request = std::chrono::steady_clock::now (); } - }); -} -std::unordered_map nano::telemetry::get_metrics () -{ - std::unordered_map telemetry_data; - - nano::lock_guard guard (mutex); - auto range = boost::make_iterator_range (recent_or_initial_request_telemetry_data); - // clang-format off - nano::transform_if (range.begin (), range.end (), std::inserter (telemetry_data, telemetry_data.end ()), - [this](auto const & telemetry_info) { return this->within_cache_plus_buffer_cutoff (telemetry_info); }, - [](auto const & telemetry_info) { return std::pair{ telemetry_info.endpoint, telemetry_info.data }; }); - // clang-format on - - return telemetry_data; -} - -void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr const & channel_a, std::function const & callback_a) -{ - auto invoke_callback_with_error = [&callback_a, &workers = this->workers, channel_a] () { - nano::endpoint endpoint; - if (channel_a) + if (broadcast_predicate ()) { - endpoint = channel_a->get_endpoint (); - } - workers.push_task ([callback_a, endpoint] () { - auto const error = true; - callback_a ({ nano::telemetry_data{}, endpoint, error }); - }); - }; + lock.unlock (); - if (!stopped) - { - if (channel_a) - { - auto add_callback_async = [&workers = this->workers, &callback_a] (telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a) { - telemetry_data_response telemetry_data_response_l{ telemetry_data_a, endpoint_a, false }; - workers.push_task ([telemetry_data_response_l, callback_a] () { - callback_a (telemetry_data_response_l); - }); - }; + run_broadcasts (); - // Check if this is within the cache - nano::lock_guard guard (mutex); - auto it = recent_or_initial_request_telemetry_data.find (channel_a->get_endpoint ()); - if (it != recent_or_initial_request_telemetry_data.cend () && within_cache_cutoff (*it)) - { - add_callback_async (it->data, it->endpoint); - } - else - { - if (it != recent_or_initial_request_telemetry_data.cend () && it->undergoing_request) - { - // A request is currently undergoing, add the callback - debug_assert (callbacks.count (it->endpoint) > 0); - callbacks[it->endpoint].push_back (callback_a); - } - else - { - if (it == recent_or_initial_request_telemetry_data.cend ()) - { - // Insert dummy values, it's important not to use "last_response" time here without first checking that awaiting_first_response () returns false. - recent_or_initial_request_telemetry_data.emplace (channel_a->get_endpoint (), nano::telemetry_data (), std::chrono::steady_clock::now (), true); - it = recent_or_initial_request_telemetry_data.find (channel_a->get_endpoint ()); - } - else - { - recent_or_initial_request_telemetry_data.modify (it, [] (nano::telemetry_info & telemetry_info_a) { - telemetry_info_a.undergoing_request = true; - }); - } - callbacks[it->endpoint].push_back (callback_a); - fire_request_message (channel_a); - } - } + lock.lock (); + last_broadcast = std::chrono::steady_clock::now (); } - else - { - invoke_callback_with_error (); - } - } - else - { - invoke_callback_with_error (); + + condition.wait_for (lock, std::min (network_params.network.telemetry_request_interval, network_params.network.telemetry_broadcast_interval) / 2); } } -nano::telemetry_data_response nano::telemetry::get_metrics_single_peer (std::shared_ptr const & channel_a) +void nano::telemetry::run_requests () { - std::promise promise; - get_metrics_single_peer_async (channel_a, [&promise] (telemetry_data_response const & single_metric_data_a) { - promise.set_value (single_metric_data_a); - }); + auto peers = network.list (); - return promise.get_future ().get (); + for (auto & channel : peers) + { + request (channel); + } } -void nano::telemetry::fire_request_message (std::shared_ptr const & channel_a) +void nano::telemetry::request (std::shared_ptr & channel) { - uint64_t round_l; - { - auto it = recent_or_initial_request_telemetry_data.find (channel_a->get_endpoint ()); - recent_or_initial_request_telemetry_data.modify (it, [] (nano::telemetry_info & telemetry_info_a) { - ++telemetry_info_a.round; - }); - round_l = it->round; - } + stats.inc (nano::stat::type::telemetry, nano::stat::detail::request); - std::weak_ptr this_w (shared_from_this ()); nano::telemetry_req message{ network_params.network }; - // clang-format off - channel_a->send (message, [this_w, endpoint = channel_a->get_endpoint (), round_l](boost::system::error_code const & ec, std::size_t size_a) { - if (auto this_l = this_w.lock ()) - { - if (ec) - { - // Error sending the telemetry_req message - this_l->stats.inc (nano::stat::type::telemetry, nano::stat::detail::failed_send_telemetry_req); - nano::lock_guard guard (this_l->mutex); - this_l->channel_processed (endpoint, true); - } - else - { - // If no response is seen after a certain period of time remove it - this_l->workers.add_timed_task (std::chrono::steady_clock::now () + this_l->response_time_cutoff, [round_l, this_w, endpoint]() { - if (auto this_l = this_w.lock ()) - { - nano::lock_guard guard (this_l->mutex); - auto it = this_l->recent_or_initial_request_telemetry_data.find (endpoint); - if (it != this_l->recent_or_initial_request_telemetry_data.cend () && it->undergoing_request && round_l == it->round) - { - this_l->stats.inc (nano::stat::type::telemetry, nano::stat::detail::no_response_received); - this_l->channel_processed (endpoint, true); - } - } - }); - } - } - }, - nano::buffer_drop_policy::no_socket_drop); - // clang-format on + channel->send (message); } -void nano::telemetry::channel_processed (nano::endpoint const & endpoint_a, bool error_a) +void nano::telemetry::run_broadcasts () { - auto it = recent_or_initial_request_telemetry_data.find (endpoint_a); - if (it != recent_or_initial_request_telemetry_data.end ()) + auto telemetry = node.local_telemetry (); + auto peers = network.list (); + + for (auto & channel : peers) { - if (!error_a) - { - recent_or_initial_request_telemetry_data.modify (it, [] (nano::telemetry_info & telemetry_info_a) { - telemetry_info_a.last_response = std::chrono::steady_clock::now (); - telemetry_info_a.undergoing_request = false; - }); - } - else - { - recent_or_initial_request_telemetry_data.erase (endpoint_a); - } - flush_callbacks_async (endpoint_a, error_a); + broadcast (channel, telemetry); } } -void nano::telemetry::flush_callbacks_async (nano::endpoint const & endpoint_a, bool error_a) +void nano::telemetry::broadcast (std::shared_ptr & channel, const nano::telemetry_data & telemetry) { - // Post to thread_pool so that it's truly async and not on the calling thread (same problem as std::async otherwise) - workers.push_task ([endpoint_a, error_a, this_w = std::weak_ptr (shared_from_this ())] () { - if (auto this_l = this_w.lock ()) + stats.inc (nano::stat::type::telemetry, nano::stat::detail::broadcast); + + nano::telemetry_ack message{ network_params.network, telemetry }; + channel->send (message); +} + +void nano::telemetry::cleanup () +{ + debug_assert (!mutex.try_lock ()); + + nano::erase_if (telemetries, [this] (entry const & entry) { + // Remove if telemetry data is stale + if (!check_timeout (entry)) { - nano::unique_lock lk (this_l->mutex); - while (!this_l->callbacks[endpoint_a].empty ()) - { - lk.unlock (); - this_l->invoke_callbacks (endpoint_a, error_a); - lk.lock (); - } + stats.inc (nano::stat::type::telemetry, nano::stat::detail::cleanup_outdated); + return true; // Erase } + + return false; // Do not erase }); } -void nano::telemetry::invoke_callbacks (nano::endpoint const & endpoint_a, bool error_a) +bool nano::telemetry::check_timeout (const entry & entry) const { - std::vector> callbacks_l; - telemetry_data_response response_data{ nano::telemetry_data (), endpoint_a, error_a }; - { - // Copy data so that it can be used outside of holding the lock - nano::lock_guard guard (mutex); + return entry.last_updated + network_params.network.telemetry_cache_cutoff >= std::chrono::steady_clock::now (); +} - callbacks_l = callbacks[endpoint_a]; - auto it = recent_or_initial_request_telemetry_data.find (endpoint_a); - if (it != recent_or_initial_request_telemetry_data.end ()) +std::optional nano::telemetry::get_telemetry (const nano::endpoint & endpoint) const +{ + nano::lock_guard guard{ mutex }; + + if (auto it = telemetries.get ().find (endpoint); it != telemetries.get ().end ()) + { + if (check_timeout (*it)) { - response_data.telemetry_data = it->data; + return it->data; } - callbacks.erase (endpoint_a); } + return {}; +} - // Need to account for nodes which disable telemetry data in responses - for (auto & callback : callbacks_l) +std::unordered_map nano::telemetry::get_all_telemetries () const +{ + nano::lock_guard guard{ mutex }; + + std::unordered_map result; + for (auto const & entry : telemetries) { - callback (response_data); + if (check_timeout (entry)) + { + result[entry.endpoint] = entry.data; + } } + return result; } -std::size_t nano::telemetry::telemetry_data_size () +std::unique_ptr nano::telemetry::collect_container_info (const std::string & name) { - nano::lock_guard guard (mutex); - return recent_or_initial_request_telemetry_data.size (); -} + nano::lock_guard guard{ mutex }; -nano::telemetry_info::telemetry_info (nano::endpoint const & endpoint_a, nano::telemetry_data const & data_a, std::chrono::steady_clock::time_point last_response_a, bool undergoing_request_a) : - endpoint (endpoint_a), - data (data_a), - last_response (last_response_a), - undergoing_request (undergoing_request_a) -{ -} - -bool nano::telemetry_info::awaiting_first_response () const -{ - return data == nano::telemetry_data (); -} - -std::unique_ptr nano::collect_container_info (telemetry & telemetry, std::string const & name) -{ auto composite = std::make_unique (name); - std::size_t callbacks_count; - { - nano::lock_guard guard (telemetry.mutex); - std::unordered_map>> callbacks; - callbacks_count = std::accumulate (callbacks.begin (), callbacks.end (), static_cast (0), [] (auto total, auto const & callback_a) { - return total += callback_a.second.size (); - }); - } - - composite->add_component (std::make_unique (container_info{ "recent_or_initial_request_telemetry_data", telemetry.telemetry_data_size (), sizeof (decltype (telemetry.recent_or_initial_request_telemetry_data)::value_type) })); - composite->add_component (std::make_unique (container_info{ "callbacks", callbacks_count, sizeof (decltype (telemetry.callbacks)::value_type::second_type) })); - + composite->add_component (std::make_unique (container_info{ "telemetries", telemetries.size (), sizeof (decltype (telemetries)::value_type) })); return composite; } @@ -626,29 +439,4 @@ nano::telemetry_data nano::consolidate_telemetry_data (std::vector (version_fragments[4]); return consolidated_data; -} - -nano::telemetry_data nano::local_telemetry_data (nano::ledger const & ledger_a, nano::network & network_a, nano::unchecked_map const & unchecked, uint64_t bandwidth_limit_a, nano::network_params const & network_params_a, std::chrono::steady_clock::time_point statup_time_a, uint64_t active_difficulty_a, nano::keypair const & node_id_a) -{ - nano::telemetry_data telemetry_data; - telemetry_data.node_id = node_id_a.pub; - telemetry_data.block_count = ledger_a.cache.block_count; - telemetry_data.cemented_count = ledger_a.cache.cemented_count; - telemetry_data.bandwidth_cap = bandwidth_limit_a; - telemetry_data.protocol_version = network_params_a.network.protocol_version; - telemetry_data.uptime = std::chrono::duration_cast (std::chrono::steady_clock::now () - statup_time_a).count (); - telemetry_data.unchecked_count = unchecked.count (ledger_a.store.tx_begin_read ()); - telemetry_data.genesis_block = network_params_a.ledger.genesis->hash (); - telemetry_data.peer_count = nano::narrow_cast (network_a.size ()); - telemetry_data.account_count = ledger_a.cache.account_count; - telemetry_data.major_version = nano::get_major_node_version (); - telemetry_data.minor_version = nano::get_minor_node_version (); - telemetry_data.patch_version = nano::get_patch_node_version (); - telemetry_data.pre_release_version = nano::get_pre_release_node_version (); - telemetry_data.maker = static_cast> (ledger_a.pruning ? telemetry_maker::nf_pruned_node : telemetry_maker::nf_node); - telemetry_data.timestamp = std::chrono::system_clock::now (); - telemetry_data.active_difficulty = active_difficulty_a; - // Make sure this is the final operation! - telemetry_data.sign (node_id_a); - return telemetry_data; -} +} \ No newline at end of file diff --git a/nano/node/telemetry.hpp b/nano/node/telemetry.hpp index 325e446c..57332f75 100644 --- a/nano/node/telemetry.hpp +++ b/nano/node/telemetry.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -12,12 +13,16 @@ #include #include +#include +#include namespace mi = boost::multi_index; namespace nano { +class node; class network; +class node_observers; class stats; class ledger; class thread_pool; @@ -27,129 +32,122 @@ namespace transport class channel; } -/* - * Holds a response from a telemetry request +/** + * This class periodically broadcasts and requests telemetry from peers. + * Those intervals are configurable via `telemetry_request_interval` & `telemetry_broadcast_interval` network constants + * Telemetry datas are only removed after becoming stale (configurable via `telemetry_cache_cutoff` network constant), so peer data will still be available for a short period after that peer is disconnected + * + * Requests can be disabled via `disable_ongoing_telemetry_requests` node flag + * Broadcasts can be disabled via `disable_providing_telemetry_metrics` node flag + * */ -class telemetry_data_response +class telemetry { public: - nano::telemetry_data telemetry_data; - nano::endpoint endpoint; - bool error{ true }; -}; + struct config + { + bool enable_ongoing_requests{ true }; + bool enable_ongoing_broadcasts{ true }; + + config (nano::node_config const & config, nano::node_flags const & flags) : + enable_ongoing_requests{ !flags.disable_ongoing_telemetry_requests }, + enable_ongoing_broadcasts{ !flags.disable_providing_telemetry_metrics } + { + } + }; -class telemetry_info final -{ public: - telemetry_info () = default; - telemetry_info (nano::endpoint const & endpoint, nano::telemetry_data const & data, std::chrono::steady_clock::time_point last_response, bool undergoing_request); - bool awaiting_first_response () const; + telemetry (config const &, nano::node &, nano::network &, nano::node_observers &, nano::network_params &, nano::stats &); + ~telemetry (); - nano::endpoint endpoint; - nano::telemetry_data data; - std::chrono::steady_clock::time_point last_response; - bool undergoing_request{ false }; - uint64_t round{ 0 }; -}; - -/* - * This class requests node telemetry metrics from peers and invokes any callbacks which have been aggregated. - * All calls to get_metrics return cached data, it does not do any requests, these are periodically done in - * ongoing_req_all_peers. This can be disabled with the disable_ongoing_telemetry_requests node flag. - * Calls to get_metrics_single_peer_async will wait until a response is made if it is not within the cache - * cut off. - */ -class telemetry : public std::enable_shared_from_this -{ -public: - telemetry (nano::network &, nano::thread_pool &, nano::observer_set &, nano::stats &, nano::network_params &, bool); void start (); void stop (); - /* - * Received telemetry metrics from this peer + /** + * Process telemetry message from network */ - void set (nano::telemetry_ack const &, nano::transport::channel const &); + void process (nano::telemetry_ack const &, std::shared_ptr const &); - /* - * This returns what ever is in the cache + /** + * Trigger manual telemetry request to all peers */ - std::unordered_map get_metrics (); + void trigger (); - /* - * This makes a telemetry request to the specific channel. - * Error is set for: no response received, no payload received, invalid signature or unsound metrics in message (e.g different genesis block) - */ - void get_metrics_single_peer_async (std::shared_ptr const &, std::function const &); + std::size_t size () const; - /* - * A blocking version of get_metrics_single_peer_async + /** + * Returns telemetry for selected endpoint */ - telemetry_data_response get_metrics_single_peer (std::shared_ptr const &); + std::optional get_telemetry (nano::endpoint const &) const; - /* - * Return the number of node metrics collected + /** + * Returns all available telemetry */ - std::size_t telemetry_data_size (); + std::unordered_map get_all_telemetries () const; - /* - * Returns the time for the cache, response and a small buffer for alarm operations to be scheduled and completed - */ - std::chrono::milliseconds cache_plus_buffer_cutoff_time () const; +public: // Container info + std::unique_ptr collect_container_info (std::string const & name); + +private: // Dependencies + nano::node & node; + nano::network & network; + nano::node_observers & observers; + nano::network_params & network_params; + nano::stats & stats; + + const config config_m; private: - class tag_endpoint - { - }; - class tag_last_updated + struct entry { + nano::endpoint endpoint; + nano::telemetry_data data; + std::chrono::steady_clock::time_point last_updated; + std::shared_ptr channel; }; - nano::network & network; - nano::thread_pool & workers; - nano::observer_set & observers; - nano::stats & stats; - /* Important that this is a reference to the node network_params for tests which want to modify genesis block */ - nano::network_params & network_params; - bool disable_ongoing_requests; +private: + bool request_predicate () const; + bool broadcast_predicate () const; - std::atomic stopped{ false }; + void run (); + void run_requests (); + void run_broadcasts (); + void cleanup (); - nano::mutex mutex{ mutex_identifier (mutexes::telemetry) }; + void request (std::shared_ptr &); + void broadcast (std::shared_ptr &, nano::telemetry_data const &); + + bool verify (nano::telemetry_ack const &, std::shared_ptr const &) const; + bool check_timeout (entry const &) const; + +private: // clang-format off - // This holds the last telemetry data received from peers or can be a placeholder awaiting the first response (check with awaiting_first_response ()) - boost::multi_index_container>, mi::hashed_unique, - mi::member>, - mi::ordered_non_unique, - mi::member>>> recent_or_initial_request_telemetry_data; + mi::member> + >>; // clang-format on - // Anything older than this requires requesting metrics from other nodes. - std::chrono::seconds const cache_cutoff{ nano::telemetry_cache_cutoffs::network_to_time (network_params.network) }; + ordered_telemetries telemetries; - // The maximum time spent waiting for a response to a telemetry request - std::chrono::seconds const response_time_cutoff{ network_params.network.is_dev_network () ? (is_sanitizer_build () || nano::running_within_valgrind () ? 6 : 3) : 10 }; + bool triggered{ false }; + std::chrono::steady_clock::time_point last_request{}; + std::chrono::steady_clock::time_point last_broadcast{}; - std::unordered_map>> callbacks; + bool stopped{ false }; + mutable nano::mutex mutex{ mutex_identifier (mutexes::telemetry) }; + nano::condition_variable condition; + std::thread thread; - void ongoing_req_all_peers (std::chrono::milliseconds); - - void fire_request_message (std::shared_ptr const &); - void channel_processed (nano::endpoint const &, bool); - void flush_callbacks_async (nano::endpoint const &, bool); - void invoke_callbacks (nano::endpoint const &, bool); - - bool within_cache_cutoff (nano::telemetry_info const &) const; - bool within_cache_plus_buffer_cutoff (telemetry_info const &) const; - bool verify_message (nano::telemetry_ack const &, nano::transport::channel const &); - friend std::unique_ptr collect_container_info (telemetry &, std::string const &); - friend class telemetry_remove_peer_invalid_signature_Test; +private: + static std::size_t constexpr max_size = 1024; }; -std::unique_ptr collect_container_info (telemetry & telemetry, std::string const & name); - nano::telemetry_data consolidate_telemetry_data (std::vector const & telemetry_data); -nano::telemetry_data local_telemetry_data (nano::ledger const & ledger_a, nano::network &, nano::unchecked_map const &, uint64_t, nano::network_params const &, std::chrono::steady_clock::time_point, uint64_t, nano::keypair const &); } diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index dc2f2cf7..3c48d3ad 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -738,13 +738,6 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptrsocket->type_set (nano::socket::type_t::realtime_response_server); response_server->remote_node_id = channel_a->get_node_id (); response_server->start (); - - if (!node_l->flags.disable_initial_telemetry_requests) - { - node_l->telemetry->get_metrics_single_peer_async (channel_a, [] (nano::telemetry_data_response const &) { - // Intentionally empty, starts the telemetry request cycle to more quickly disconnect from invalid peers - }); - } }); }); } diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index 854e8ddd..b5be8c74 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -444,9 +444,8 @@ void nano::transport::tcp_server::realtime_message_visitor::frontier_req (const void nano::transport::tcp_server::realtime_message_visitor::telemetry_req (const nano::telemetry_req & message) { - // Only handle telemetry requests if they are outside of the cutoff time - bool cache_exceeded = std::chrono::steady_clock::now () >= server.last_telemetry_req + nano::telemetry_cache_cutoffs::network_to_time (server.node->network_params.network); - if (cache_exceeded) + // Only handle telemetry requests if they are outside the cooldown period + if (server.last_telemetry_req + server.node->network_params.network.telemetry_request_cooldown < std::chrono::steady_clock::now ()) { server.last_telemetry_req = std::chrono::steady_clock::now (); process = true; diff --git a/nano/node/transport/tcp_server.hpp b/nano/node/transport/tcp_server.hpp index 6504bbc5..834abe86 100644 --- a/nano/node/transport/tcp_server.hpp +++ b/nano/node/transport/tcp_server.hpp @@ -62,7 +62,7 @@ public: // Remote enpoint used to remove response channel even after socket closing nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 }; nano::account remote_node_id{}; - std::chrono::steady_clock::time_point last_telemetry_req{ std::chrono::steady_clock::time_point () }; + std::chrono::steady_clock::time_point last_telemetry_req{}; private: void receive_message (); diff --git a/nano/node/transport/udp.cpp b/nano/node/transport/udp.cpp index 49849f02..2c1579e8 100644 --- a/nano/node/transport/udp.cpp +++ b/nano/node/transport/udp.cpp @@ -435,7 +435,7 @@ public: if (find_channel) { auto is_very_first_message = find_channel->get_last_telemetry_req () == std::chrono::steady_clock::time_point{}; - auto cache_exceeded = std::chrono::steady_clock::now () >= find_channel->get_last_telemetry_req () + nano::telemetry_cache_cutoffs::network_to_time (node.network_params.network); + auto cache_exceeded = std::chrono::steady_clock::now () >= find_channel->get_last_telemetry_req () + node.network_params.network.telemetry_request_cooldown; if (is_very_first_message || cache_exceeded) { node.network.udp_channels.modify (find_channel, [] (std::shared_ptr const & channel_a) { diff --git a/nano/node/websocket.cpp b/nano/node/websocket.cpp index 89ec692d..03e3301c 100644 --- a/nano/node/websocket.cpp +++ b/nano/node/websocket.cpp @@ -1031,11 +1031,11 @@ nano::websocket_server::websocket_server (nano::websocket::config & config_a, na } }); - observers.telemetry.add ([this] (nano::telemetry_data const & telemetry_data, nano::endpoint const & endpoint) { + observers.telemetry.add ([this] (nano::telemetry_data const & telemetry_data, std::shared_ptr const & channel) { if (server->any_subscriber (nano::websocket::topic::telemetry)) { nano::websocket::message_builder builder; - server->broadcast (builder.telemetry_received (telemetry_data, endpoint)); + server->broadcast (builder.telemetry_received (telemetry_data, channel->get_endpoint ())); } }); diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 590d5051..0bcb9af1 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -7398,7 +7398,7 @@ TEST (rpc, telemetry_single) nano::telemetry_data telemetry_data; auto const should_ignore_identification_metrics = false; ASSERT_FALSE (telemetry_data.deserialize_json (config, should_ignore_identification_metrics)); - nano::test::compare_default_telemetry_response_data (telemetry_data, node->network_params, node->config.bandwidth_limit, node->default_difficulty (nano::work_version::work_1), node->node_id); + ASSERT_TRUE (nano::test::compare_telemetry (telemetry_data, *node)); } } @@ -7412,14 +7412,11 @@ TEST (rpc, telemetry_all) ASSERT_TIMELY (10s, node1->store.peer.count (node1->store.tx_begin_read ()) != 0); // First need to set up the cached data - std::atomic done{ false }; auto node = system.nodes.front (); - node1->telemetry->get_metrics_single_peer_async (node1->network.find_node_id (node->get_node_id ()), [&done] (nano::telemetry_data_response const & telemetry_data_response_a) { - ASSERT_FALSE (telemetry_data_response_a.error); - done = true; - }); - ASSERT_TIMELY (10s, done); + auto channel = node1->network.find_node_id (node->get_node_id ()); + ASSERT_TRUE (channel); + ASSERT_TIMELY (10s, node1->telemetry.get_telemetry (channel->get_endpoint ())); boost::property_tree::ptree request; request.put ("action", "telemetry"); @@ -7429,7 +7426,7 @@ TEST (rpc, telemetry_all) nano::telemetry_data telemetry_data; auto const should_ignore_identification_metrics = true; ASSERT_FALSE (telemetry_data.deserialize_json (config, should_ignore_identification_metrics)); - nano::test::compare_default_telemetry_response_data_excluding_signature (telemetry_data, node->network_params, node->config.bandwidth_limit, node->default_difficulty (nano::work_version::work_1)); + ASSERT_TRUE (nano::test::compare_telemetry_data (telemetry_data, node->local_telemetry ())); ASSERT_FALSE (response.get_optional ("node_id").is_initialized ()); ASSERT_FALSE (response.get_optional ("signature").is_initialized ()); } @@ -7446,7 +7443,7 @@ TEST (rpc, telemetry_all) nano::telemetry_data data; auto const should_ignore_identification_metrics = false; ASSERT_FALSE (data.deserialize_json (config, should_ignore_identification_metrics)); - nano::test::compare_default_telemetry_response_data (data, node->network_params, node->config.bandwidth_limit, node->default_difficulty (nano::work_version::work_1), node->node_id); + ASSERT_TRUE (nano::test::compare_telemetry (data, *node)); ASSERT_EQ (node->network.endpoint ().address ().to_string (), metrics.get ("address")); ASSERT_EQ (node->network.endpoint ().port (), metrics.get ("port")); @@ -7473,7 +7470,7 @@ TEST (rpc, telemetry_self) nano::telemetry_data data; nano::jsonconfig config (response); ASSERT_FALSE (data.deserialize_json (config, should_ignore_identification_metrics)); - nano::test::compare_default_telemetry_response_data (data, node1->network_params, node1->config.bandwidth_limit, node1->default_difficulty (nano::work_version::work_1), node1->node_id); + ASSERT_TRUE (nano::test::compare_telemetry (data, *node1)); } request.put ("address", "[::1]"); @@ -7482,7 +7479,7 @@ TEST (rpc, telemetry_self) nano::telemetry_data data; nano::jsonconfig config (response); ASSERT_FALSE (data.deserialize_json (config, should_ignore_identification_metrics)); - nano::test::compare_default_telemetry_response_data (data, node1->network_params, node1->config.bandwidth_limit, node1->default_difficulty (nano::work_version::work_1), node1->node_id); + ASSERT_TRUE (nano::test::compare_telemetry (data, *node1)); } request.put ("address", "127.0.0.1"); @@ -7491,7 +7488,7 @@ TEST (rpc, telemetry_self) nano::telemetry_data data; nano::jsonconfig config (response); ASSERT_FALSE (data.deserialize_json (config, should_ignore_identification_metrics)); - nano::test::compare_default_telemetry_response_data (data, node1->network_params, node1->config.bandwidth_limit, node1->default_difficulty (nano::work_version::work_1), node1->node_id); + ASSERT_TRUE (nano::test::compare_telemetry (data, *node1)); } // Incorrect port should fail diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index 057906c6..04c8293d 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -1400,14 +1400,13 @@ TEST (telemetry, ongoing_requests) { nano::test::system system; nano::node_flags node_flags; - node_flags.disable_initial_telemetry_requests = true; auto node_client = system.add_node (node_flags); auto node_server = system.add_node (node_flags); nano::test::wait_peer_connections (system); - ASSERT_EQ (0, node_client->telemetry->telemetry_data_size ()); - ASSERT_EQ (0, node_server->telemetry->telemetry_data_size ()); + ASSERT_EQ (0, node_client->telemetry.size ()); + ASSERT_EQ (0, node_server->telemetry.size ()); ASSERT_EQ (0, node_client->stats.count (nano::stat::type::bootstrap, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); ASSERT_EQ (0, node_client->stats.count (nano::stat::type::bootstrap, nano::stat::detail::telemetry_req, nano::stat::dir::out)); @@ -1415,7 +1414,7 @@ TEST (telemetry, ongoing_requests) // Wait till the next ongoing will be called, and add a 1s buffer for the actual processing auto time = std::chrono::steady_clock::now (); - ASSERT_TIMELY (10s, std::chrono::steady_clock::now () >= (time + node_client->telemetry->cache_plus_buffer_cutoff_time () + 1s)); + ASSERT_TIMELY (10s, std::chrono::steady_clock::now () >= (time + nano::dev::network_params.network.telemetry_cache_cutoff + 1s)); ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); @@ -1433,7 +1432,6 @@ namespace transport { nano::test::system system; nano::node_flags node_flags; - node_flags.disable_initial_telemetry_requests = true; auto const num_nodes = 4; for (int i = 0; i < num_nodes; ++i) { @@ -1469,10 +1467,12 @@ namespace transport // Pick first peer to be consistent auto peer = data.node->network.tcp_channels.channels[0].channel; - data.node->telemetry->get_metrics_single_peer_async (peer, [&shared_data, &data, &node_data] (nano::telemetry_data_response const & telemetry_data_response_a) { - ASSERT_FALSE (telemetry_data_response_a.error); - callback_process (shared_data, data, node_data, telemetry_data_response_a.telemetry_data.timestamp); - }); + + auto maybe_telemetry = data.node->telemetry.get_telemetry (peer->get_endpoint ()); + if (maybe_telemetry) + { + callback_process (shared_data, data, node_data, maybe_telemetry->timestamp); + } } std::this_thread::sleep_for (1ms); } @@ -1501,7 +1501,6 @@ TEST (telemetry, under_load) nano::node_config node_config (nano::test::get_available_port (), system.logging); node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; nano::node_flags node_flags; - node_flags.disable_initial_telemetry_requests = true; auto node = system.add_node (node_config, node_flags); node_config.peering_port = nano::test::get_available_port (); auto node1 = system.add_node (node_config, node_flags); @@ -1589,25 +1588,20 @@ TEST (telemetry, cache_read_and_timeout) nano::test::system system; nano::node_flags node_flags; node_flags.disable_ongoing_telemetry_requests = true; - node_flags.disable_initial_telemetry_requests = true; auto node_client = system.add_node (node_flags); auto node_server = system.add_node (node_flags); nano::test::wait_peer_connections (system); // Request telemetry metrics - nano::telemetry_data telemetry_data; - { - std::atomic done{ false }; - auto channel = node_client->network.find_node_id (node_server->get_node_id ()); - node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &telemetry_data] (nano::telemetry_data_response const & response_a) { - telemetry_data = response_a.telemetry_data; - done = true; - }); - ASSERT_TIMELY (10s, done); - } + std::optional telemetry_data; + auto channel = node_client->network.find_node_id (node_server->get_node_id ()); + ASSERT_NE (channel, nullptr); - auto responses = node_client->telemetry->get_metrics (); + node_client->telemetry.trigger (); + ASSERT_TIMELY (5s, telemetry_data = node_client->telemetry.get_telemetry (channel->get_endpoint ())); + + auto responses = node_client->telemetry.get_all_telemetries (); ASSERT_TRUE (!responses.empty ()); ASSERT_EQ (telemetry_data, responses.begin ()->second); @@ -1620,24 +1614,17 @@ TEST (telemetry, cache_read_and_timeout) ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); // wait until the telemetry data times out - ASSERT_TIMELY (node_server->telemetry->cache_plus_buffer_cutoff_time (), node_client->telemetry->get_metrics ().empty ()); + ASSERT_TIMELY (5s, node_client->telemetry.get_all_telemetries ().empty ()); // the telemetry data cache should be empty now - responses = node_client->telemetry->get_metrics (); + responses = node_client->telemetry.get_all_telemetries (); ASSERT_TRUE (responses.empty ()); // Request telemetry metrics again - { - std::atomic done{ false }; - auto channel = node_client->network.find_node_id (node_server->get_node_id ()); - node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &telemetry_data] (nano::telemetry_data_response const & response_a) { - telemetry_data = response_a.telemetry_data; - done = true; - }); - ASSERT_TIMELY (10s, done); - } + node_client->telemetry.trigger (); + ASSERT_TIMELY (5s, telemetry_data = node_client->telemetry.get_telemetry (channel->get_endpoint ())); - responses = node_client->telemetry->get_metrics (); + responses = node_client->telemetry.get_all_telemetries (); ASSERT_TRUE (!responses.empty ()); ASSERT_EQ (telemetry_data, responses.begin ()->second); @@ -1653,8 +1640,6 @@ TEST (telemetry, many_nodes) { nano::test::system system; nano::node_flags node_flags; - node_flags.disable_ongoing_telemetry_requests = true; - node_flags.disable_initial_telemetry_requests = true; node_flags.disable_request_loop = true; // The telemetry responses can timeout if using a large number of nodes under sanitizers, so lower the number. auto const num_nodes = nano::memory_intensive_instrumentation () ? 4 : 10; @@ -1706,27 +1691,17 @@ TEST (telemetry, many_nodes) // This is the node which will request metrics from all other nodes auto node_client = system.nodes.front (); - nano::mutex mutex; std::vector telemetry_datas; auto peers = node_client->network.list (num_nodes - 1); ASSERT_EQ (peers.size (), num_nodes - 1); for (auto const & peer : peers) { - node_client->telemetry->get_metrics_single_peer_async (peer, [&telemetry_datas, &mutex] (nano::telemetry_data_response const & response_a) { - ASSERT_FALSE (response_a.error); - nano::lock_guard guard{ mutex }; - telemetry_datas.push_back (response_a.telemetry_data); - }); + std::optional telemetry_data; + ASSERT_TIMELY (5s, telemetry_data = node_client->telemetry.get_telemetry (peer->get_endpoint ())); + telemetry_datas.push_back (*telemetry_data); } - system.deadline_set (20s); - nano::unique_lock lk{ mutex }; - while (telemetry_datas.size () != num_nodes - 1) - { - lk.unlock (); - ASSERT_NO_ERROR (system.poll ()); - lk.lock (); - } + ASSERT_EQ (telemetry_datas.size (), num_nodes - 1); // Check the metrics for (auto & data : telemetry_datas) @@ -2153,7 +2128,7 @@ TEST (system, block_sequence) system.deadline_set (3600s); nano::node_config config; config.peering_port = nano::test::get_available_port (); - //config.bandwidth_limit = 16 * 1024; + // config.bandwidth_limit = 16 * 1024; config.enable_voting = true; config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; nano::node_flags flags; @@ -2176,7 +2151,7 @@ TEST (system, block_sequence) std::cerr << rep.pub.to_account () << ' ' << pr->wallets.items.begin ()->second->exists (rep.pub) << pr->weight (rep.pub) << ' ' << '\n'; } while (std::any_of (system.nodes.begin (), system.nodes.end (), [] (std::shared_ptr const & node) { - //std::cerr << node->rep_crawler.representative_count () << ' '; + // std::cerr << node->rep_crawler.representative_count () << ' '; return node->rep_crawler.representative_count () < 3; })) { diff --git a/nano/test_common/telemetry.cpp b/nano/test_common/telemetry.cpp index 77cd32c7..db2465e0 100644 --- a/nano/test_common/telemetry.cpp +++ b/nano/test_common/telemetry.cpp @@ -1,38 +1,64 @@ #include #include +#include #include #include -void nano::test::compare_default_telemetry_response_data_excluding_signature (nano::telemetry_data const & telemetry_data_a, nano::network_params const & network_params_a, uint64_t bandwidth_limit_a, uint64_t active_difficulty_a) +namespace { - ASSERT_EQ (telemetry_data_a.block_count, 1); - ASSERT_EQ (telemetry_data_a.cemented_count, 1); - ASSERT_EQ (telemetry_data_a.bandwidth_cap, bandwidth_limit_a); - ASSERT_EQ (telemetry_data_a.peer_count, 1); - ASSERT_EQ (telemetry_data_a.protocol_version, network_params_a.network.protocol_version); - ASSERT_EQ (telemetry_data_a.unchecked_count, 0); - ASSERT_EQ (telemetry_data_a.account_count, 1); - ASSERT_LT (telemetry_data_a.uptime, 100); - ASSERT_EQ (telemetry_data_a.genesis_block, network_params_a.ledger.genesis->hash ()); - ASSERT_EQ (telemetry_data_a.major_version, nano::get_major_node_version ()); - ASSERT_EQ (telemetry_data_a.minor_version, nano::get_minor_node_version ()); - ASSERT_EQ (telemetry_data_a.patch_version, nano::get_patch_node_version ()); - ASSERT_EQ (telemetry_data_a.pre_release_version, nano::get_pre_release_node_version ()); - ASSERT_EQ (telemetry_data_a.maker, static_cast> (nano::telemetry_maker::nf_node)); - ASSERT_GT (telemetry_data_a.timestamp, std::chrono::system_clock::now () - std::chrono::seconds (100)); - ASSERT_EQ (telemetry_data_a.active_difficulty, active_difficulty_a); - ASSERT_EQ (telemetry_data_a.unknown_data, std::vector{}); +void compare_telemetry_data_impl (const nano::telemetry_data & data_a, const nano::telemetry_data & data_b, bool & result) +{ + ASSERT_EQ (data_a.block_count, data_b.block_count); + ASSERT_EQ (data_a.cemented_count, data_b.cemented_count); + ASSERT_EQ (data_a.bandwidth_cap, data_b.bandwidth_cap); + ASSERT_EQ (data_a.peer_count, data_b.peer_count); + ASSERT_EQ (data_a.protocol_version, data_b.protocol_version); + ASSERT_EQ (data_a.unchecked_count, data_b.unchecked_count); + ASSERT_EQ (data_a.account_count, data_b.account_count); + ASSERT_LE (data_a.uptime, data_b.uptime); + ASSERT_EQ (data_a.genesis_block, data_b.genesis_block); + ASSERT_EQ (data_a.major_version, nano::get_major_node_version ()); + ASSERT_EQ (data_a.minor_version, nano::get_minor_node_version ()); + ASSERT_EQ (data_a.patch_version, nano::get_patch_node_version ()); + ASSERT_EQ (data_a.pre_release_version, nano::get_pre_release_node_version ()); + ASSERT_EQ (data_a.maker, static_cast> (nano::telemetry_maker::nf_node)); + ASSERT_GT (data_a.timestamp, std::chrono::system_clock::now () - std::chrono::seconds (100)); + ASSERT_EQ (data_a.active_difficulty, data_b.active_difficulty); + ASSERT_EQ (data_a.unknown_data, std::vector{}); + result = true; +} } -void nano::test::compare_default_telemetry_response_data (nano::telemetry_data const & telemetry_data_a, nano::network_params const & network_params_a, uint64_t bandwidth_limit_a, uint64_t active_difficulty_a, nano::keypair const & node_id_a) +bool nano::test::compare_telemetry_data (const nano::telemetry_data & data_a, const nano::telemetry_data & data_b) { - ASSERT_FALSE (telemetry_data_a.validate_signature ()); - nano::telemetry_data telemetry_data_l = telemetry_data_a; - telemetry_data_l.signature.clear (); - telemetry_data_l.sign (node_id_a); - // Signature should be different because uptime/timestamp will have changed. - ASSERT_NE (telemetry_data_a.signature, telemetry_data_l.signature); - nano::test::compare_default_telemetry_response_data_excluding_signature (telemetry_data_a, network_params_a, bandwidth_limit_a, active_difficulty_a); - ASSERT_EQ (telemetry_data_a.node_id, node_id_a.pub); + bool result = false; + compare_telemetry_data_impl (data_a, data_b, result); + return result; } + +namespace +{ +void compare_telemetry_impl (const nano::telemetry_data & data, nano::node const & node, bool & result) +{ + ASSERT_FALSE (data.validate_signature ()); + ASSERT_EQ (data.node_id, node.node_id.pub); + + // Signature should be different because uptime/timestamp will have changed. + nano::telemetry_data data_l = data; + data_l.signature.clear (); + data_l.sign (node.node_id); + ASSERT_NE (data.signature, data_l.signature); + + ASSERT_TRUE (nano::test::compare_telemetry_data (data, node.local_telemetry ())); + + result = true; +} +} + +bool nano::test::compare_telemetry (const nano::telemetry_data & data, const nano::node & node) +{ + bool result = false; + compare_telemetry_impl (data, node, result); + return result; +} \ No newline at end of file diff --git a/nano/test_common/telemetry.hpp b/nano/test_common/telemetry.hpp index 74ec4822..e6a3dfd4 100644 --- a/nano/test_common/telemetry.hpp +++ b/nano/test_common/telemetry.hpp @@ -4,13 +4,21 @@ namespace nano { -class keypair; -class network_params; +class node; class telemetry_data; - -namespace test -{ - void compare_default_telemetry_response_data_excluding_signature (nano::telemetry_data const & telemetry_data_a, nano::network_params const & network_params_a, uint64_t bandwidth_limit_a, uint64_t active_difficulty_a); - void compare_default_telemetry_response_data (nano::telemetry_data const & telemetry_data_a, nano::network_params const & network_params_a, uint64_t bandwidth_limit_a, uint64_t active_difficulty_a, nano::keypair const & node_id_a); } + +namespace nano::test +{ +/** + * Compares telemetry data without signatures + * @return true if comparison OK + */ +bool compare_telemetry_data (nano::telemetry_data const &, nano::telemetry_data const &); + +/** + * Compares telemetry data and checks signature matches node_id + * @return true if comparison OK + */ +bool compare_telemetry (nano::telemetry_data const &, nano::node const &); } \ No newline at end of file diff --git a/nano/test_common/testutil.cpp b/nano/test_common/testutil.cpp index 57f67646..d932073b 100644 --- a/nano/test_common/testutil.cpp +++ b/nano/test_common/testutil.cpp @@ -210,7 +210,12 @@ std::vector nano::test::blocks_to_hashes (std::vector nano::test::fake_channel (nano::node & node) +std::shared_ptr nano::test::fake_channel (nano::node & node, nano::account node_id) { - return std::make_shared (node); + auto channel = std::make_shared (node); + if (!node_id.is_zero ()) + { + channel->set_node_id (node_id); + } + return channel; } \ No newline at end of file diff --git a/nano/test_common/testutil.hpp b/nano/test_common/testutil.hpp index db952371..e82a5ca8 100644 --- a/nano/test_common/testutil.hpp +++ b/nano/test_common/testutil.hpp @@ -405,6 +405,6 @@ namespace test /* * Creates a new fake channel associated with `node` */ - std::shared_ptr fake_channel (nano::node & node); + std::shared_ptr fake_channel (nano::node & node, nano::account node_id = { 0 }); } }