Add telemetry response websocket callbacks (#2634)

* Add telemetry websocket callbacks

* Websocket additions

* Remove TODOs
This commit is contained in:
Wesley Shillingford 2020-03-20 10:06:09 +00:00 committed by GitHub
commit da45f15e06
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 161 additions and 58 deletions

View file

@ -8,6 +8,7 @@ add_executable (core_test
block_store.cpp
bootstrap.cpp
cli.cpp
common.hpp
confirmation_height.cpp
confirmation_solicitor.cpp
conflicts.cpp

41
nano/core_test/common.hpp Normal file
View file

@ -0,0 +1,41 @@
#pragma once
#include <nano/core_test/testutil.hpp>
#include <nano/node/testing.hpp>
#include <gtest/gtest.h>
#include <numeric>
using namespace std::chrono_literals;
namespace nano
{
/**
 * Polls system_a until the peer counts summed over all of its nodes equal
 * num_nodes * (num_nodes - 1), i.e. every node lists every other node as a peer.
 *
 * Two passes are made, each arming a 20 second deadline on system_a:
 *  - first against node->network.size () (in-memory),
 *  - then against node->store.peer_count () read through a read transaction.
 */
inline void wait_peer_connections (nano::system & system_a)
{
	auto wait_peer_count = [&system_a](bool in_memory) {
		auto num_nodes = system_a.nodes.size ();
		system_a.deadline_set (20s);
		// Keep the running total in the container's own size type so the comparison below is not a
		// signed/unsigned mix and the per-node counts are not narrowed to int while being summed
		decltype (num_nodes) peer_count = 0;
		while (peer_count != num_nodes * (num_nodes - 1))
		{
			// NOTE(review): presumably a fatal gtest-style assertion; if so a failure only returns from this lambda, not from the caller - confirm
			ASSERT_NO_ERROR (system_a.poll ());
			// The seed's type fixes the accumulator type, so seed with the size type rather than a plain int 0
			peer_count = std::accumulate (system_a.nodes.cbegin (), system_a.nodes.cend (), decltype (num_nodes){ 0 }, [in_memory](auto total, auto const & node) {
				if (in_memory)
				{
					return total += node->network.size ();
				}
				else
				{
					auto transaction = node->store.tx_begin_read ();
					return total += node->store.peer_count (transaction);
				}
			});
		}
	};

	// Do a pre-pass with in-memory containers to reduce IO if still in the process of connecting to peers
	wait_peer_count (true);
	wait_peer_count (false);
}
}

View file

@ -1,3 +1,4 @@
#include <nano/core_test/common.hpp>
#include <nano/core_test/testutil.hpp>
#include <nano/node/telemetry.hpp>
#include <nano/node/testing.hpp>
@ -8,11 +9,6 @@
using namespace std::chrono_literals;
namespace
{
// Forward declaration of the file-local helper that blocks until all nodes of system_a are peered with each other.
// NOTE(review): the definition appears in the anonymous namespace further down in this view - confirm it is the same file
void wait_peer_connections (nano::system & system_a);
}
TEST (node_telemetry, consolidate_data)
{
auto time = 1582117035109;
@ -708,34 +704,3 @@ TEST (node_telemetry, disable_metrics)
ASSERT_NO_ERROR (system.poll ());
}
}
namespace
{
/**
 * Polls system_a until the peer counts summed over all of its nodes equal
 * num_nodes * (num_nodes - 1), i.e. every node lists every other node as a peer.
 *
 * Two passes are made, each arming a 20 second deadline on system_a:
 *  - first against node->network.size () (in-memory),
 *  - then against node->store.peer_count () read through a read transaction.
 */
void wait_peer_connections (nano::system & system_a)
{
	auto wait_peer_count = [&system_a](bool in_memory) {
		auto num_nodes = system_a.nodes.size ();
		system_a.deadline_set (20s);
		// Keep the running total in the container's own size type so the comparison below is not a
		// signed/unsigned mix and the per-node counts are not narrowed to int while being summed
		decltype (num_nodes) peer_count = 0;
		while (peer_count != num_nodes * (num_nodes - 1))
		{
			// NOTE(review): presumably a fatal gtest-style assertion; if so a failure only returns from this lambda, not from the caller - confirm
			ASSERT_NO_ERROR (system_a.poll ());
			// The seed's type fixes the accumulator type, so seed with the size type rather than a plain int 0
			peer_count = std::accumulate (system_a.nodes.cbegin (), system_a.nodes.cend (), decltype (num_nodes){ 0 }, [in_memory](auto total, auto const & node) {
				if (in_memory)
				{
					return total += node->network.size ();
				}
				else
				{
					auto transaction = node->store.tx_begin_read ();
					return total += node->store.peer_count (transaction);
				}
			});
		}
	};

	// Do a pre-pass with in-memory containers to reduce IO if still in the process of connecting to peers
	wait_peer_count (true);
	wait_peer_count (false);
}
}

View file

@ -1,3 +1,4 @@
#include <nano/core_test/common.hpp>
#include <nano/core_test/fakes/websocket_client.hpp>
#include <nano/core_test/testutil.hpp>
#include <nano/node/testing.hpp>
@ -853,3 +854,62 @@ TEST (websocket, ws_keepalive)
ASSERT_NO_ERROR (system.poll ());
}
}
// Checks that a websocket client subscribed to the "telemetry" topic on one node is sent a
// notification when that node receives telemetry from a peer
TEST (websocket, telemetry)
{
	nano::system system;
	nano::node_config config (nano::get_available_port (), system.logging);
	config.websocket_config.enabled = true;
	config.websocket_config.port = nano::get_available_port ();
	nano::node_flags node_flags;
	node_flags.disable_ongoing_telemetry_requests = true;
	auto node1 = system.add_node (config, node_flags);

	// The second node reuses the same config object with fresh peering and websocket ports
	config.peering_port = nano::get_available_port ();
	config.websocket_config.enabled = true;
	config.websocket_config.port = nano::get_available_port ();
	auto node2 = system.add_node (config, node_flags);

	wait_peer_connections (system);

	std::atomic<bool> subscribed{ false };
	// Runs on a separate thread: subscribes to node1's telemetry topic, flags the acknowledgement
	// and then returns whatever the client's get_response () yields
	auto subscribe_and_listen = [websocket_port = node1->config.websocket_config.port, &node1, &subscribed]() {
		fake_websocket_client client (websocket_port);
		client.send_message (R"json({"action": "subscribe", "topic": "telemetry", "ack": true})json");
		client.await_ack ();
		subscribed = true;
		EXPECT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::telemetry));
		return client.get_response ();
	};
	auto response_future = std::async (std::launch::async, subscribe_and_listen);

	// Only request telemetry once the subscription has been acknowledged
	ASSERT_TIMELY (10s, subscribed);

	node1->telemetry->get_metrics_single_peer_async (node1->network.find_channel (node2->network.endpoint ()), [](auto const & telemetry_response_a) {
		ASSERT_FALSE (telemetry_response_a.error);
	});

	ASSERT_TIMELY (10s, response_future.wait_for (0s) == std::future_status::ready);

	// Check the telemetry notification message
	auto response = response_future.get ();
	std::stringstream response_stream;
	response_stream << response;
	boost::property_tree::ptree notification;
	boost::property_tree::read_json (response_stream, notification);
	ASSERT_EQ (notification.get<std::string> ("topic"), "telemetry");

	// The payload must deserialize into node2's default telemetry data
	auto & message_contents = notification.get_child ("message");
	nano::jsonconfig telemetry_json (message_contents);
	nano::telemetry_data received_telemetry;
	received_telemetry.deserialize_json (telemetry_json, false);
	compare_default_telemetry_response_data (received_telemetry, node2->network_params, node2->config.bandwidth_limit, node2->node_id);

	// The notification also carries the address and port of the peer
	ASSERT_EQ (message_contents.get<std::string> ("address"), node2->network.endpoint ().address ().to_string ());
	ASSERT_EQ (message_contents.get<uint16_t> ("port"), node2->network.endpoint ().port ());

	// Other node should have no subscribers
	EXPECT_EQ (0, node2->websocket_server->subscriber_count (nano::websocket::topic::telemetry));
}

View file

@ -106,7 +106,7 @@ gap_cache (*this),
ledger (store, stats, flags_a.generate_cache),
checker (config.signature_checker_threads),
network (*this, config.peering_port),
telemetry (std::make_shared<nano::telemetry> (network, alarm, worker, flags.disable_ongoing_telemetry_requests)),
telemetry (std::make_shared<nano::telemetry> (network, alarm, worker, observers.telemetry, flags.disable_ongoing_telemetry_requests)),
bootstrap_initiator (*this),
bootstrap (config.peering_port, *this),
application_path (application_path_a),
@ -263,6 +263,14 @@ startup_time (std::chrono::steady_clock::now ())
this->websocket_server->broadcast (msg);
}
});
observers.telemetry.add ([this](nano::telemetry_data const & telemetry_data, nano::endpoint const & endpoint) {
if (this->websocket_server->any_subscriber (nano::websocket::topic::telemetry))
{
nano::websocket::message_builder builder;
this->websocket_server->broadcast (builder.telemetry_received (telemetry_data, endpoint));
}
});
}
// Add block confirmation type stats regardless of http-callback and websocket subscriptions
observers.blocks.add ([this](nano::election_status const & status_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a) {

View file

@ -7,6 +7,7 @@
namespace nano
{
class telemetry;
class node_observers final
{
public:
@ -20,6 +21,7 @@ public:
nano::observer_set<> disconnect;
nano::observer_set<uint64_t> difficulty;
nano::observer_set<nano::root const &> work_cancel;
nano::observer_set<nano::telemetry_data const &, nano::endpoint const &> telemetry;
};
std::unique_ptr<container_info_component> collect_container_info (node_observers & node_observers, const std::string & name);

View file

@ -15,10 +15,11 @@
using namespace std::chrono_literals;
nano::telemetry::telemetry (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a, bool disable_ongoing_requests_a) :
/**
 * Constructs the telemetry component from the node services passed in.
 * network_a, alarm_a, worker_a and observers_a are taken by reference and used to initialise the
 * members of the same names; disable_ongoing_requests_a initialises disable_ongoing_requests.
 * The constructor body itself does no work.
 */
nano::telemetry::telemetry (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a, nano::observer_set<nano::telemetry_data const &, nano::endpoint const &> & observers_a, bool disable_ongoing_requests_a) :
network (network_a),
alarm (alarm_a),
worker (worker_a),
observers (observers_a),
disable_ongoing_requests (disable_ongoing_requests_a)
{
}
@ -41,9 +42,10 @@ void nano::telemetry::set (nano::telemetry_ack const & message_a, nano::transpor
{
if (!stopped)
{
nano::lock_guard<std::mutex> guard (mutex);
auto it = recent_or_initial_request_telemetry_data.find (channel_a.get_endpoint ());
if (it == recent_or_initial_request_telemetry_data.cend ())
nano::unique_lock<std::mutex> lk (mutex);
nano::endpoint endpoint = channel_a.get_endpoint ();
auto it = recent_or_initial_request_telemetry_data.find (endpoint);
if (it == recent_or_initial_request_telemetry_data.cend () || !it->undergoing_request)
{
// Not requesting telemetry data from this peer so ignore it
return;
@ -60,7 +62,15 @@ void nano::telemetry::set (nano::telemetry_ack const & message_a, nano::transpor
error = !message_a.data.validate_signature (message_a.size ()) && (channel_a.get_node_id () != message_a.data.node_id);
}
channel_processed (channel_a.get_endpoint (), error || message_a.is_empty_payload ());
if (!error && !message_a.is_empty_payload ())
{
// Received telemetry data from a peer which hasn't disabled providing telemetry metrics and there are no errors with the data
lk.unlock ();
observers.notify (message_a.data, endpoint);
lk.lock ();
}
channel_processed (endpoint, error || message_a.is_empty_payload ());
}
}

View file

@ -1,5 +1,6 @@
#pragma once
#include <nano/lib/utility.hpp>
#include <nano/node/common.hpp>
#include <nano/secure/common.hpp>
@ -58,7 +59,7 @@ public:
class telemetry : public std::enable_shared_from_this<telemetry>
{
public:
telemetry (nano::network &, nano::alarm &, nano::worker &, bool);
telemetry (nano::network &, nano::alarm &, nano::worker &, nano::observer_set<nano::telemetry_data const &, nano::endpoint const &> &, bool);
void start ();
void stop ();
@ -103,6 +104,7 @@ private:
nano::network & network;
nano::alarm & alarm;
nano::worker & worker;
nano::observer_set<nano::telemetry_data const &, nano::endpoint const &> & observers;
std::atomic<bool> stopped{ false };
nano::network_params network_params;

View file

@ -389,6 +389,10 @@ nano::websocket::topic to_topic (std::string const & topic_a)
{
topic = nano::websocket::topic::bootstrap;
}
else if (topic_a == "telemetry")
{
topic = nano::websocket::topic::telemetry;
}
return topic;
}
@ -424,6 +428,10 @@ std::string from_topic (nano::websocket::topic topic_a)
{
topic = "bootstrap";
}
else if (topic_a == nano::websocket::topic::telemetry)
{
topic = "telemetry";
}
return topic;
}
}
@ -860,6 +868,21 @@ nano::websocket::message nano::websocket::message_builder::bootstrap_exited (std
return message_l;
}
/**
 * Builds the websocket message for the telemetry topic.
 * The "message" child holds the JSON serialization of telemetry_data_a plus the
 * "address" and "port" of endpoint_a.
 */
nano::websocket::message nano::websocket::message_builder::telemetry_received (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a)
{
	nano::websocket::message notification_l (nano::websocket::topic::telemetry);
	set_common_fields (notification_l);

	// Serialized telemetry fields first, then the endpoint the data is attributed to
	nano::jsonconfig payload_l;
	telemetry_data_a.serialize_json (payload_l, false);
	payload_l.put ("address", endpoint_a.address ());
	payload_l.put ("port", endpoint_a.port ());

	notification_l.contents.add_child ("message", payload_l.get_tree ());
	return notification_l;
}
void nano::websocket::message_builder::set_common_fields (nano::websocket::message & message_a)
{
using namespace std::chrono;

View file

@ -6,6 +6,7 @@
#include <nano/lib/blocks.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/lib/work.hpp>
#include <nano/node/common.hpp>
#include <nano/secure/common.hpp>
#include <boost/property_tree/json_parser.hpp>
@ -33,6 +34,7 @@ class wallets;
class logger_mt;
class vote;
class election_status;
class telemetry_data;
enum class election_status_type : uint8_t;
namespace websocket
{
@ -58,6 +60,8 @@ namespace websocket
work,
/** A bootstrap message */
bootstrap,
/** A telemetry message */
telemetry,
/** Auxiliary length, not a valid topic, must be the last enum */
_length
};
@ -95,6 +99,7 @@ namespace websocket
message work_failed (nano::work_version const version_a, nano::block_hash const & root_a, uint64_t const difficulty_a, uint64_t const publish_threshold_a, std::chrono::milliseconds const & duration_a, std::vector<std::string> const & bad_peers_a);
message bootstrap_started (std::string const & id_a, std::string const & mode_a);
message bootstrap_exited (std::string const & id_a, std::string const & mode_a, std::chrono::steady_clock::time_point const start_time_a, uint64_t const total_blocks_a);
message telemetry_received (nano::telemetry_data const &, nano::endpoint const &);
private:
/** Set the common fields for messages: timestamp and topic. */

View file

@ -1,3 +1,4 @@
#include <nano/core_test/common.hpp>
#include <nano/core_test/testutil.hpp>
#include <nano/crypto_lib/random_pool.hpp>
#include <nano/lib/threading.hpp>
@ -832,21 +833,6 @@ TEST (confirmation_height, prioritize_frontiers_overwrite)
namespace
{
/**
 * Polls system_a, under a 10 second deadline, until the peer counts read from every node's store
 * add up to num_nodes * (num_nodes - 1), i.e. every node has stored every other node as a peer.
 */
void wait_peer_connections (nano::system & system_a)
{
	system_a.deadline_set (10s);
	auto num_nodes = system_a.nodes.size ();
	// Keep the running total in the container's own size type so the comparison below is not a
	// signed/unsigned mix and the per-node counts are not narrowed to int while being summed
	decltype (num_nodes) peer_count = 0;
	while (peer_count != num_nodes * (num_nodes - 1))
	{
		ASSERT_NO_ERROR (system_a.poll ());
		// The seed's type fixes the accumulator type, so seed with the size type rather than a plain int 0
		peer_count = std::accumulate (system_a.nodes.cbegin (), system_a.nodes.cend (), decltype (num_nodes){ 0 }, [](auto total, auto const & node) {
			auto transaction = node->store.tx_begin_read ();
			return total += node->store.peer_count (transaction);
		});
	}
}
class data
{
public: