From da45f15e0683c4655eeac6c1561d91e9fa3b26e2 Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Fri, 20 Mar 2020 10:06:09 +0000 Subject: [PATCH] Add telemetry response websocket callbacks (#2634) * Add telemetry websocket callbacks * Websocket additions * Remove TODOs --- nano/core_test/CMakeLists.txt | 1 + nano/core_test/common.hpp | 41 +++++++++++++++++++++ nano/core_test/node_telemetry.cpp | 37 +------------------ nano/core_test/websocket.cpp | 60 +++++++++++++++++++++++++++++++ nano/node/node.cpp | 10 +++++- nano/node/node_observers.hpp | 2 ++ nano/node/telemetry.cpp | 20 ++++++++--- nano/node/telemetry.hpp | 4 ++- nano/node/websocket.cpp | 23 ++++++++++++ nano/node/websocket.hpp | 5 +++ nano/slow_test/node.cpp | 16 +-------- 11 files changed, 161 insertions(+), 58 deletions(-) create mode 100644 nano/core_test/common.hpp diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index 06aac192..d32a4964 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -8,6 +8,7 @@ add_executable (core_test block_store.cpp bootstrap.cpp cli.cpp + common.hpp confirmation_height.cpp confirmation_solicitor.cpp conflicts.cpp diff --git a/nano/core_test/common.hpp b/nano/core_test/common.hpp new file mode 100644 index 00000000..c9a25cda --- /dev/null +++ b/nano/core_test/common.hpp @@ -0,0 +1,41 @@ +#pragma once + +#include +#include + +#include + +#include + +using namespace std::chrono_literals; + +namespace nano +{ +inline void wait_peer_connections (nano::system & system_a) +{ + auto wait_peer_count = [&system_a](bool in_memory) { + auto num_nodes = system_a.nodes.size (); + system_a.deadline_set (20s); + auto peer_count = 0; + while (peer_count != num_nodes * (num_nodes - 1)) + { + ASSERT_NO_ERROR (system_a.poll ()); + peer_count = std::accumulate (system_a.nodes.cbegin (), system_a.nodes.cend (), 0, [in_memory](auto total, auto const & node) { + if (in_memory) + { + return total += node->network.size (); + } + else + { + auto transaction = node->store.tx_begin_read (); + return total += node->store.peer_count (transaction); + } + }); + } + }; + + // Do a pre-pass with in-memory containers to reduce IO if still in the process of connecting to peers + wait_peer_count (true); + wait_peer_count (false); +} +} diff --git a/nano/core_test/node_telemetry.cpp b/nano/core_test/node_telemetry.cpp index 4e5194aa..f9e05320 100644 --- a/nano/core_test/node_telemetry.cpp +++ b/nano/core_test/node_telemetry.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -8,11 +9,6 @@ using namespace std::chrono_literals; -namespace -{ -void wait_peer_connections (nano::system & system_a); -} - TEST (node_telemetry, consolidate_data) { auto time = 1582117035109; @@ -708,34 +704,3 @@ TEST (node_telemetry, disable_metrics) ASSERT_NO_ERROR (system.poll ()); } } - -namespace -{ -void wait_peer_connections (nano::system & system_a) -{ - auto wait_peer_count = [&system_a](bool in_memory) { - auto num_nodes = system_a.nodes.size (); - system_a.deadline_set (20s); - auto peer_count = 0; - while (peer_count != num_nodes * (num_nodes - 1)) - { - ASSERT_NO_ERROR (system_a.poll ()); - peer_count = std::accumulate (system_a.nodes.cbegin (), system_a.nodes.cend (), 0, [in_memory](auto total, auto const & node) { - if (in_memory) - { - return total += node->network.size (); - } - else - { - auto transaction = node->store.tx_begin_read (); - return total += node->store.peer_count (transaction); - } - }); - } - }; - - // Do a pre-pass with in-memory containers to reduce IO if still in the process of connecting to peers - wait_peer_count (true); - wait_peer_count (false); -} -} diff --git a/nano/core_test/websocket.cpp b/nano/core_test/websocket.cpp index c4f31322..47eb2e66 100644 --- a/nano/core_test/websocket.cpp +++ b/nano/core_test/websocket.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -853,3 +854,62 @@ TEST (websocket, ws_keepalive) ASSERT_NO_ERROR (system.poll ()); } } + +// Tests sending telemetry +TEST (websocket, telemetry) +{ + nano::system system; + nano::node_config config (nano::get_available_port (), system.logging); + config.websocket_config.enabled = true; + config.websocket_config.port = nano::get_available_port (); + nano::node_flags node_flags; + node_flags.disable_ongoing_telemetry_requests = true; + auto node1 (system.add_node (config, node_flags)); + config.peering_port = nano::get_available_port (); + config.websocket_config.enabled = true; + config.websocket_config.port = nano::get_available_port (); + auto node2 (system.add_node (config, node_flags)); + + wait_peer_connections (system); + + std::atomic done{ false }; + auto task = ([config = node1->config, &node1, &done]() { + fake_websocket_client client (config.websocket_config.port); + client.send_message (R"json({"action": "subscribe", "topic": "telemetry", "ack": true})json"); + client.await_ack (); + done = true; + EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::telemetry)); + return client.get_response (); + }); + + auto future = std::async (std::launch::async, task); + + ASSERT_TIMELY (10s, done); + + node1->telemetry->get_metrics_single_peer_async (node1->network.find_channel (node2->network.endpoint ()), [](auto const & response_a) { + ASSERT_FALSE (response_a.error); + }); + + ASSERT_TIMELY (10s, future.wait_for (0s) == std::future_status::ready); + + // Check the telemetry notification message + auto response = future.get (); + + std::stringstream stream; + stream << response; + boost::property_tree::ptree event; + boost::property_tree::read_json (stream, event); + ASSERT_EQ (event.get ("topic"), "telemetry"); + + auto & contents = event.get_child ("message"); + nano::jsonconfig telemetry_contents (contents); + nano::telemetry_data telemetry_data; + telemetry_data.deserialize_json (telemetry_contents, false); + compare_default_telemetry_response_data (telemetry_data, node2->network_params, node2->config.bandwidth_limit, node2->node_id); + + ASSERT_EQ (contents.get ("address"), node2->network.endpoint ().address ().to_string ()); + ASSERT_EQ (contents.get ("port"), node2->network.endpoint ().port ()); + + // Other node should have no subscribers + EXPECT_EQ (0, node2->websocket_server->subscriber_count (nano::websocket::topic::telemetry)); +} diff --git a/nano/node/node.cpp b/nano/node/node.cpp index a411bb73..b2175f1f 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -106,7 +106,7 @@ gap_cache (*this), ledger (store, stats, flags_a.generate_cache), checker (config.signature_checker_threads), network (*this, config.peering_port), -telemetry (std::make_shared (network, alarm, worker, flags.disable_ongoing_telemetry_requests)), +telemetry (std::make_shared (network, alarm, worker, observers.telemetry, flags.disable_ongoing_telemetry_requests)), bootstrap_initiator (*this), bootstrap (config.peering_port, *this), application_path (application_path_a), @@ -263,6 +263,14 @@ startup_time (std::chrono::steady_clock::now ()) this->websocket_server->broadcast (msg); } }); + + observers.telemetry.add ([this](nano::telemetry_data const & telemetry_data, nano::endpoint const & endpoint) { + if (this->websocket_server->any_subscriber (nano::websocket::topic::telemetry)) + { + nano::websocket::message_builder builder; + this->websocket_server->broadcast (builder.telemetry_received (telemetry_data, endpoint)); + } + }); } // Add block confirmation type stats regardless of http-callback and websocket subscriptions observers.blocks.add ([this](nano::election_status const & status_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a) { diff --git a/nano/node/node_observers.hpp b/nano/node/node_observers.hpp index 1fbf576f..50f16d05 100644 --- a/nano/node/node_observers.hpp +++ b/nano/node/node_observers.hpp @@ -7,6 +7,7 @@ namespace nano { +class telemetry; class node_observers final { public: @@ -20,6 +21,7 @@ public: nano::observer_set<> disconnect; nano::observer_set difficulty; nano::observer_set work_cancel; + nano::observer_set telemetry; }; std::unique_ptr collect_container_info (node_observers & node_observers, const std::string & name); diff --git a/nano/node/telemetry.cpp b/nano/node/telemetry.cpp index 01d28cb9..22e4c94e 100644 --- a/nano/node/telemetry.cpp +++ b/nano/node/telemetry.cpp @@ -15,10 +15,11 @@ using namespace std::chrono_literals; -nano::telemetry::telemetry (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a, bool disable_ongoing_requests_a) : +nano::telemetry::telemetry (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a, nano::observer_set & observers_a, bool disable_ongoing_requests_a) : network (network_a), alarm (alarm_a), worker (worker_a), +observers (observers_a), disable_ongoing_requests (disable_ongoing_requests_a) { } @@ -41,9 +42,10 @@ void nano::telemetry::set (nano::telemetry_ack const & message_a, nano::transpor { if (!stopped) { - nano::lock_guard guard (mutex); - auto it = recent_or_initial_request_telemetry_data.find (channel_a.get_endpoint ()); - if (it == recent_or_initial_request_telemetry_data.cend ()) + nano::unique_lock lk (mutex); + nano::endpoint endpoint = channel_a.get_endpoint (); + auto it = recent_or_initial_request_telemetry_data.find (endpoint); + if (it == recent_or_initial_request_telemetry_data.cend () || !it->undergoing_request) { // Not requesting telemetry data from this peer so ignore it return; @@ -60,7 +62,15 @@ void nano::telemetry::set (nano::telemetry_ack const & message_a, nano::transpor error = !message_a.data.validate_signature (message_a.size ()) && (channel_a.get_node_id () != message_a.data.node_id); } - channel_processed (channel_a.get_endpoint (), error || message_a.is_empty_payload ()); + if (!error && !message_a.is_empty_payload ()) + { + // Received telemetry data from a peer which hasn't disabled providing telemetry metrics and there no errors with the data + lk.unlock (); + observers.notify (message_a.data, endpoint); + lk.lock (); + } + + channel_processed (endpoint, error || message_a.is_empty_payload ()); } } diff --git a/nano/node/telemetry.hpp b/nano/node/telemetry.hpp index 861e817a..17084cda 100644 --- a/nano/node/telemetry.hpp +++ b/nano/node/telemetry.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -58,7 +59,7 @@ public: class telemetry : public std::enable_shared_from_this { public: - telemetry (nano::network &, nano::alarm &, nano::worker &, bool); + telemetry (nano::network &, nano::alarm &, nano::worker &, nano::observer_set &, bool); void start (); void stop (); @@ -103,6 +104,7 @@ private: nano::network & network; nano::alarm & alarm; nano::worker & worker; + nano::observer_set & observers; std::atomic stopped{ false }; nano::network_params network_params; diff --git a/nano/node/websocket.cpp b/nano/node/websocket.cpp index 9e38c572..f8727d51 100644 --- a/nano/node/websocket.cpp +++ b/nano/node/websocket.cpp @@ -389,6 +389,10 @@ nano::websocket::topic to_topic (std::string const & topic_a) { topic = nano::websocket::topic::bootstrap; } + else if (topic_a == "telemetry") + { + topic = nano::websocket::topic::telemetry; + } return topic; } @@ -424,6 +428,10 @@ std::string from_topic (nano::websocket::topic topic_a) { topic = "bootstrap"; } + else if (topic_a == nano::websocket::topic::telemetry) + { + topic = "telemetry"; + } return topic; } } @@ -860,6 +868,21 @@ nano::websocket::message nano::websocket::message_builder::bootstrap_exited (std return message_l; } +nano::websocket::message nano::websocket::message_builder::telemetry_received (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a) +{ + nano::websocket::message message_l (nano::websocket::topic::telemetry); + set_common_fields (message_l); + + // Telemetry information + nano::jsonconfig telemetry_l; + telemetry_data_a.serialize_json (telemetry_l, false); + telemetry_l.put ("address", endpoint_a.address ()); + telemetry_l.put ("port", endpoint_a.port ()); + + message_l.contents.add_child ("message", telemetry_l.get_tree ()); + return message_l; +} + void nano::websocket::message_builder::set_common_fields (nano::websocket::message & message_a) { using namespace std::chrono; diff --git a/nano/node/websocket.hpp b/nano/node/websocket.hpp index 6b0f7c3f..c5be48c4 100644 --- a/nano/node/websocket.hpp +++ b/nano/node/websocket.hpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -33,6 +34,7 @@ class wallets; class logger_mt; class vote; class election_status; +class telemetry_data; enum class election_status_type : uint8_t; namespace websocket { @@ -58,6 +60,8 @@ namespace websocket work, /** A bootstrap message */ bootstrap, + /** A telemetry message */ + telemetry, /** Auxiliary length, not a valid topic, must be the last enum */ _length }; @@ -95,6 +99,7 @@ namespace websocket message work_failed (nano::work_version const version_a, nano::block_hash const & root_a, uint64_t const difficulty_a, uint64_t const publish_threshold_a, std::chrono::milliseconds const & duration_a, std::vector const & bad_peers_a); message bootstrap_started (std::string const & id_a, std::string const & mode_a); message bootstrap_exited (std::string const & id_a, std::string const & mode_a, std::chrono::steady_clock::time_point const start_time_a, uint64_t const total_blocks_a); + message telemetry_received (nano::telemetry_data const &, nano::endpoint const &); private: /** Set the common fields for messages: timestamp and topic. */ diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index 5e4fff38..ca66dba6 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -832,21 +833,6 @@ TEST (confirmation_height, prioritize_frontiers_overwrite) namespace { -void wait_peer_connections (nano::system & system_a) -{ - system_a.deadline_set (10s); - auto peer_count = 0; - auto num_nodes = system_a.nodes.size (); - while (peer_count != num_nodes * (num_nodes - 1)) - { - ASSERT_NO_ERROR (system_a.poll ()); - peer_count = std::accumulate (system_a.nodes.cbegin (), system_a.nodes.cend (), 0, [](auto total, auto const & node) { - auto transaction = node->store.tx_begin_read (); - return total += node->store.peer_count (transaction); - }); - } -} - class data { public: