Node telemetry (#2446)
* Node telemetry
* Add genesis block
* Don't check if at the end of the stream with UDP to support compatibility with different message versions
* Add single request and optional blocking
* Formatting
* Use nano locks
* Fix clang build
* Out of date comments
* Update some names of classes/functions/variables
* Fix for disconnected peer channels & some cleanup
* Remove unnecessary lock requirement in active_transactions::lock. (#2475)
* Remove unnecessary lock requirement in active_transactions::lock. This also removes some code smell around passing in a bool to determine if the lock should actually be acquired.
* Use a timeout counter in line with other test idioms.
* Confirmation solicitor revamp (#2472)
* Optional minimum version when querying representatives from crawler
* Revamp confirmation_solicitor to mimic previous active_transactions behavior
* Use a time-based approach to throttle confirmation requests and block flooding
* Address Wesley review
* Remove unused node.hpp include (thanks wes)
* Simplify logic by using unordered_map::operator[], which calls the default constructor if not found
* Split solicitor add into broadcast+add and bring the logic back to active_transactions. This brings back the rate-limiting logic and keeps modification of election variables in active_transactions only. Timings are also slightly adjusted:
  - Only 2 requests required before starting to flood blocks
  - Timings for test network
* Rename flag
* Only broadcast OR request confirmation in the same loop for the same election
* Enclose lambda in clang-format off
* Beta network reset #2 (#2476)
* Delete rep_weights_beta.bin
* New genesis
* Change header_magic_number to one never used before
* use v1.1 for actions-aws-cli (#2486)
* The start of CLI tests (#2403)
* The start of CLI tests
* Add needed header file after merge
* Serg review comments
* Websocket bootstrap subscription (#2471)
* Bootstrap attempt ID
* Websocket bootstrap subscription
* Change processed blocks factor for requeued pulls (#2437): from `processed blocks / 10000` to `processed blocks / 4096` for better processing of the largest chains in case of failures
* Upgrade confirmation height table to include cemented frontier (#2481)
* Upgrade confirmation height table to include cemented frontier
* Stein review comments
* Add //clang-format block around lambda
* Update comment (thanks Gui)
* Upgrade to v17 instead
* Update comments
* Request aggregator (#2485)
* Request aggregator: adds a class that runs in a new thread to pool confirmation requests by endpoint. This reduces bandwidth use and vote generation, and moves some vote generation out of the I/O threads.
* Use a constant for confirm_ack_hashes_max
* Small code simplification
* Use const transaction
* Disable clang-format for lambdas
* Add missing deadlines and update deadline before poll block
* Use a scoped lock_guard pattern and initialize start in-class
* Misc. fixes and documentation
* use clang-format-8 (#2490): there is some funkiness with clang-format-9 and lambdas that is not honored
* Formatting fix so clang-format-8 applies cleanly (#2491)
* Support multiple work peers in the same host (#2477)
* Correct check for peers when creating work
* Support multiple work peers at the same address
* Send cancels despite errors to account for non-conforming implementations
* Add tests using a fake work peer, acting as good, malicious or slow
* Comment
* Formatting
* Fix multiple response error in RPC
* Formatting
* Add extra error handling when specifying port and not address with the RPC (and vice versa)
* Formatting
* Serg comments
* Formatting
* Mask not needed
* Missed file check in for mask removal
* Fix test failures
* Gui suggestions
* Formatting
* Fix assert with test on Windows and some cleanup
* Lower number of nodes in node_telemetry.many_nodes when using sanitizers

Co-authored-by: clemahieu <clemahieu@gmail.com>
Co-authored-by: Guilherme Lawless <guilherme.lawless@gmail.com>
Co-authored-by: Russel Waters <vaelstrom@gmail.com>
Co-authored-by: Sergey Kroshnin <sergiysw@gmail.com>
Co-authored-by: cryptocode <stein@nano.org>
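A condensed sketch of how the new telemetry interface is driven, assembled from the tests added in `nano/core_test/node_telemetry.cpp` below; it is not additional code in this commit. The test name is illustrative, and the `wait_peer_connections` helper used by the real tests (a file-local function) is omitted here.

```cpp
#include <nano/core_test/testutil.hpp>
#include <nano/node/telemetry.hpp>
#include <nano/node/testing.hpp>

#include <gtest/gtest.h>

#include <atomic>

using namespace std::chrono_literals;

TEST (node_telemetry, usage_sketch)
{
	nano::system system (2);
	auto node_client = system.nodes.front ();
	auto node_server = system.nodes.back ();

	// Asynchronous request: metrics are gathered from a random selection of peers.
	std::atomic<bool> done{ false };
	node_client->telemetry.get_metrics_random_peers_async ([&done](nano::telemetry_data_responses const & responses_a) {
		// responses_a.data holds one nano::telemetry_data per responding peer;
		// all_received / is_cached describe how the result was obtained.
		done = true;
	});
	system.deadline_set (10s);
	while (!done)
	{
		ASSERT_NO_ERROR (system.poll ());
	}

	// Single-peer variant, keyed by channel; a null channel reports an error in the response.
	auto channel = node_client->network.find_channel (node_server->network.endpoint ());
	done = false;
	node_client->telemetry.get_metrics_single_peer_async (channel, [&done](nano::telemetry_data_response const & response_a) {
		// response_a.data carries block_count, cemented_count, bandwidth_cap, genesis_block, etc.
		done = true;
	});
	system.deadline_set (10s);
	while (!done)
	{
		ASSERT_NO_ERROR (system.poll ());
	}
}
```

Blocking variants (`get_metrics_random_peers`, `get_metrics_single_peer`) also exist; in the tests they are used only while `system.poll ()` is driven from a worker thread, since the calling thread blocks until the responses arrive.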
This commit is contained in:
parent
48d4c4a056
commit
1edb8c6c93
35 changed files with 2173 additions and 31 deletions
|
@ -19,11 +19,12 @@ add_executable (core_test
|
|||
ledger.cpp
|
||||
locks.cpp
|
||||
logger.cpp
|
||||
network.cpp
|
||||
node.cpp
|
||||
message.cpp
|
||||
message_parser.cpp
|
||||
memory_pool.cpp
|
||||
network.cpp
|
||||
node.cpp
|
||||
node_telemetry.cpp
|
||||
processor_service.cpp
|
||||
peer_container.cpp
|
||||
request_aggregator.cpp
|
||||
|
|
|
@ -25,33 +25,37 @@ public:
|
|||
}
|
||||
void bulk_pull (nano::bulk_pull const &) override
|
||||
{
|
||||
++bulk_pull_count;
|
||||
ASSERT_FALSE (true);
|
||||
}
|
||||
void bulk_pull_account (nano::bulk_pull_account const &) override
|
||||
{
|
||||
++bulk_pull_account_count;
|
||||
ASSERT_FALSE (true);
|
||||
}
|
||||
void bulk_push (nano::bulk_push const &) override
|
||||
{
|
||||
++bulk_push_count;
|
||||
ASSERT_FALSE (true);
|
||||
}
|
||||
void frontier_req (nano::frontier_req const &) override
|
||||
{
|
||||
++frontier_req_count;
|
||||
ASSERT_FALSE (true);
|
||||
}
|
||||
void node_id_handshake (nano::node_id_handshake const &) override
|
||||
{
|
||||
++node_id_handshake_count;
|
||||
ASSERT_FALSE (true);
|
||||
}
|
||||
void telemetry_req (nano::telemetry_req const &) override
|
||||
{
|
||||
ASSERT_FALSE (true);
|
||||
}
|
||||
void telemetry_ack (nano::telemetry_ack const &) override
|
||||
{
|
||||
ASSERT_FALSE (true);
|
||||
}
|
||||
|
||||
uint64_t keepalive_count{ 0 };
|
||||
uint64_t publish_count{ 0 };
|
||||
uint64_t confirm_req_count{ 0 };
|
||||
uint64_t confirm_ack_count{ 0 };
|
||||
uint64_t bulk_pull_count{ 0 };
|
||||
uint64_t bulk_pull_account_count{ 0 };
|
||||
uint64_t bulk_push_count{ 0 };
|
||||
uint64_t frontier_req_count{ 0 };
|
||||
uint64_t node_id_handshake_count{ 0 };
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -3433,7 +3433,7 @@ TEST (node, bidirectional_tcp)
|
|||
// Test block confirmation from node 2
|
||||
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
|
||||
confirmed = false;
|
||||
system.deadline_set (10s);
|
||||
system.deadline_set (20s);
|
||||
while (!confirmed)
|
||||
{
|
||||
auto transaction1 (node1->store.tx_begin_read ());
|
||||
|
753 nano/core_test/node_telemetry.cpp (new file)
@ -0,0 +1,753 @@
|
|||
#include <nano/core_test/testutil.hpp>
|
||||
#include <nano/node/telemetry.hpp>
|
||||
#include <nano/node/testing.hpp>
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <numeric>
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
namespace
|
||||
{
|
||||
void wait_peer_connections (nano::system & system_a);
|
||||
void compare_default_test_result_data (nano::telemetry_data & telemetry_data_a, nano::node const & node_server_a);
|
||||
}
|
||||
|
||||
TEST (node_telemetry, consolidate_data)
|
||||
{
|
||||
nano::telemetry_data data;
|
||||
data.account_count = 2;
|
||||
data.block_count = 1;
|
||||
data.cemented_count = 1;
|
||||
data.vendor_version = 20;
|
||||
data.protocol_version_number = 12;
|
||||
data.peer_count = 2;
|
||||
data.bandwidth_cap = 100;
|
||||
data.unchecked_count = 3;
|
||||
data.uptime = 6;
|
||||
data.genesis_block = nano::block_hash (3);
|
||||
|
||||
nano::telemetry_data data1;
|
||||
data1.account_count = 5;
|
||||
data1.block_count = 7;
|
||||
data1.cemented_count = 4;
|
||||
data1.vendor_version = 10;
|
||||
data1.protocol_version_number = 11;
|
||||
data1.peer_count = 5;
|
||||
data1.bandwidth_cap = 0;
|
||||
data1.unchecked_count = 1;
|
||||
data1.uptime = 10;
|
||||
data1.genesis_block = nano::block_hash (4);
|
||||
|
||||
nano::telemetry_data data2;
|
||||
data2.account_count = 3;
|
||||
data2.block_count = 3;
|
||||
data2.cemented_count = 2;
|
||||
data2.vendor_version = 20;
|
||||
data2.protocol_version_number = 11;
|
||||
data2.peer_count = 4;
|
||||
data2.bandwidth_cap = 0;
|
||||
data2.unchecked_count = 2;
|
||||
data2.uptime = 3;
|
||||
data2.genesis_block = nano::block_hash (4);
|
||||
|
||||
std::vector<nano::telemetry_data> all_data{ data, data1, data2 };
|
||||
|
||||
auto consolidated_telemetry_data = nano::telemetry_data::consolidate (all_data);
|
||||
ASSERT_EQ (consolidated_telemetry_data.account_count, 3);
|
||||
ASSERT_EQ (consolidated_telemetry_data.block_count, 3);
|
||||
ASSERT_EQ (consolidated_telemetry_data.cemented_count, 2);
|
||||
ASSERT_EQ (consolidated_telemetry_data.vendor_version, 20);
|
||||
ASSERT_EQ (consolidated_telemetry_data.protocol_version_number, 11);
|
||||
ASSERT_EQ (consolidated_telemetry_data.peer_count, 3);
|
||||
ASSERT_EQ (consolidated_telemetry_data.bandwidth_cap, 0);
|
||||
ASSERT_EQ (consolidated_telemetry_data.unchecked_count, 2);
|
||||
ASSERT_EQ (consolidated_telemetry_data.uptime, 6);
|
||||
ASSERT_EQ (consolidated_telemetry_data.genesis_block, nano::block_hash (4));
|
||||
|
||||
// Modify the metrics which may be either the mode or averages to ensure all are tested.
|
||||
all_data[2].bandwidth_cap = 53;
|
||||
all_data[2].protocol_version_number = 13;
|
||||
all_data[2].vendor_version = 13;
|
||||
all_data[2].genesis_block = nano::block_hash (3);
|
||||
|
||||
auto consolidated_telemetry_data1 = nano::telemetry_data::consolidate (all_data);
|
||||
ASSERT_TRUE (consolidated_telemetry_data1.vendor_version == 10 || consolidated_telemetry_data1.vendor_version == 13 || consolidated_telemetry_data1.vendor_version == 20);
|
||||
ASSERT_TRUE (consolidated_telemetry_data1.protocol_version_number == 11 || consolidated_telemetry_data1.protocol_version_number == 12 || consolidated_telemetry_data1.protocol_version_number == 13);
|
||||
ASSERT_EQ (consolidated_telemetry_data1.bandwidth_cap, 51);
|
||||
ASSERT_EQ (consolidated_telemetry_data1.genesis_block, nano::block_hash (3));
|
||||
|
||||
// Test equality operator
|
||||
ASSERT_FALSE (consolidated_telemetry_data == consolidated_telemetry_data1);
|
||||
ASSERT_EQ (consolidated_telemetry_data, consolidated_telemetry_data);
|
||||
}
|
||||
|
||||
TEST (node_telemetry, no_peers)
|
||||
{
|
||||
nano::system system (1);
|
||||
|
||||
std::atomic<bool> done{ false };
|
||||
system.nodes[0]->telemetry.get_metrics_random_peers_async ([&done](nano::telemetry_data_responses const & responses_a) {
|
||||
ASSERT_TRUE (responses_a.data.empty ());
|
||||
ASSERT_FALSE (responses_a.all_received);
|
||||
ASSERT_FALSE (responses_a.is_cached);
|
||||
done = true;
|
||||
});
|
||||
|
||||
system.deadline_set (10s);
|
||||
while (!done)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
}
|
||||
|
||||
namespace nano
|
||||
{
|
||||
TEST (node_telemetry, basic)
|
||||
{
|
||||
nano::system system (2);
|
||||
|
||||
auto node_client = system.nodes.front ();
|
||||
auto node_server = system.nodes.back ();
|
||||
|
||||
wait_peer_connections (system);
|
||||
|
||||
// Request telemetry metrics
|
||||
std::vector<nano::telemetry_data> all_telemetry_data;
|
||||
{
|
||||
std::atomic<bool> done{ false };
|
||||
node_client->telemetry.get_metrics_random_peers_async ([&done, &all_telemetry_data](nano::telemetry_data_responses const & responses_a) {
|
||||
ASSERT_FALSE (responses_a.is_cached);
|
||||
ASSERT_TRUE (responses_a.all_received);
|
||||
all_telemetry_data = responses_a.data;
|
||||
done = true;
|
||||
});
|
||||
|
||||
system.deadline_set (10s);
|
||||
while (!done)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
}
|
||||
|
||||
// Check the metrics are correct
|
||||
ASSERT_EQ (all_telemetry_data.size (), 1);
|
||||
auto & telemetry_data = all_telemetry_data.front ();
|
||||
compare_default_test_result_data (telemetry_data, *node_server);
|
||||
|
||||
// Call again straight away. It should use the cache
|
||||
{
|
||||
std::atomic<bool> done{ false };
|
||||
node_client->telemetry.get_metrics_random_peers_async ([&done, &telemetry_data](nano::telemetry_data_responses const & responses_a) {
|
||||
ASSERT_EQ (telemetry_data, responses_a.data.front ());
|
||||
ASSERT_TRUE (responses_a.is_cached);
|
||||
ASSERT_TRUE (responses_a.all_received);
|
||||
done = true;
|
||||
});
|
||||
|
||||
system.deadline_set (10s);
|
||||
while (!done)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
}
|
||||
|
||||
// Wait out the cache period and check the cache is not used
|
||||
std::this_thread::sleep_for (nano::telemetry_impl::cache_cutoff);
|
||||
|
||||
std::atomic<bool> done{ false };
|
||||
node_client->telemetry.get_metrics_random_peers_async ([&done, &telemetry_data](nano::telemetry_data_responses const & responses_a) {
|
||||
ASSERT_FALSE (responses_a.is_cached);
|
||||
ASSERT_TRUE (responses_a.all_received);
|
||||
done = true;
|
||||
});
|
||||
|
||||
system.deadline_set (10s);
|
||||
while (!done)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST (node_telemetry, many_nodes)
|
||||
{
|
||||
nano::system system;
|
||||
// The telemetry responses can time out when using a large number of nodes under sanitizers, so lower the number.
|
||||
const auto num_nodes = (is_sanitizer_build || nano::running_within_valgrind ()) ? 4 : 10;
|
||||
for (auto i = 0; i < num_nodes; ++i)
|
||||
{
|
||||
nano::node_config node_config (nano::get_available_port (), system.logging);
|
||||
// Make a metric completely different for each node so we can check afterwards that there are no duplicates
|
||||
node_config.bandwidth_limit = 100000 + i;
|
||||
system.add_node (node_config);
|
||||
}
|
||||
|
||||
wait_peer_connections (system);
|
||||
|
||||
// Give all nodes a non-default number of blocks
|
||||
nano::keypair key;
|
||||
nano::genesis genesis;
|
||||
nano::state_block send (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Mxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.hash ()));
|
||||
for (auto node : system.nodes)
|
||||
{
|
||||
auto transaction (node->store.tx_begin_write ());
|
||||
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send).code);
|
||||
}
|
||||
|
||||
// This is the node which will request metrics from all other nodes
|
||||
auto node_client = system.nodes.front ();
|
||||
|
||||
std::atomic<bool> done{ false };
|
||||
std::vector<nano::telemetry_data> all_telemetry_data;
|
||||
node_client->telemetry.get_metrics_random_peers_async ([&done, &all_telemetry_data](nano::telemetry_data_responses const & responses_a) {
|
||||
ASSERT_FALSE (responses_a.is_cached);
|
||||
ASSERT_TRUE (responses_a.all_received);
|
||||
all_telemetry_data = responses_a.data;
|
||||
done = true;
|
||||
});
|
||||
|
||||
system.deadline_set (20s);
|
||||
while (!done)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
|
||||
// Check the metrics
|
||||
nano::network_params params;
|
||||
for (auto & data : all_telemetry_data)
|
||||
{
|
||||
ASSERT_EQ (data.unchecked_count, 0);
|
||||
ASSERT_EQ (data.cemented_count, 1);
|
||||
ASSERT_LE (data.peer_count, 9);
|
||||
ASSERT_EQ (data.account_count, 1);
|
||||
ASSERT_TRUE (data.block_count == 2);
|
||||
ASSERT_EQ (data.protocol_version_number, params.protocol.telemetry_protocol_version_min);
|
||||
ASSERT_GE (data.bandwidth_cap, 100000);
|
||||
ASSERT_LT (data.bandwidth_cap, 100000 + system.nodes.size ());
|
||||
ASSERT_EQ (data.vendor_version, nano::get_major_node_version ());
|
||||
ASSERT_LT (data.uptime, 100);
|
||||
ASSERT_EQ (data.genesis_block, genesis.hash ());
|
||||
}
|
||||
|
||||
// We gave some nodes different bandwidth caps; confirm they are not all the same
|
||||
auto all_bandwidth_limits_same = std::all_of (all_telemetry_data.begin () + 1, all_telemetry_data.end (), [bandwidth_cap = all_telemetry_data[0].bandwidth_cap](auto & telemetry) {
|
||||
return telemetry.bandwidth_cap == bandwidth_cap;
|
||||
});
|
||||
ASSERT_FALSE (all_bandwidth_limits_same);
|
||||
}
|
||||
|
||||
TEST (node_telemetry, receive_from_non_listening_channel)
|
||||
{
|
||||
nano::system system;
|
||||
auto node = system.add_node ();
|
||||
nano::telemetry_ack message (nano::telemetry_data{});
|
||||
node->network.process_message (message, node->network.udp_channels.create (node->network.endpoint ()));
|
||||
// We have not sent a telemetry_req message to this endpoint, so shouldn't count telemetry_ack received from it.
|
||||
ASSERT_EQ (node->telemetry.telemetry_data_size (), 0);
|
||||
}
|
||||
|
||||
TEST (node_telemetry, over_udp)
|
||||
{
|
||||
nano::system system;
|
||||
nano::node_flags node_flags;
|
||||
node_flags.disable_tcp_realtime = true;
|
||||
auto node_client = system.add_node (node_flags);
|
||||
auto node_server = system.add_node (node_flags);
|
||||
|
||||
wait_peer_connections (system);
|
||||
|
||||
std::atomic<bool> done{ false };
|
||||
std::vector<nano::telemetry_data> all_telemetry_data;
|
||||
node_client->telemetry.get_metrics_random_peers_async ([&done, &all_telemetry_data](nano::telemetry_data_responses const & responses_a) {
|
||||
ASSERT_FALSE (responses_a.is_cached);
|
||||
ASSERT_TRUE (responses_a.all_received);
|
||||
all_telemetry_data = responses_a.data;
|
||||
done = true;
|
||||
});
|
||||
|
||||
system.deadline_set (10s);
|
||||
while (!done)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
|
||||
ASSERT_EQ (all_telemetry_data.size (), 1);
|
||||
compare_default_test_result_data (all_telemetry_data.front (), *node_server);
|
||||
|
||||
// Check channels are indeed udp
|
||||
ASSERT_EQ (1, node_client->network.size ());
|
||||
auto list1 (node_client->network.list (2));
|
||||
ASSERT_EQ (node_server->network.endpoint (), list1[0]->get_endpoint ());
|
||||
ASSERT_EQ (nano::transport::transport_type::udp, list1[0]->get_type ());
|
||||
ASSERT_EQ (1, node_server->network.size ());
|
||||
auto list2 (node_server->network.list (2));
|
||||
ASSERT_EQ (node_client->network.endpoint (), list2[0]->get_endpoint ());
|
||||
ASSERT_EQ (nano::transport::transport_type::udp, list2[0]->get_type ());
|
||||
}
|
||||
|
||||
TEST (node_telemetry, simultaneous_random_requests)
|
||||
{
|
||||
const auto num_nodes = 4;
|
||||
nano::system system (num_nodes);
|
||||
|
||||
// Wait until peers are stored, as this is done in the background
|
||||
wait_peer_connections (system);
|
||||
|
||||
std::vector<std::thread> threads;
|
||||
const auto num_threads = 4;
|
||||
|
||||
std::atomic<bool> done{ false };
|
||||
class Data
|
||||
{
|
||||
public:
|
||||
std::atomic<bool> awaiting_cache{ false };
|
||||
std::atomic<bool> keep_requesting_metrics{ true };
|
||||
std::shared_ptr<nano::node> node;
|
||||
};
|
||||
|
||||
std::array<Data, num_nodes> all_data{};
|
||||
for (auto i = 0; i < num_nodes; ++i)
|
||||
{
|
||||
all_data[i].node = system.nodes[i];
|
||||
}
|
||||
|
||||
std::atomic<uint64_t> count{ 0 };
|
||||
std::promise<void> promise;
|
||||
std::shared_future<void> shared_future (promise.get_future ());
|
||||
|
||||
// Create a few threads where each node sends out telemetry request messages to all other nodes continuously, until the cache is reached and subsequently expires.
|
||||
// The test waits until all telemetry_ack messages have been received.
|
||||
for (int i = 0; i < num_threads; ++i)
|
||||
{
|
||||
threads.emplace_back ([&all_data, &done, &count, &promise, &shared_future]() {
|
||||
while (std::any_of (all_data.cbegin (), all_data.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); }))
|
||||
{
|
||||
for (auto & data : all_data)
|
||||
{
|
||||
// Keep calling get_metrics_async until the cache has been saved and then becomes outdated (after a certain period of time) for each node
|
||||
if (data.keep_requesting_metrics)
|
||||
{
|
||||
++count;
|
||||
|
||||
data.node->telemetry.get_metrics_random_peers_async ([&promise, &done, &data, &all_data, &count](nano::telemetry_data_responses const & responses_a) {
|
||||
if (data.awaiting_cache && !responses_a.is_cached)
|
||||
{
|
||||
data.keep_requesting_metrics = false;
|
||||
}
|
||||
if (responses_a.is_cached)
|
||||
{
|
||||
data.awaiting_cache = true;
|
||||
}
|
||||
if (--count == 0 && std::all_of (all_data.begin (), all_data.end (), [](auto const & data) { return !data.keep_requesting_metrics; }))
|
||||
{
|
||||
done = true;
|
||||
promise.set_value ();
|
||||
}
|
||||
});
|
||||
}
|
||||
std::this_thread::sleep_for (1ms);
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT_EQ (count, 0);
|
||||
shared_future.wait ();
|
||||
});
|
||||
}
|
||||
|
||||
system.deadline_set (20s);
|
||||
while (!done)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
|
||||
for (auto & thread : threads)
|
||||
{
|
||||
thread.join ();
|
||||
}
|
||||
}
|
||||
|
||||
namespace nano
|
||||
{
|
||||
TEST (node_telemetry, single_request)
|
||||
{
|
||||
nano::system system (2);
|
||||
|
||||
auto node_client = system.nodes.front ();
|
||||
auto node_server = system.nodes.back ();
|
||||
|
||||
wait_peer_connections (system);
|
||||
|
||||
// Request telemetry metrics
|
||||
auto channel = node_client->network.find_channel (node_server->network.endpoint ());
|
||||
nano::telemetry_data telemetry_data;
|
||||
{
|
||||
std::atomic<bool> done{ false };
|
||||
|
||||
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) {
|
||||
ASSERT_FALSE (response_a.is_cached);
|
||||
ASSERT_FALSE (response_a.error);
|
||||
telemetry_data = response_a.data;
|
||||
done = true;
|
||||
});
|
||||
|
||||
system.deadline_set (10s);
|
||||
while (!done)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
}
|
||||
|
||||
// Check the metrics are correct
|
||||
compare_default_test_result_data (telemetry_data, *node_server);
|
||||
|
||||
// Call again straight away. It should use the cache
|
||||
{
|
||||
std::atomic<bool> done{ false };
|
||||
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) {
|
||||
ASSERT_EQ (telemetry_data, response_a.data);
|
||||
ASSERT_TRUE (response_a.is_cached);
|
||||
ASSERT_FALSE (response_a.error);
|
||||
done = true;
|
||||
});
|
||||
|
||||
system.deadline_set (10s);
|
||||
while (!done)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
}
|
||||
|
||||
// Wait out the cache period and check the cache is not used
|
||||
std::this_thread::sleep_for (nano::telemetry_impl::cache_cutoff);
|
||||
|
||||
std::atomic<bool> done{ false };
|
||||
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) {
|
||||
ASSERT_FALSE (response_a.is_cached);
|
||||
ASSERT_FALSE (response_a.error);
|
||||
done = true;
|
||||
});
|
||||
|
||||
system.deadline_set (10s);
|
||||
while (!done)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST (node_telemetry, single_request_invalid_channel)
|
||||
{
|
||||
nano::system system (2);
|
||||
|
||||
auto node_client = system.nodes.front ();
|
||||
auto node_server = system.nodes.back ();
|
||||
|
||||
std::atomic<bool> done{ false };
|
||||
node_client->telemetry.get_metrics_single_peer_async (nullptr, [&done](nano::telemetry_data_response const & response_a) {
|
||||
ASSERT_TRUE (response_a.error);
|
||||
done = true;
|
||||
});
|
||||
|
||||
system.deadline_set (10s);
|
||||
while (!done)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
}
|
||||
|
||||
TEST (node_telemetry, simultaneous_single_and_random_requests)
|
||||
{
|
||||
const auto num_nodes = 4;
|
||||
nano::system system (num_nodes);
|
||||
|
||||
wait_peer_connections (system);
|
||||
|
||||
std::vector<std::thread> threads;
|
||||
const auto num_threads = 4;
|
||||
|
||||
class data
|
||||
{
|
||||
public:
|
||||
std::atomic<bool> awaiting_cache{ false };
|
||||
std::atomic<bool> keep_requesting_metrics{ true };
|
||||
std::shared_ptr<nano::node> node;
|
||||
};
|
||||
|
||||
std::array<data, num_nodes> node_data_single{};
|
||||
std::array<data, num_nodes> node_data_random{};
|
||||
for (auto i = 0; i < num_nodes; ++i)
|
||||
{
|
||||
node_data_single[i].node = system.nodes[i];
|
||||
node_data_random[i].node = system.nodes[i];
|
||||
}
|
||||
|
||||
class shared_data
|
||||
{
|
||||
public:
|
||||
std::atomic<bool> done{ false };
|
||||
std::atomic<uint64_t> count{ 0 };
|
||||
std::promise<void> promise;
|
||||
std::shared_future<void> shared_future{ promise.get_future () };
|
||||
};
|
||||
|
||||
shared_data shared_data_single;
|
||||
shared_data shared_data_random;
|
||||
|
||||
// Create a few threads where each node sends out telemetry request messages to all other nodes continuously, until the cache is reached and subsequently expires.
|
||||
// The test waits until all telemetry_ack messages have been received.
|
||||
for (int i = 0; i < num_threads; ++i)
|
||||
{
|
||||
threads.emplace_back ([&node_data_single, &node_data_random, &shared_data_single, &shared_data_random]() {
|
||||
auto func = [](auto & all_node_data_a, shared_data & shared_data_a) {
|
||||
while (std::any_of (all_node_data_a.cbegin (), all_node_data_a.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); }))
|
||||
{
|
||||
for (auto & data : all_node_data_a)
|
||||
{
|
||||
// Keep calling get_metrics_async until the cache has been saved and then becomes outdated (after a certain period of time) for each node
|
||||
if (data.keep_requesting_metrics)
|
||||
{
|
||||
++shared_data_a.count;
|
||||
|
||||
data.node->telemetry.get_metrics_random_peers_async ([& shared_data = shared_data_a, &data, &all_node_data = all_node_data_a](nano::telemetry_data_responses const & responses_a) {
|
||||
if (data.awaiting_cache && !responses_a.is_cached)
|
||||
{
|
||||
data.keep_requesting_metrics = false;
|
||||
}
|
||||
if (responses_a.is_cached)
|
||||
{
|
||||
data.awaiting_cache = true;
|
||||
}
|
||||
if (--shared_data.count == 0 && std::all_of (all_node_data.begin (), all_node_data.end (), [](auto const & data) { return !data.keep_requesting_metrics; }))
|
||||
{
|
||||
shared_data.done = true;
|
||||
shared_data.promise.set_value ();
|
||||
}
|
||||
});
|
||||
}
|
||||
std::this_thread::sleep_for (1ms);
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT_EQ (shared_data_a.count, 0);
|
||||
shared_data_a.shared_future.wait ();
|
||||
};
|
||||
|
||||
func (node_data_single, shared_data_single);
|
||||
func (node_data_random, shared_data_random);
|
||||
});
|
||||
}
|
||||
|
||||
system.deadline_set (20s);
|
||||
while (!shared_data_single.done || !shared_data_random.done)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
|
||||
for (auto & thread : threads)
|
||||
{
|
||||
thread.join ();
|
||||
}
|
||||
}
|
||||
|
||||
TEST (node_telemetry, blocking_single_and_random)
|
||||
{
|
||||
nano::system system (2);
|
||||
|
||||
auto node_client = system.nodes.front ();
|
||||
auto node_server = system.nodes.back ();
|
||||
|
||||
wait_peer_connections (system);
|
||||
|
||||
// Request telemetry metrics
|
||||
std::atomic<bool> done{ false };
|
||||
std::function<void()> call_system_poll;
|
||||
std::promise<void> promise;
|
||||
call_system_poll = [&call_system_poll, &worker = node_client->worker, &done, &system, &promise]() {
|
||||
if (!done)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
worker.push_task (call_system_poll);
|
||||
}
|
||||
else
|
||||
{
|
||||
promise.set_value ();
|
||||
}
|
||||
};
|
||||
|
||||
// Keep pushing system.poll () calls in another thread (worker), because this thread will be blocked and unable to do so.
|
||||
system.deadline_set (10s);
|
||||
node_client->worker.push_task (call_system_poll);
|
||||
|
||||
// Blocking version of get_metrics_random_peers_async
|
||||
auto telemetry_data_responses = node_client->telemetry.get_metrics_random_peers ();
|
||||
ASSERT_FALSE (telemetry_data_responses.is_cached);
|
||||
ASSERT_TRUE (telemetry_data_responses.all_received);
|
||||
compare_default_test_result_data (telemetry_data_responses.data.front (), *node_server);
|
||||
|
||||
// Now try single request metric
|
||||
auto telemetry_data_response = node_client->telemetry.get_metrics_single_peer (node_client->network.find_channel (node_server->network.endpoint ()));
|
||||
ASSERT_FALSE (telemetry_data_response.is_cached);
|
||||
ASSERT_FALSE (telemetry_data_response.error);
|
||||
compare_default_test_result_data (telemetry_data_response.data, *node_server);
|
||||
|
||||
done = true;
|
||||
promise.get_future ().wait ();
|
||||
}
|
||||
|
||||
namespace nano
|
||||
{
|
||||
TEST (node_telemetry, multiple_single_request_clearing)
|
||||
{
|
||||
nano::system system (2);
|
||||
|
||||
auto node_client = system.nodes.front ();
|
||||
auto node_server = system.nodes.back ();
|
||||
|
||||
nano::node_config node_config (nano::get_available_port (), system.logging);
|
||||
node_config.bandwidth_limit = 100000;
|
||||
auto node_server1 = system.add_node (node_config);
|
||||
|
||||
wait_peer_connections (system);
|
||||
|
||||
// Request telemetry metrics
|
||||
auto channel = node_client->network.find_channel (node_server->network.endpoint ());
|
||||
|
||||
std::atomic<bool> done{ false };
|
||||
node_client->telemetry.get_metrics_single_peer_async (channel, [&done](nano::telemetry_data_response const & response_a) {
|
||||
ASSERT_FALSE (response_a.error);
|
||||
ASSERT_FALSE (response_a.is_cached);
|
||||
done = true;
|
||||
});
|
||||
|
||||
ASSERT_EQ (1, node_client->telemetry.single_requests.size ());
|
||||
auto last_updated = node_client->telemetry.single_requests.begin ()->second.last_updated;
|
||||
|
||||
system.deadline_set (10s);
|
||||
while (!done)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
|
||||
done = false;
|
||||
// Make another request to keep the single_requests entry alive (updates last_updated)
|
||||
system.deadline_set (10s);
|
||||
node_client->telemetry.get_metrics_single_peer_async (channel, [&done](nano::telemetry_data_response const & response_a) {
|
||||
ASSERT_FALSE (response_a.error);
|
||||
ASSERT_TRUE (response_a.is_cached);
|
||||
done = true;
|
||||
});
|
||||
|
||||
ASSERT_LT (last_updated, node_client->telemetry.single_requests.begin ()->second.last_updated);
|
||||
|
||||
system.deadline_set (10s);
|
||||
while (!done)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
|
||||
done = false;
|
||||
auto channel1 = node_client->network.find_channel (node_server1->network.endpoint ());
|
||||
node_client->telemetry.get_metrics_single_peer_async (channel1, [&done](nano::telemetry_data_response const & response_a) {
|
||||
ASSERT_FALSE (response_a.error);
|
||||
ASSERT_FALSE (response_a.is_cached);
|
||||
done = true;
|
||||
});
|
||||
|
||||
system.deadline_set (10s);
|
||||
|
||||
while (!done)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
|
||||
done = false;
|
||||
node_client->telemetry.get_metrics_single_peer_async (channel1, [&done](nano::telemetry_data_response const & response_a) {
|
||||
ASSERT_FALSE (response_a.error);
|
||||
ASSERT_TRUE (response_a.is_cached);
|
||||
done = true;
|
||||
});
|
||||
|
||||
// single_requests entries should be removed once no more callbacks are outstanding
|
||||
system.deadline_set (10s);
|
||||
nano::unique_lock<std::mutex> lk (node_client->telemetry.mutex);
|
||||
while (!node_client->telemetry.single_requests.empty () || !done)
|
||||
{
|
||||
lk.unlock ();
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
lk.lock ();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST (node_telemetry, disconnects)
|
||||
{
|
||||
nano::system system (2);
|
||||
|
||||
auto node_client = system.nodes.front ();
|
||||
auto node_server = system.nodes.back ();
|
||||
|
||||
wait_peer_connections (system);
|
||||
|
||||
// Try to request metrics from a node which is turned off but whose channel is not yet closed
|
||||
auto channel = node_client->network.find_channel (node_server->network.endpoint ());
|
||||
node_server->stop ();
|
||||
ASSERT_TRUE (channel);
|
||||
|
||||
std::atomic<bool> done{ false };
|
||||
node_client->telemetry.get_metrics_random_peers_async ([&done](nano::telemetry_data_responses const & responses_a) {
|
||||
ASSERT_FALSE (responses_a.all_received);
|
||||
done = true;
|
||||
});
|
||||
|
||||
system.deadline_set (10s);
|
||||
while (!done)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
|
||||
done = false;
|
||||
node_client->telemetry.get_metrics_single_peer_async (channel, [&done](nano::telemetry_data_response const & response_a) {
|
||||
ASSERT_TRUE (response_a.error);
|
||||
done = true;
|
||||
});
|
||||
|
||||
system.deadline_set (10s);
|
||||
while (!done)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
void wait_peer_connections (nano::system & system_a)
|
||||
{
|
||||
system_a.deadline_set (10s);
|
||||
auto peer_count = 0;
|
||||
auto num_nodes = system_a.nodes.size ();
|
||||
while (peer_count != num_nodes * (num_nodes - 1))
|
||||
{
|
||||
ASSERT_NO_ERROR (system_a.poll ());
|
||||
peer_count = std::accumulate (system_a.nodes.cbegin (), system_a.nodes.cend (), 0, [](auto total, auto const & node) {
|
||||
auto transaction = node->store.tx_begin_read ();
|
||||
return total += node->store.peer_count (transaction);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void compare_default_test_result_data (nano::telemetry_data & telemetry_data_a, nano::node const & node_server_a)
|
||||
{
|
||||
ASSERT_EQ (telemetry_data_a.block_count, 1);
|
||||
ASSERT_EQ (telemetry_data_a.cemented_count, 1);
|
||||
ASSERT_EQ (telemetry_data_a.bandwidth_cap, node_server_a.config.bandwidth_limit);
|
||||
ASSERT_EQ (telemetry_data_a.peer_count, 1);
|
||||
ASSERT_EQ (telemetry_data_a.protocol_version_number, node_server_a.network_params.protocol.telemetry_protocol_version_min);
|
||||
ASSERT_EQ (telemetry_data_a.unchecked_count, 0);
|
||||
ASSERT_EQ (telemetry_data_a.account_count, 1);
|
||||
ASSERT_EQ (telemetry_data_a.vendor_version, nano::get_major_node_version ());
|
||||
ASSERT_LT (telemetry_data_a.uptime, 100);
|
||||
ASSERT_EQ (telemetry_data_a.genesis_block, nano::genesis ().hash ());
|
||||
}
|
||||
}
|
|
@ -199,6 +199,7 @@ TEST (toml, daemon_config_deserialize_defaults)
|
|||
ASSERT_EQ (conf.node.logging.network_publish_logging_value, defaults.node.logging.network_publish_logging_value);
|
||||
ASSERT_EQ (conf.node.logging.network_timeout_logging_value, defaults.node.logging.network_timeout_logging_value);
|
||||
ASSERT_EQ (conf.node.logging.node_lifetime_tracing_value, defaults.node.logging.node_lifetime_tracing_value);
|
||||
ASSERT_EQ (conf.node.logging.network_telemetry_logging_value, defaults.node.logging.network_telemetry_logging_value);
|
||||
ASSERT_EQ (conf.node.logging.rotation_size, defaults.node.logging.rotation_size);
|
||||
ASSERT_EQ (conf.node.logging.single_line_record_value, defaults.node.logging.single_line_record_value);
|
||||
ASSERT_EQ (conf.node.logging.timing_logging_value, defaults.node.logging.timing_logging_value);
|
||||
|
@ -453,6 +454,7 @@ TEST (toml, daemon_config_deserialize_no_defaults)
|
|||
network_keepalive = true
|
||||
network_message = true
|
||||
network_node_id_handshake = true
|
||||
network_telemetry_logging = true
|
||||
network_packet = true
|
||||
network_publish = true
|
||||
network_timeout = true
|
||||
|
@ -583,6 +585,7 @@ TEST (toml, daemon_config_deserialize_no_defaults)
|
|||
ASSERT_NE (conf.node.logging.network_keepalive_logging_value, defaults.node.logging.network_keepalive_logging_value);
|
||||
ASSERT_NE (conf.node.logging.network_message_logging_value, defaults.node.logging.network_message_logging_value);
|
||||
ASSERT_NE (conf.node.logging.network_node_id_handshake_logging_value, defaults.node.logging.network_node_id_handshake_logging_value);
|
||||
ASSERT_NE (conf.node.logging.network_telemetry_logging_value, defaults.node.logging.network_telemetry_logging_value);
|
||||
ASSERT_NE (conf.node.logging.network_packet_logging_value, defaults.node.logging.network_packet_logging_value);
|
||||
ASSERT_NE (conf.node.logging.network_publish_logging_value, defaults.node.logging.network_publish_logging_value);
|
||||
ASSERT_NE (conf.node.logging.network_timeout_logging_value, defaults.node.logging.network_timeout_logging_value);
|
||||
|
|
|
@ -78,6 +78,8 @@ if (NANO_STACKTRACE_BACKTRACE)
|
|||
endif ()
|
||||
|
||||
target_compile_definitions(nano_lib
|
||||
PRIVATE
|
||||
-DMAJOR_VERSION_STRING=${CPACK_PACKAGE_VERSION_MAJOR}
|
||||
PUBLIC
|
||||
-DACTIVE_NETWORK=${ACTIVE_NETWORK}
|
||||
)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
#include <nano/lib/config.hpp>
|
||||
|
||||
#include <boost/filesystem/path.hpp>
|
||||
#include <boost/lexical_cast.hpp>
|
||||
|
||||
#include <valgrind/valgrind.h>
|
||||
|
||||
|
@ -8,6 +9,11 @@ namespace nano
|
|||
{
|
||||
const char * network_constants::active_network_err_msg = "Invalid network. Valid values are live, beta and test.";
|
||||
|
||||
uint8_t get_major_node_version ()
|
||||
{
|
||||
return boost::numeric_cast<uint8_t> (boost::lexical_cast<int> (NANO_MAJOR_VERSION_STRING));
|
||||
}
|
||||
|
||||
void force_nano_test_network ()
|
||||
{
|
||||
nano::network_constants::set_active_network (nano::nano_networks::nano_test_network);
|
||||
|
|
|
@ -20,6 +20,7 @@ namespace filesystem
|
|||
* Returns build version information
|
||||
*/
|
||||
const char * const NANO_VERSION_STRING = xstr (TAG_VERSION_STRING);
|
||||
const char * const NANO_MAJOR_VERSION_STRING = xstr (MAJOR_VERSION_STRING);
|
||||
|
||||
const char * const BUILD_INFO = xstr (GIT_COMMIT_HASH BOOST_COMPILER) " \"BOOST " xstr (BOOST_VERSION) "\" BUILT " xstr (__DATE__);
|
||||
|
||||
|
@ -36,6 +37,8 @@ const bool is_sanitizer_build = false;
|
|||
|
||||
namespace nano
|
||||
{
|
||||
uint8_t get_major_node_version ();
|
||||
|
||||
/**
|
||||
* Network variants with different genesis blocks and network parameters
|
||||
* @warning Enum values are used in integral comparisons; do not change.
|
||||
|
|
|
@ -200,6 +200,10 @@ std::string nano::error_rpc_messages::message (int ev) const
|
|||
return "Account has non-zero balance";
|
||||
case nano::error_rpc::payment_unable_create_account:
|
||||
return "Unable to create transaction account";
|
||||
case nano::error_rpc::peer_not_found:
|
||||
return "Peer not found";
|
||||
case nano::error_rpc::requires_port_and_address:
|
||||
return "Both port and address required";
|
||||
case nano::error_rpc::rpc_control_disabled:
|
||||
return "RPC control is disabled";
|
||||
case nano::error_rpc::sign_hash_disabled:
|
||||
|
|
|
@ -112,6 +112,8 @@ enum class error_rpc
|
|||
invalid_timestamp,
|
||||
payment_account_balance,
|
||||
payment_unable_create_account,
|
||||
peer_not_found,
|
||||
requires_port_and_address,
|
||||
rpc_control_disabled,
|
||||
sign_hash_disabled,
|
||||
source_not_found
|
||||
|
|
|
@ -556,6 +556,12 @@ std::string nano::stat::detail_to_string (uint32_t key)
|
|||
case nano::stat::detail::send:
|
||||
res = "send";
|
||||
break;
|
||||
case nano::stat::detail::telemetry_req:
|
||||
res = "telemetry_req";
|
||||
break;
|
||||
case nano::stat::detail::telemetry_ack:
|
||||
res = "telemetry_ack";
|
||||
break;
|
||||
case nano::stat::detail::state_block:
|
||||
res = "state_block";
|
||||
break;
|
||||
|
@ -634,6 +640,12 @@ std::string nano::stat::detail_to_string (uint32_t key)
|
|||
case nano::stat::detail::invalid_node_id_handshake_message:
|
||||
res = "invalid_node_id_handshake_message";
|
||||
break;
|
||||
case nano::stat::detail::invalid_telemetry_req_message:
|
||||
res = "invalid_telemetry_req_message";
|
||||
break;
|
||||
case nano::stat::detail::invalid_telemetry_ack_message:
|
||||
res = "invalid_telemetry_ack_message";
|
||||
break;
|
||||
case nano::stat::detail::outdated_version:
|
||||
res = "outdated_version";
|
||||
break;
|
||||
|
|
|
@ -233,6 +233,8 @@ public:
|
|||
confirm_req,
|
||||
confirm_ack,
|
||||
node_id_handshake,
|
||||
telemetry_req,
|
||||
telemetry_ack,
|
||||
|
||||
// bootstrap, callback
|
||||
initiate,
|
||||
|
@ -278,6 +280,8 @@ public:
|
|||
invalid_confirm_req_message,
|
||||
invalid_confirm_ack_message,
|
||||
invalid_node_id_handshake_message,
|
||||
invalid_telemetry_req_message,
|
||||
invalid_telemetry_ack_message,
|
||||
outdated_version,
|
||||
|
||||
// tcp
|
||||
|
|
|
@ -76,6 +76,8 @@ add_library (node
|
|||
logging.cpp
|
||||
network.hpp
|
||||
network.cpp
|
||||
telemetry.hpp
|
||||
telemetry.cpp
|
||||
nodeconfig.hpp
|
||||
nodeconfig.cpp
|
||||
node_observers.hpp
|
||||
|
|
|
@ -237,6 +237,23 @@ void nano::bootstrap_server::receive_header_action (boost::system::error_code co
|
|||
});
|
||||
break;
|
||||
}
|
||||
case nano::message_type::telemetry_req:
|
||||
{
|
||||
node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::telemetry_req, nano::stat::dir::in);
|
||||
if (is_realtime_connection ())
|
||||
{
|
||||
add_request (std::make_unique<nano::telemetry_req> (header));
|
||||
}
|
||||
receive ();
|
||||
break;
|
||||
}
|
||||
case nano::message_type::telemetry_ack:
|
||||
{
|
||||
socket->async_read (receive_buffer, header.payload_length_bytes (), [this_l, header](boost::system::error_code const & ec, size_t size_a) {
|
||||
this_l->receive_telemetry_ack_action (ec, size_a, header);
|
||||
});
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
if (node->config.logging.network_logging ())
|
||||
|
@ -356,6 +373,31 @@ void nano::bootstrap_server::receive_keepalive_action (boost::system::error_code
|
|||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_server::receive_telemetry_ack_action (boost::system::error_code const & ec, size_t size_a, nano::message_header const & header_a)
|
||||
{
|
||||
if (!ec)
|
||||
{
|
||||
auto error (false);
|
||||
nano::bufferstream stream (receive_buffer->data (), size_a);
|
||||
auto request (std::make_unique<nano::telemetry_ack> (error, stream, header_a));
|
||||
if (!error)
|
||||
{
|
||||
if (is_realtime_connection ())
|
||||
{
|
||||
add_request (std::unique_ptr<nano::message> (request.release ()));
|
||||
}
|
||||
receive ();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (node->config.logging.network_telemetry_logging ())
|
||||
{
|
||||
node->logger.try_log (boost::str (boost::format ("Error receiving telemetry ack: %1%") % ec.message ()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void nano::bootstrap_server::receive_publish_action (boost::system::error_code const & ec, size_t size_a, nano::message_header const & header_a)
|
||||
{
|
||||
if (!ec)
|
||||
|
@ -523,7 +565,6 @@ public:
|
|||
connection (connection_a)
|
||||
{
|
||||
}
|
||||
virtual ~request_response_visitor () = default;
|
||||
void keepalive (nano::keepalive const & message_a) override
|
||||
{
|
||||
connection->finish_request_async ();
|
||||
|
@ -576,6 +617,22 @@ public:
|
|||
auto response (std::make_shared<nano::frontier_req_server> (connection, std::unique_ptr<nano::frontier_req> (static_cast<nano::frontier_req *> (connection->requests.front ().release ()))));
|
||||
response->send_next ();
|
||||
}
|
||||
void telemetry_req (nano::telemetry_req const & message_a) override
|
||||
{
|
||||
connection->finish_request_async ();
|
||||
auto connection_l (connection->shared_from_this ());
|
||||
connection->node->background ([connection_l, message_a]() {
|
||||
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
|
||||
});
|
||||
}
|
||||
void telemetry_ack (nano::telemetry_ack const & message_a) override
|
||||
{
|
||||
connection->finish_request_async ();
|
||||
auto connection_l (connection->shared_from_this ());
|
||||
connection->node->background ([connection_l, message_a]() {
|
||||
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
|
||||
});
|
||||
}
|
||||
void node_id_handshake (nano::node_id_handshake const & message_a) override
|
||||
{
|
||||
if (connection->node->config.logging.network_node_id_handshake_logging ())
|
||||
|
|
|
@ -57,6 +57,7 @@ public:
|
|||
void receive_confirm_req_action (boost::system::error_code const &, size_t, nano::message_header const &);
|
||||
void receive_confirm_ack_action (boost::system::error_code const &, size_t, nano::message_header const &);
|
||||
void receive_node_id_handshake_action (boost::system::error_code const &, size_t, nano::message_header const &);
|
||||
void receive_telemetry_ack_action (boost::system::error_code const & ec, size_t size_a, nano::message_header const & header_a);
|
||||
void add_request (std::unique_ptr<nano::message>);
|
||||
void finish_request ();
|
||||
void finish_request_async ();
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
|
||||
std::bitset<16> constexpr nano::message_header::block_type_mask;
|
||||
std::bitset<16> constexpr nano::message_header::count_mask;
|
||||
|
||||
namespace
|
||||
{
|
||||
nano::protocol_constants const & get_protocol_constants ()
|
||||
|
@ -197,8 +198,9 @@ size_t nano::message_header::payload_length_bytes () const
|
|||
return nano::bulk_pull::size + (bulk_pull_is_count_present () ? nano::bulk_pull::extended_parameters_size : 0);
|
||||
}
|
||||
case nano::message_type::bulk_push:
|
||||
case nano::message_type::telemetry_req:
|
||||
{
|
||||
// bulk_push doesn't have a payload
|
||||
// These don't have a payload
|
||||
return 0;
|
||||
}
|
||||
case nano::message_type::frontier_req:
|
||||
|
@ -229,6 +231,10 @@ size_t nano::message_header::payload_length_bytes () const
|
|||
{
|
||||
return nano::node_id_handshake::size (*this);
|
||||
}
|
||||
case nano::message_type::telemetry_ack:
|
||||
{
|
||||
return nano::telemetry_ack::size (*this);
|
||||
}
|
||||
default:
|
||||
{
|
||||
assert (false);
|
||||
|
@ -280,6 +286,14 @@ std::string nano::message_parser::status_string ()
|
|||
{
|
||||
return "invalid_node_id_handshake_message";
|
||||
}
|
||||
case nano::message_parser::parse_status::invalid_telemetry_req_message:
|
||||
{
|
||||
return "invalid_telemetry_req_message";
|
||||
}
|
||||
case nano::message_parser::parse_status::invalid_telemetry_ack_message:
|
||||
{
|
||||
return "invalid_telemetry_ack_message";
|
||||
}
|
||||
case nano::message_parser::parse_status::outdated_version:
|
||||
{
|
||||
return "outdated_version";
|
||||
|
@ -353,6 +367,16 @@ void nano::message_parser::deserialize_buffer (uint8_t const * buffer_a, size_t
|
|||
deserialize_node_id_handshake (stream, header);
|
||||
break;
|
||||
}
|
||||
case nano::message_type::telemetry_req:
|
||||
{
|
||||
deserialize_telemetry_req (stream, header);
|
||||
break;
|
||||
}
|
||||
case nano::message_type::telemetry_ack:
|
||||
{
|
||||
deserialize_telemetry_ack (stream, header);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
status = parse_status::invalid_message_type;
|
||||
|
@ -467,6 +491,34 @@ void nano::message_parser::deserialize_node_id_handshake (nano::stream & stream_
|
|||
}
|
||||
}
|
||||
|
||||
void nano::message_parser::deserialize_telemetry_req (nano::stream & stream_a, nano::message_header const & header_a)
|
||||
{
|
||||
nano::telemetry_req incoming (header_a);
|
||||
if (at_end (stream_a))
|
||||
{
|
||||
visitor.telemetry_req (incoming);
|
||||
}
|
||||
else
|
||||
{
|
||||
status = parse_status::invalid_telemetry_req_message;
|
||||
}
|
||||
}
|
||||
|
||||
void nano::message_parser::deserialize_telemetry_ack (nano::stream & stream_a, nano::message_header const & header_a)
|
||||
{
|
||||
bool error_l (false);
|
||||
nano::telemetry_ack incoming (error_l, stream_a, header_a);
|
||||
// Intentionally not checking if at the end of stream, because these messages support backwards/forwards compatibility
|
||||
if (!error_l)
|
||||
{
|
||||
visitor.telemetry_ack (incoming);
|
||||
}
|
||||
else
|
||||
{
|
||||
status = parse_status::invalid_telemetry_ack_message;
|
||||
}
|
||||
}
|
||||
|
||||
bool nano::message_parser::at_end (nano::stream & stream_a)
|
||||
{
|
||||
uint8_t junk;
|
||||
|
@ -993,6 +1045,234 @@ void nano::bulk_push::visit (nano::message_visitor & visitor_a) const
|
|||
visitor_a.bulk_push (*this);
|
||||
}
|
||||
|
||||
nano::telemetry_req::telemetry_req () :
|
||||
message (nano::message_type::telemetry_req)
|
||||
{
|
||||
}
|
||||
|
||||
nano::telemetry_req::telemetry_req (nano::message_header const & header_a) :
|
||||
message (header_a)
|
||||
{
|
||||
}
|
||||
|
||||
bool nano::telemetry_req::deserialize (nano::stream & stream_a)
|
||||
{
|
||||
assert (header.type == nano::message_type::telemetry_req);
|
||||
return false;
|
||||
}
|
||||
|
||||
void nano::telemetry_req::serialize (nano::stream & stream_a) const
|
||||
{
|
||||
header.serialize (stream_a);
|
||||
}
|
||||
|
||||
void nano::telemetry_req::visit (nano::message_visitor & visitor_a) const
|
||||
{
|
||||
visitor_a.telemetry_req (*this);
|
||||
}
|
||||
|
||||
nano::telemetry_ack::telemetry_ack (bool & error_a, nano::stream & stream_a, nano::message_header const & message_header) :
|
||||
message (message_header)
|
||||
{
|
||||
if (!error_a)
|
||||
{
|
||||
error_a = deserialize (stream_a);
|
||||
}
|
||||
}
|
||||
|
||||
nano::telemetry_ack::telemetry_ack (nano::telemetry_data const & telemetry_data_a) :
|
||||
message (nano::message_type::telemetry_ack),
|
||||
data (telemetry_data_a)
|
||||
{
|
||||
header.extensions = telemetry_data::size;
|
||||
}
|
||||
|
||||
void nano::telemetry_ack::serialize (nano::stream & stream_a) const
|
||||
{
|
||||
header.serialize (stream_a);
|
||||
write (stream_a, data.block_count);
|
||||
write (stream_a, data.cemented_count);
|
||||
write (stream_a, data.unchecked_count);
|
||||
write (stream_a, data.account_count);
|
||||
write (stream_a, data.bandwidth_cap);
|
||||
write (stream_a, data.peer_count);
|
||||
write (stream_a, data.protocol_version_number);
|
||||
write (stream_a, data.vendor_version);
|
||||
write (stream_a, data.uptime);
|
||||
write (stream_a, data.genesis_block.bytes);
|
||||
}
|
||||
|
||||
bool nano::telemetry_ack::deserialize (nano::stream & stream_a)
|
||||
{
|
||||
auto error (false);
|
||||
try
|
||||
{
|
||||
read (stream_a, data.block_count);
|
||||
read (stream_a, data.cemented_count);
|
||||
read (stream_a, data.unchecked_count);
|
||||
read (stream_a, data.account_count);
|
||||
read (stream_a, data.bandwidth_cap);
|
||||
read (stream_a, data.peer_count);
|
||||
read (stream_a, data.protocol_version_number);
|
||||
read (stream_a, data.vendor_version);
|
||||
read (stream_a, data.uptime);
|
||||
read (stream_a, data.genesis_block.bytes);
|
||||
}
|
||||
catch (std::runtime_error const &)
|
||||
{
|
||||
error = true;
|
||||
}
|
||||
|
||||
return error;
|
||||
}
|
||||
|
||||
void nano::telemetry_ack::visit (nano::message_visitor & visitor_a) const
|
||||
{
|
||||
visitor_a.telemetry_ack (*this);
|
||||
}
|
||||
|
||||
uint16_t nano::telemetry_ack::size (nano::message_header const & message_header_a)
|
||||
{
|
||||
return static_cast<uint16_t> (message_header_a.extensions.to_ulong ());
|
||||
}
|
||||
|
||||
nano::telemetry_data nano::telemetry_data::consolidate (std::vector<nano::telemetry_data> const & telemetry_data_responses_a)
|
||||
{
|
||||
if (telemetry_data_responses_a.empty ())
|
||||
{
|
||||
return {};
|
||||
}
|
||||
else if (telemetry_data_responses_a.size () == 1)
|
||||
{
|
||||
// Only 1 element in the collection, so just return it.
|
||||
return telemetry_data_responses_a.front ();
|
||||
}
|
||||
|
||||
nano::uint128_t account_sum{ 0 };
|
||||
nano::uint128_t block_sum{ 0 };
|
||||
nano::uint128_t cemented_sum{ 0 };
|
||||
nano::uint128_t peer_sum{ 0 };
|
||||
nano::uint128_t unchecked_sum{ 0 };
|
||||
nano::uint128_t uptime_sum{ 0 };
|
||||
nano::uint128_t bandwidth_sum{ 0 };
|
||||
|
||||
std::unordered_map<uint8_t, int> protocol_versions;
|
||||
std::unordered_map<uint8_t, int> vendor_versions;
|
||||
std::unordered_map<uint64_t, int> bandwidth_caps;
|
||||
std::unordered_map<nano::block_hash, int> genesis_blocks;
|
||||
|
||||
nano::uint128_t account_average{ 0 };
|
||||
|
||||
for (auto const & telemetry_data : telemetry_data_responses_a)
|
||||
{
|
||||
account_sum += telemetry_data.account_count;
|
||||
block_sum += telemetry_data.block_count;
|
||||
cemented_sum += telemetry_data.cemented_count;
|
||||
++vendor_versions[telemetry_data.vendor_version];
|
||||
++protocol_versions[telemetry_data.protocol_version_number];
|
||||
peer_sum += telemetry_data.peer_count;
|
||||
|
||||
// 0 has a special meaning (unlimited), so don't include it in the average as it would be heavily skewed
|
||||
if (telemetry_data.bandwidth_cap != 0)
|
||||
{
|
||||
bandwidth_sum += telemetry_data.bandwidth_cap;
|
||||
}
|
||||
++bandwidth_caps[telemetry_data.bandwidth_cap];
|
||||
unchecked_sum += telemetry_data.unchecked_count;
|
||||
uptime_sum += telemetry_data.uptime;
|
||||
++genesis_blocks[telemetry_data.genesis_block];
|
||||
}
|
||||
|
||||
nano::telemetry_data consolidated_data;
|
||||
auto size = telemetry_data_responses_a.size ();
|
||||
consolidated_data.account_count = boost::numeric_cast<decltype (account_count)> (account_sum / size);
|
||||
consolidated_data.block_count = boost::numeric_cast<decltype (block_count)> (block_sum / size);
|
||||
consolidated_data.cemented_count = boost::numeric_cast<decltype (cemented_count)> (cemented_sum / size);
|
||||
consolidated_data.peer_count = boost::numeric_cast<decltype (peer_count)> (peer_sum / size);
|
||||
consolidated_data.uptime = boost::numeric_cast<decltype (uptime)> (uptime_sum / size);
|
||||
consolidated_data.unchecked_count = boost::numeric_cast<decltype (unchecked_count)> (unchecked_sum / size);
|
||||
|
||||
auto set_mode_or_average = [](auto const & collection, auto & var, auto const & sum, size_t size) {
|
||||
auto max = std::max_element (collection.begin (), collection.end (), [](auto const & lhs, auto const & rhs) {
|
||||
return lhs.second < rhs.second;
|
||||
});
|
||||
if (max->second > 1)
|
||||
{
|
||||
var = max->first;
|
||||
}
|
||||
else
|
||||
{
|
||||
var = (sum / size).template convert_to<std::remove_reference_t<decltype (var)>> ();
|
||||
}
|
||||
};
|
||||
|
||||
auto set_mode = [](auto const & collection, auto & var, size_t size) {
|
||||
auto max = std::max_element (collection.begin (), collection.end (), [](auto const & lhs, auto const & rhs) {
|
||||
return lhs.second < rhs.second;
|
||||
});
|
||||
if (max->second > 1)
|
||||
{
|
||||
var = max->first;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Just pick the first one
|
||||
var = collection.begin ()->first;
|
||||
}
|
||||
};
|
||||
|
||||
// Use the mode of protocol version, vendor version and bandwidth cap if there are 2 or more nodes sharing the same value
|
||||
set_mode_or_average (bandwidth_caps, consolidated_data.bandwidth_cap, bandwidth_sum, size);
|
||||
set_mode (protocol_versions, consolidated_data.protocol_version_number, size);
|
||||
set_mode (vendor_versions, consolidated_data.vendor_version, size);
|
||||
set_mode (genesis_blocks, consolidated_data.genesis_block, size);
|
||||
|
||||
return consolidated_data;
|
||||
}
|
||||
|
||||
nano::error nano::telemetry_data::serialize_json (nano::jsonconfig & json) const
|
||||
{
|
||||
json.put ("block_count", block_count);
|
||||
json.put ("cemented_count", cemented_count);
|
||||
json.put ("unchecked_count", unchecked_count);
|
||||
json.put ("account_count", account_count);
|
||||
json.put ("bandwidth_cap", bandwidth_cap);
|
||||
json.put ("peer_count", peer_count);
|
||||
json.put ("protocol_version_number", protocol_version_number);
|
||||
json.put ("vendor_version", vendor_version);
|
||||
json.put ("uptime", uptime);
|
||||
json.put ("genesis_block", genesis_block.to_string ());
|
||||
return json.get_error ();
|
||||
}
|
||||
|
||||
nano::error nano::telemetry_data::deserialize_json (nano::jsonconfig & json)
|
||||
{
|
||||
json.get ("block_count", block_count);
|
||||
json.get ("cemented_count", cemented_count);
|
||||
json.get ("unchecked_count", unchecked_count);
|
||||
json.get ("account_count", account_count);
|
||||
json.get ("bandwidth_cap", bandwidth_cap);
|
||||
json.get ("peer_count", peer_count);
|
||||
json.get ("protocol_version_number", protocol_version_number);
|
||||
json.get ("vendor_version", vendor_version);
|
||||
json.get ("uptime", uptime);
|
||||
std::string genesis_block_l;
|
||||
json.get ("genesis_block", genesis_block_l);
|
||||
if (!json.get_error ())
|
||||
{
|
||||
if (genesis_block.decode_hex (genesis_block_l))
|
||||
{
|
||||
json.get_error ().set ("Could not deserialize genesis block");
|
||||
}
|
||||
}
|
||||
return json.get_error ();
|
||||
}
|
||||
|
||||
bool nano::telemetry_data::operator== (nano::telemetry_data const & data_a) const
|
||||
{
|
||||
return (block_count == data_a.block_count && cemented_count == data_a.cemented_count && unchecked_count == data_a.unchecked_count && account_count == data_a.account_count && bandwidth_cap == data_a.bandwidth_cap && uptime == data_a.uptime && peer_count == data_a.peer_count && protocol_version_number == data_a.protocol_version_number && vendor_version == data_a.vendor_version && genesis_block == data_a.genesis_block);
|
||||
}
|
||||
|
||||
nano::node_id_handshake::node_id_handshake (bool & error_a, nano::stream & stream_a, nano::message_header const & header_a) :
|
||||
message (header_a),
|
||||
query (boost::none),
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
#include <nano/boost/asio/ip/udp.hpp>
|
||||
#include <nano/crypto_lib/random_pool.hpp>
|
||||
#include <nano/lib/asio.hpp>
|
||||
#include <nano/lib/jsonconfig.hpp>
|
||||
#include <nano/lib/memory.hpp>
|
||||
#include <nano/secure/common.hpp>
|
||||
|
||||
|
@ -169,8 +170,11 @@ enum class message_type : uint8_t
|
|||
frontier_req = 0x8,
|
||||
/* deleted 0x9 */
|
||||
node_id_handshake = 0x0a,
|
||||
bulk_pull_account = 0x0b
|
||||
bulk_pull_account = 0x0b,
|
||||
telemetry_req = 0x0c,
|
||||
telemetry_ack = 0x0d
|
||||
};
|
||||
|
||||
enum class bulk_pull_account_flags : uint8_t
|
||||
{
|
||||
pending_hash_and_amount = 0x0,
|
||||
|
@ -206,8 +210,8 @@ public:
|
|||
/** Size of the payload in bytes. For some messages, the payload size is based on header flags. */
|
||||
size_t payload_length_bytes () const;
|
||||
|
||||
static std::bitset<16> constexpr block_type_mask = std::bitset<16> (0x0f00);
|
||||
static std::bitset<16> constexpr count_mask = std::bitset<16> (0xf000);
|
||||
static std::bitset<16> constexpr block_type_mask{ 0x0f00 };
|
||||
static std::bitset<16> constexpr count_mask{ 0xf000 };
|
||||
};
|
||||
class message
|
||||
{
|
||||
|
@ -237,6 +241,8 @@ public:
|
|||
invalid_confirm_req_message,
|
||||
invalid_confirm_ack_message,
|
||||
invalid_node_id_handshake_message,
|
||||
invalid_telemetry_req_message,
|
||||
invalid_telemetry_ack_message,
|
||||
outdated_version,
|
||||
invalid_magic,
|
||||
invalid_network
|
||||
|
@ -248,6 +254,8 @@ public:
|
|||
void deserialize_confirm_req (nano::stream &, nano::message_header const &);
|
||||
void deserialize_confirm_ack (nano::stream &, nano::message_header const &);
|
||||
void deserialize_node_id_handshake (nano::stream &, nano::message_header const &);
|
||||
void deserialize_telemetry_req (nano::stream &, nano::message_header const &);
|
||||
void deserialize_telemetry_ack (nano::stream &, nano::message_header const &);
|
||||
bool at_end (nano::stream &);
|
||||
nano::block_uniquer & block_uniquer;
|
||||
nano::vote_uniquer & vote_uniquer;
|
||||
|
@ -321,6 +329,49 @@ public:
|
|||
uint32_t count;
|
||||
static size_t constexpr size = sizeof (start) + sizeof (age) + sizeof (count);
|
||||
};
|
||||
|
||||
class telemetry_data
|
||||
{
|
||||
public:
|
||||
uint64_t block_count{ 0 };
|
||||
uint64_t cemented_count{ 0 };
|
||||
uint64_t unchecked_count{ 0 };
|
||||
uint64_t account_count{ 0 };
|
||||
uint64_t bandwidth_cap{ 0 };
|
||||
uint64_t uptime{ 0 };
|
||||
uint32_t peer_count{ 0 };
|
||||
uint8_t protocol_version_number{ 0 };
|
||||
uint8_t vendor_version{ 0 };
|
||||
nano::block_hash genesis_block{ 0 };
|
||||
|
||||
static nano::telemetry_data consolidate (std::vector<nano::telemetry_data> const & telemetry_data_responses);
|
||||
nano::error serialize_json (nano::jsonconfig & json) const;
|
||||
nano::error deserialize_json (nano::jsonconfig & json);
|
||||
bool operator== (nano::telemetry_data const &) const;
|
||||
|
||||
static auto constexpr size = sizeof (block_count) + sizeof (cemented_count) + sizeof (unchecked_count) + sizeof (account_count) + sizeof (bandwidth_cap) + sizeof (peer_count) + sizeof (protocol_version_number) + sizeof (vendor_version) + sizeof (uptime) + sizeof (genesis_block);
|
||||
};
|
||||
class telemetry_req final : public message
|
||||
{
|
||||
public:
|
||||
telemetry_req ();
|
||||
explicit telemetry_req (nano::message_header const &);
|
||||
void serialize (nano::stream &) const override;
|
||||
bool deserialize (nano::stream &);
|
||||
void visit (nano::message_visitor &) const override;
|
||||
};
|
||||
class telemetry_ack final : public message
|
||||
{
|
||||
public:
|
||||
telemetry_ack (bool &, nano::stream &, nano::message_header const &);
|
||||
explicit telemetry_ack (telemetry_data const &);
|
||||
void serialize (nano::stream &) const override;
|
||||
void visit (nano::message_visitor &) const override;
|
||||
bool deserialize (nano::stream &);
|
||||
static uint16_t size (nano::message_header const &);
|
||||
nano::telemetry_data data;
|
||||
};
|
||||
|
||||
class bulk_pull final : public message
|
||||
{
|
||||
public:
|
||||
|
@ -387,6 +438,8 @@ public:
|
|||
virtual void bulk_push (nano::bulk_push const &) = 0;
|
||||
virtual void frontier_req (nano::frontier_req const &) = 0;
|
||||
virtual void node_id_handshake (nano::node_id_handshake const &) = 0;
|
||||
virtual void telemetry_req (nano::telemetry_req const &) = 0;
|
||||
virtual void telemetry_ack (nano::telemetry_ack const &) = 0;
|
||||
virtual ~message_visitor ();
|
||||
};
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
#include <nano/node/json_payment_observer.hpp>
|
||||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/node_rpc_config.hpp>
|
||||
#include <nano/node/telemetry.hpp>
|
||||
|
||||
#include <boost/property_tree/json_parser.hpp>
|
||||
#include <boost/property_tree/ptree.hpp>
|
||||
|
@ -3894,6 +3895,130 @@ void nano::json_handler::stop ()
|
|||
}
|
||||
}
|
||||
|
||||
void nano::json_handler::telemetry ()
|
||||
{
|
||||
auto rpc_l (shared_from_this ());
|
||||
|
||||
auto address_text (request.get_optional<std::string> ("address"));
|
||||
auto port_text (request.get_optional<std::string> ("port"));
|
||||
|
||||
if (address_text.is_initialized () || port_text.is_initialized ())
|
||||
{
|
||||
// Check that both an address and a port were specified
|
||||
std::shared_ptr<nano::transport::channel> channel;
|
||||
if (address_text.is_initialized () && port_text.is_initialized ())
|
||||
{
|
||||
uint16_t port;
|
||||
if (!nano::parse_port (*port_text, port))
|
||||
{
|
||||
boost::system::error_code address_ec;
|
||||
auto address (boost::asio::ip::make_address_v6 (*address_text, address_ec));
|
||||
if (!address_ec)
|
||||
{
|
||||
nano::endpoint endpoint (address, port);
|
||||
channel = node.network.find_channel (endpoint);
|
||||
if (!channel)
|
||||
{
|
||||
ec = nano::error_rpc::peer_not_found;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ec = nano::error_common::invalid_ip_address;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ec = nano::error_common::invalid_port;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ec = nano::error_rpc::requires_port_and_address;
|
||||
}
|
||||
|
||||
if (!ec)
|
||||
{
|
||||
assert (channel);
|
||||
node.telemetry.get_metrics_single_peer_async (channel, [rpc_l](auto const & single_telemetry_metric_a) {
|
||||
if (!single_telemetry_metric_a.error)
|
||||
{
|
||||
nano::jsonconfig config_l;
|
||||
auto err = single_telemetry_metric_a.data.serialize_json (config_l);
|
||||
auto const & ptree = config_l.get_tree ();
|
||||
|
||||
if (!err)
|
||||
{
|
||||
rpc_l->response_l.insert (rpc_l->response_l.begin (), ptree.begin (), ptree.end ());
|
||||
rpc_l->response_l.put ("cached", single_telemetry_metric_a.is_cached);
|
||||
}
|
||||
else
|
||||
{
|
||||
rpc_l->ec = nano::error_rpc::generic;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
rpc_l->ec = nano::error_rpc::generic;
|
||||
}
|
||||
|
||||
rpc_l->response_errors ();
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
response_errors ();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// By default, consolidated (average or mode) telemetry metrics are returned,
|
||||
// setting "raw" to true returns metrics from all nodes requested.
|
||||
auto raw = request.get_optional<bool> ("raw");
|
||||
auto output_raw = raw.value_or (false);
|
||||
node.telemetry.get_metrics_random_peers_async ([rpc_l, output_raw](auto const & batched_telemetry_metrics_a) {
|
||||
if (output_raw)
|
||||
{
|
||||
boost::property_tree::ptree metrics;
|
||||
for (auto & telemetry_metrics : batched_telemetry_metrics_a.data)
|
||||
{
|
||||
nano::jsonconfig config_l;
|
||||
auto err = telemetry_metrics.serialize_json (config_l);
|
||||
if (!err)
|
||||
{
|
||||
metrics.push_back (std::make_pair ("", config_l.get_tree ()));
|
||||
}
|
||||
else
|
||||
{
|
||||
rpc_l->ec = nano::error_rpc::generic;
|
||||
}
|
||||
}
|
||||
|
||||
rpc_l->response_l.put_child ("metrics", metrics);
|
||||
}
|
||||
else
|
||||
{
|
||||
nano::jsonconfig config_l;
|
||||
auto average_telemetry_metrics = nano::telemetry_data::consolidate (batched_telemetry_metrics_a.data);
|
||||
auto err = average_telemetry_metrics.serialize_json (config_l);
|
||||
auto const & ptree = config_l.get_tree ();
|
||||
|
||||
if (!err)
|
||||
{
|
||||
rpc_l->response_l.insert (rpc_l->response_l.begin (), ptree.begin (), ptree.end ());
|
||||
}
|
||||
else
|
||||
{
|
||||
rpc_l->ec = nano::error_rpc::generic;
|
||||
}
|
||||
}
|
||||
|
||||
rpc_l->response_l.put ("cached", batched_telemetry_metrics_a.is_cached);
|
||||
rpc_l->response_errors ();
|
||||
});
|
||||
}
|
||||
}
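// Illustrative request shapes handled above (a sketch inferred from the code; the address and port values are placeholders):
//   { "action": "node_telemetry" }                                                -- consolidated metrics from random peers
//   { "action": "node_telemetry", "raw": "true" }                                 -- per-peer metrics returned under "metrics"
//   { "action": "node_telemetry", "address": "::ffff:127.0.0.1", "port": "7075" } -- metrics from a single connected peer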
|
||||
|
||||
void nano::json_handler::unchecked ()
|
||||
{
|
||||
const bool json_block_l = request.get<bool> ("json_block", false);
|
||||
|
@ -5034,6 +5159,7 @@ ipc_json_handler_no_arg_func_map create_ipc_json_handler_no_arg_func_map ()
|
|||
no_arg_funcs.emplace ("stats", &nano::json_handler::stats);
|
||||
no_arg_funcs.emplace ("stats_clear", &nano::json_handler::stats_clear);
|
||||
no_arg_funcs.emplace ("stop", &nano::json_handler::stop);
|
||||
no_arg_funcs.emplace ("node_telemetry", &nano::json_handler::telemetry);
|
||||
no_arg_funcs.emplace ("unchecked", &nano::json_handler::unchecked);
|
||||
no_arg_funcs.emplace ("unchecked_clear", &nano::json_handler::unchecked_clear);
|
||||
no_arg_funcs.emplace ("unchecked_get", &nano::json_handler::unchecked_get);
|
||||
|
|
|
@ -97,6 +97,7 @@ public:
|
|||
void stats ();
|
||||
void stats_clear ();
|
||||
void stop ();
|
||||
void telemetry ();
|
||||
void unchecked ();
|
||||
void unchecked_clear ();
|
||||
void unchecked_get ();
|
||||
|
|
|
@ -94,6 +94,7 @@ nano::error nano::logging::serialize_toml (nano::tomlconfig & toml) const
|
|||
toml.put ("network_packet", network_packet_logging_value, "Log network packet activity.\ntype:bool");
|
||||
toml.put ("network_keepalive", network_keepalive_logging_value, "Log keepalive related messages.\ntype:bool");
|
||||
toml.put ("network_node_id_handshake", network_node_id_handshake_logging_value, "Log node-id handshake related messages.\ntype:bool");
|
||||
toml.put ("network_telemetry", network_telemetry_logging_value, "Log telemetry related messages.\ntype:bool");
|
||||
toml.put ("node_lifetime_tracing", node_lifetime_tracing_value, "Log node startup and shutdown messages.\ntype:bool");
|
||||
toml.put ("insufficient_work", insufficient_work_logging_value, "Log if insufficient work is detected.\ntype:bool");
|
||||
toml.put ("log_ipc", log_ipc_value, "Log IPC related activity.\ntype:bool");
|
||||
|
@ -124,6 +125,7 @@ nano::error nano::logging::deserialize_toml (nano::tomlconfig & toml)
|
|||
toml.get<bool> ("network_packet", network_packet_logging_value);
|
||||
toml.get<bool> ("network_keepalive", network_keepalive_logging_value);
|
||||
toml.get<bool> ("network_node_id_handshake", network_node_id_handshake_logging_value);
|
||||
toml.get<bool> ("network_telemetry_logging", network_telemetry_logging_value);
|
||||
toml.get<bool> ("node_lifetime_tracing", node_lifetime_tracing_value);
|
||||
toml.get<bool> ("insufficient_work", insufficient_work_logging_value);
|
||||
toml.get<bool> ("log_ipc", log_ipc_value);
|
||||
|
@ -157,6 +159,7 @@ nano::error nano::logging::serialize_json (nano::jsonconfig & json) const
|
|||
json.put ("network_packet", network_packet_logging_value);
|
||||
json.put ("network_keepalive", network_keepalive_logging_value);
|
||||
json.put ("network_node_id_handshake", network_node_id_handshake_logging_value);
|
||||
json.put ("network_telemetry_logging", network_telemetry_logging_value);
|
||||
json.put ("node_lifetime_tracing", node_lifetime_tracing_value);
|
||||
json.put ("insufficient_work", insufficient_work_logging_value);
|
||||
json.put ("log_ipc", log_ipc_value);
|
||||
|
@ -309,6 +312,11 @@ bool nano::logging::network_node_id_handshake_logging () const
|
|||
return network_logging () && network_node_id_handshake_logging_value;
|
||||
}
|
||||
|
||||
bool nano::logging::network_telemetry_logging () const
|
||||
{
|
||||
return network_logging () && network_telemetry_logging_value;
|
||||
}
|
||||
|
||||
bool nano::logging::node_lifetime_tracing () const
|
||||
{
|
||||
return node_lifetime_tracing_value;
|
||||
|
|
|
@ -52,6 +52,7 @@ public:
|
|||
bool network_packet_logging () const;
|
||||
bool network_keepalive_logging () const;
|
||||
bool network_node_id_handshake_logging () const;
|
||||
bool network_telemetry_logging () const;
|
||||
bool node_lifetime_tracing () const;
|
||||
bool insufficient_work_logging () const;
|
||||
bool upnp_details_logging () const;
|
||||
|
@ -75,6 +76,7 @@ public:
|
|||
bool network_packet_logging_value{ false };
|
||||
bool network_keepalive_logging_value{ false };
|
||||
bool network_node_id_handshake_logging_value{ false };
|
||||
bool network_telemetry_logging_value{ false };
|
||||
bool node_lifetime_tracing_value{ false };
|
||||
bool insufficient_work_logging_value{ true };
|
||||
bool log_ipc_value{ true };
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
#include <nano/lib/threading.hpp>
|
||||
#include <nano/node/network.hpp>
|
||||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/telemetry.hpp>
|
||||
#include <nano/secure/buffer.hpp>
|
||||
|
||||
#include <boost/format.hpp>
|
||||
|
@ -506,6 +507,42 @@ public:
|
|||
{
|
||||
node.stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in);
|
||||
}
|
||||
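// Handle a telemetry_req by filling telemetry_data from the ledger cache, config and network state, then replying with a telemetry_ack on the same channel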
void telemetry_req (nano::telemetry_req const & message_a) override
|
||||
{
|
||||
if (node.config.logging.network_telemetry_logging ())
|
||||
{
|
||||
node.logger.try_log (boost::str (boost::format ("Telemetry_req message from %1%") % channel->to_string ()));
|
||||
}
|
||||
node.stats.inc (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in);
|
||||
|
||||
nano::telemetry_data telemetry_data;
|
||||
telemetry_data.block_count = node.ledger.cache.block_count;
|
||||
telemetry_data.cemented_count = node.ledger.cache.cemented_count;
|
||||
telemetry_data.bandwidth_cap = node.config.bandwidth_limit;
|
||||
telemetry_data.protocol_version_number = node.network_params.protocol.protocol_version;
|
||||
telemetry_data.vendor_version = nano::get_major_node_version ();
|
||||
telemetry_data.uptime = std::chrono::duration_cast<std::chrono::seconds> (std::chrono::steady_clock::now () - node.startup_time).count ();
|
||||
telemetry_data.unchecked_count = node.ledger.cache.unchecked_count;
|
||||
telemetry_data.genesis_block = nano::genesis ().hash ();
|
||||
telemetry_data.peer_count = node.network.size ();
|
||||
|
||||
{
|
||||
auto transaction = node.store.tx_begin_read ();
|
||||
telemetry_data.account_count = node.store.account_count (transaction);
|
||||
}
|
||||
|
||||
nano::telemetry_ack telemetry_ack (telemetry_data);
|
||||
channel->send (telemetry_ack);
|
||||
}
|
||||
void telemetry_ack (nano::telemetry_ack const & message_a) override
|
||||
{
|
||||
if (node.config.logging.network_telemetry_logging ())
|
||||
{
|
||||
node.logger.try_log (boost::str (boost::format ("Received telemetry_ack message from %1%") % channel->to_string ()));
|
||||
}
|
||||
node.stats.inc (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in);
|
||||
node.telemetry.add (message_a.data, channel->get_endpoint ());
|
||||
}
|
||||
nano::node & node;
|
||||
std::shared_ptr<nano::transport::channel> channel;
|
||||
};
|
||||
|
@ -590,10 +627,10 @@ std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list_fanout
|
|||
return result;
|
||||
}
|
||||
|
||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::network::random_set (size_t count_a) const
|
||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::network::random_set (size_t count_a, uint8_t min_version_a) const
|
||||
{
|
||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> result (tcp_channels.random_set (count_a));
|
||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> udp_random (udp_channels.random_set (count_a));
|
||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> result (tcp_channels.random_set (count_a, min_version_a));
|
||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> udp_random (udp_channels.random_set (count_a, min_version_a));
|
||||
for (auto i (udp_random.begin ()), n (udp_random.end ()); i != n && result.size () < count_a * 1.5; ++i)
|
||||
{
|
||||
result.insert (*i);
|
||||
|
|
|
@ -139,7 +139,7 @@ public:
|
|||
// A list of random peers sized for the configured rebroadcast fanout
|
||||
std::deque<std::shared_ptr<nano::transport::channel>> list_fanout ();
|
||||
void random_fill (std::array<nano::endpoint, 8> &) const;
|
||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (size_t) const;
|
||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (size_t, uint8_t = 0) const;
|
||||
// Get the next peer for attempting a tcp bootstrap connection
|
||||
nano::tcp_endpoint bootstrap_peer (bool = false);
|
||||
nano::endpoint endpoint ();
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
#include <nano/lib/utility.hpp>
|
||||
#include <nano/node/common.hpp>
|
||||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/telemetry.hpp>
|
||||
#include <nano/node/websocket.hpp>
|
||||
#include <nano/rpc/rpc.hpp>
|
||||
#include <nano/secure/buffer.hpp>
|
||||
|
@ -125,6 +126,7 @@ gap_cache (*this),
|
|||
ledger (store, stats, flags_a.generate_cache),
|
||||
checker (config.signature_checker_threads),
|
||||
network (*this, config.peering_port),
|
||||
telemetry (network, alarm, worker),
|
||||
bootstrap_initiator (*this),
|
||||
bootstrap (config.peering_port, *this),
|
||||
application_path (application_path_a),
|
||||
|
@ -133,10 +135,12 @@ vote_processor (checker, active, store, observers, stats, config, logger, online
|
|||
rep_crawler (*this),
|
||||
warmed_up (0),
|
||||
block_processor (*this, write_database_queue),
|
||||
// clang-format off
|
||||
block_processor_thread ([this]() {
|
||||
nano::thread_role::set (nano::thread_role::name::block_processing);
|
||||
this->block_processor.process_blocks ();
|
||||
}),
|
||||
// clang-format on
|
||||
online_reps (ledger, network_params, config.online_weight_minimum.number ()),
|
||||
votes_cache (wallets),
|
||||
vote_uniquer (block_uniquer),
|
||||
|
@ -579,6 +583,7 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
|
|||
composite->add_component (collect_container_info (node.bootstrap_initiator, "bootstrap_initiator"));
|
||||
composite->add_component (collect_container_info (node.bootstrap, "bootstrap"));
|
||||
composite->add_component (collect_container_info (node.network, "network"));
|
||||
composite->add_component (collect_container_info (node.telemetry, "telemetry"));
|
||||
composite->add_component (collect_container_info (node.observers, "observers"));
|
||||
composite->add_component (collect_container_info (node.wallets, "wallets"));
|
||||
composite->add_component (collect_container_info (node.vote_processor, "vote_processor"));
|
||||
|
@ -691,6 +696,7 @@ void nano::node::stop ()
|
|||
confirmation_height_processor.stop ();
|
||||
active.stop ();
|
||||
network.stop ();
|
||||
telemetry.stop ();
|
||||
if (websocket_server)
|
||||
{
|
||||
websocket_server->stop ();
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include <nano/node/repcrawler.hpp>
|
||||
#include <nano/node/request_aggregator.hpp>
|
||||
#include <nano/node/signatures.hpp>
|
||||
#include <nano/node/telemetry.hpp>
|
||||
#include <nano/node/vote_processor.hpp>
|
||||
#include <nano/node/wallet.hpp>
|
||||
#include <nano/node/write_database_queue.hpp>
|
||||
|
@ -47,7 +48,7 @@ namespace websocket
|
|||
}
|
||||
|
||||
class node;
|
||||
|
||||
class telemetry;
|
||||
class work_pool;
|
||||
class block_arrival_info final
|
||||
{
|
||||
|
@ -166,6 +167,7 @@ public:
|
|||
nano::ledger ledger;
|
||||
nano::signature_checker checker;
|
||||
nano::network network;
|
||||
nano::telemetry telemetry;
|
||||
nano::bootstrap_initiator bootstrap_initiator;
|
||||
nano::bootstrap_listener bootstrap;
|
||||
boost::filesystem::path application_path;
|
||||
|
|
nano/node/telemetry.cpp (new file, 377 lines)
|
@ -0,0 +1,377 @@
|
|||
#include <nano/lib/alarm.hpp>
|
||||
#include <nano/lib/worker.hpp>
|
||||
#include <nano/node/network.hpp>
|
||||
#include <nano/node/telemetry.hpp>
|
||||
#include <nano/node/transport/transport.hpp>
|
||||
#include <nano/secure/buffer.hpp>
|
||||
|
||||
#include <algorithm>
|
||||
#include <cassert>
|
||||
#include <cstdint>
|
||||
#include <future>
|
||||
#include <numeric>
|
||||
|
||||
std::chrono::milliseconds constexpr nano::telemetry_impl::cache_cutoff;
|
||||
|
||||
nano::telemetry::telemetry (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a) :
|
||||
network (network_a),
|
||||
alarm (alarm_a),
|
||||
worker (worker_a),
|
||||
batch_request (std::make_shared<nano::telemetry_impl> (network, alarm, worker))
|
||||
{
|
||||
}
|
||||
|
||||
void nano::telemetry::stop ()
|
||||
{
|
||||
nano::lock_guard<std::mutex> guard (mutex);
|
||||
batch_request = nullptr;
|
||||
single_requests.clear ();
|
||||
stopped = true;
|
||||
}
|
||||
|
||||
void nano::telemetry::add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a)
|
||||
{
|
||||
nano::lock_guard<std::mutex> guard (mutex);
|
||||
if (!stopped)
|
||||
{
|
||||
batch_request->add (telemetry_data_a, endpoint_a);
|
||||
|
||||
for (auto & request : single_requests)
|
||||
{
|
||||
request.second.impl->add (telemetry_data_a, endpoint_a);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void nano::telemetry::get_metrics_random_peers_async (std::function<void(telemetry_data_responses const &)> const & callback_a)
|
||||
{
|
||||
// These peers will only be used if there isn't an already ongoing batch telemetry request round
|
||||
auto random_peers = network.random_set (network.size_sqrt (), network_params.protocol.telemetry_protocol_version_min);
|
||||
nano::lock_guard<std::mutex> guard (mutex);
|
||||
if (!stopped && !random_peers.empty ())
|
||||
{
|
||||
batch_request->get_metrics_async (random_peers, [callback_a](nano::telemetry_data_responses const & telemetry_data_responses) {
|
||||
callback_a (telemetry_data_responses);
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
const auto all_received = false;
|
||||
callback_a (nano::telemetry_data_responses{ {}, false, all_received });
|
||||
}
|
||||
}
|
||||
|
||||
nano::telemetry_data_responses nano::telemetry::get_metrics_random_peers ()
|
||||
{
|
||||
std::promise<telemetry_data_responses> promise;
|
||||
get_metrics_random_peers_async ([&promise](telemetry_data_responses const & telemetry_data_responses_a) {
|
||||
promise.set_value (telemetry_data_responses_a);
|
||||
});
|
||||
|
||||
return promise.get_future ().get ();
|
||||
}
|
||||
|
||||
// After a request is made to a single peer, remove it from the container once that peer has not been requested for a while (cache_cutoff).
|
||||
void nano::telemetry::ongoing_single_request_cleanup (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data const & single_request_data_a)
|
||||
{
|
||||
// This wrapper exists so the lambda below can capture a shared_ptr to the std::function it lives in and reschedule itself via the alarm
|
||||
class ongoing_func_wrapper
|
||||
{
|
||||
public:
|
||||
std::function<void()> ongoing_func;
|
||||
};
|
||||
|
||||
auto wrapper = std::make_shared<ongoing_func_wrapper> ();
|
||||
// Keep re-scheduling ongoing_func while requests to the peer are still active
|
||||
const auto & last_updated = single_request_data_a.last_updated;
|
||||
wrapper->ongoing_func = [this, telemetry_impl_w = std::weak_ptr<nano::telemetry_impl> (single_request_data_a.impl), &last_updated, &endpoint_a, wrapper]() {
|
||||
if (auto telemetry_impl = telemetry_impl_w.lock ())
|
||||
{
|
||||
nano::lock_guard<std::mutex> guard (this->mutex);
|
||||
if (std::chrono::steady_clock::now () - telemetry_impl->cache_cutoff > last_updated && telemetry_impl->callbacks.empty ())
|
||||
{
|
||||
this->single_requests.erase (endpoint_a);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Request is still active, so call again
|
||||
this->alarm.add (std::chrono::steady_clock::now () + telemetry_impl->cache_cutoff, wrapper->ongoing_func);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
alarm.add (std::chrono::steady_clock::now () + single_request_data_a.impl->cache_cutoff, wrapper->ongoing_func);
|
||||
}
|
||||
|
||||
void nano::telemetry::update_cleanup_data (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data & single_request_data_a, bool is_new_a)
|
||||
{
|
||||
auto telemetry_impl = single_request_data_a.impl;
|
||||
if (is_new_a)
|
||||
{
|
||||
// Clean this request up when it isn't being used anymore
|
||||
ongoing_single_request_cleanup (endpoint_a, single_request_data_a);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Refresh last_updated so the cleanup task does not remove this request while it is still in use
|
||||
single_request_data_a.last_updated = std::chrono::steady_clock::now ();
|
||||
}
|
||||
}
|
||||
|
||||
void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr<nano::transport::channel> const & channel_a, std::function<void(telemetry_data_response const &)> const & callback_a)
|
||||
{
|
||||
auto invoke_callback_with_error = [&callback_a]() {
|
||||
auto const is_cached = false;
|
||||
auto const error = true;
|
||||
callback_a ({ nano::telemetry_data (), is_cached, error });
|
||||
};
|
||||
|
||||
nano::lock_guard<std::mutex> guard (mutex);
|
||||
if (!stopped)
|
||||
{
|
||||
if (channel_a && (channel_a->get_network_version () >= network_params.protocol.telemetry_protocol_version_min))
|
||||
{
|
||||
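// Re-use any in-flight request for this endpoint; the emplace result tells update_cleanup_data whether this is a new entry (start the cleanup timer) or an existing one (refresh its last_updated time)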
auto pair = single_requests.emplace (channel_a->get_endpoint (), single_request_data{ std::make_shared<nano::telemetry_impl> (network, alarm, worker), std::chrono::steady_clock::now () });
|
||||
update_cleanup_data (pair.first->first, pair.first->second, pair.second);
|
||||
|
||||
pair.first->second.impl->get_metrics_async ({ channel_a }, [callback_a](telemetry_data_responses const & telemetry_data_responses_a) {
|
||||
// There should only be 1 response, so if this hasn't been received then conclude it is an error.
|
||||
auto const error = !telemetry_data_responses_a.all_received;
|
||||
if (!error)
|
||||
{
|
||||
assert (telemetry_data_responses_a.data.size () == 1);
|
||||
callback_a ({ telemetry_data_responses_a.data.front (), telemetry_data_responses_a.is_cached, error });
|
||||
}
|
||||
else
|
||||
{
|
||||
callback_a ({ nano::telemetry_data (), telemetry_data_responses_a.is_cached, error });
|
||||
}
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
invoke_callback_with_error ();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
invoke_callback_with_error ();
|
||||
}
|
||||
}
|
||||
|
||||
nano::telemetry_data_response nano::telemetry::get_metrics_single_peer (std::shared_ptr<nano::transport::channel> const & channel_a)
|
||||
{
|
||||
std::promise<telemetry_data_response> promise;
|
||||
get_metrics_single_peer_async (channel_a, [&promise](telemetry_data_response const & single_metric_data_a) {
|
||||
promise.set_value (single_metric_data_a);
|
||||
});
|
||||
|
||||
return promise.get_future ().get ();
|
||||
}
|
||||
|
||||
size_t nano::telemetry::telemetry_data_size ()
|
||||
{
|
||||
nano::lock_guard<std::mutex> guard (mutex);
|
||||
auto total = std::accumulate (single_requests.begin (), single_requests.end (), static_cast<size_t> (0), [](size_t total, auto & single_request) {
|
||||
return total += single_request.second.impl->telemetry_data_size ();
|
||||
});
|
||||
|
||||
if (batch_request)
|
||||
{
|
||||
total += batch_request->telemetry_data_size ();
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
nano::telemetry_impl::telemetry_impl (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a) :
|
||||
network (network_a),
|
||||
alarm (alarm_a),
|
||||
worker (worker_a)
|
||||
{
|
||||
}
|
||||
|
||||
void nano::telemetry_impl::flush_callbacks (nano::unique_lock<std::mutex> & lk_a, bool cached_a)
|
||||
{
|
||||
// Invoke all callbacks; it's possible that other callbacks were added while the mutex was unlocked,
// so check again and invoke those too
|
||||
assert (lk_a.owns_lock ());
|
||||
invoking = true;
|
||||
while (!callbacks.empty ())
|
||||
{
|
||||
lk_a.unlock ();
|
||||
invoke_callbacks (cached_a);
|
||||
lk_a.lock ();
|
||||
}
|
||||
invoking = false;
|
||||
}
|
||||
|
||||
void nano::telemetry_impl::get_metrics_async (std::unordered_set<std::shared_ptr<nano::transport::channel>> const & channels_a, std::function<void(telemetry_data_responses const &)> const & callback_a)
|
||||
{
|
||||
{
|
||||
assert (!channels_a.empty ());
|
||||
nano::unique_lock<std::mutex> lk (mutex);
|
||||
callbacks.push_back (callback_a);
|
||||
if (callbacks.size () > 1 || invoking)
|
||||
{
|
||||
// A request is already in flight; this callback will be invoked when it completes
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if we can just return cached results
|
||||
if (std::chrono::steady_clock::now () < (last_time + cache_cutoff))
|
||||
{
|
||||
// Post to the worker so the callbacks run truly asynchronously and not on the calling thread (std::async can end up doing the latter)
|
||||
worker.push_task ([this_w = std::weak_ptr<nano::telemetry_impl> (shared_from_this ())]() {
|
||||
if (auto this_l = this_w.lock ())
|
||||
{
|
||||
nano::unique_lock<std::mutex> lk (this_l->mutex);
|
||||
const auto is_cached = true;
|
||||
this_l->flush_callbacks (lk, is_cached);
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
all_received = true;
|
||||
assert (required_responses.empty ());
|
||||
std::transform (channels_a.begin (), channels_a.end (), std::inserter (required_responses, required_responses.end ()), [](auto const & channel) {
|
||||
return channel->get_endpoint ();
|
||||
});
|
||||
}
|
||||
|
||||
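// Send the telemetry_req messages outside the lock; responses and per-channel timeouts are tracked against the current round inside fire_request_messages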
fire_request_messages (channels_a);
|
||||
}
|
||||
|
||||
void nano::telemetry_impl::add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a)
|
||||
{
|
||||
nano::unique_lock<std::mutex> lk (mutex);
|
||||
if (required_responses.find (endpoint_a) == required_responses.cend ())
|
||||
{
|
||||
// Not requesting telemetry data from this channel so ignore it
|
||||
return;
|
||||
}
|
||||
|
||||
current_telemetry_data_responses.push_back (telemetry_data_a);
|
||||
channel_processed (lk, endpoint_a);
|
||||
}
|
||||
|
||||
void nano::telemetry_impl::invoke_callbacks (bool cached_a)
|
||||
{
|
||||
decltype (callbacks) callbacks_l;
|
||||
|
||||
{
|
||||
// Copy callbacks so that they can be called outside of holding the lock
|
||||
nano::lock_guard<std::mutex> guard (mutex);
|
||||
callbacks_l = callbacks;
|
||||
current_telemetry_data_responses.clear ();
|
||||
callbacks.clear ();
|
||||
}
|
||||
for (auto & callback : callbacks_l)
|
||||
{
|
||||
callback ({ cached_telemetry_data, cached_a, all_received });
|
||||
}
|
||||
}
|
||||
|
||||
void nano::telemetry_impl::channel_processed (nano::unique_lock<std::mutex> & lk_a, nano::endpoint const & endpoint_a)
|
||||
{
|
||||
assert (lk_a.owns_lock ());
|
||||
auto num_removed = required_responses.erase (endpoint_a);
|
||||
if (num_removed > 0 && required_responses.empty ())
|
||||
{
|
||||
assert (lk_a.owns_lock ());
|
||||
cached_telemetry_data = current_telemetry_data_responses;
|
||||
|
||||
last_time = std::chrono::steady_clock::now ();
|
||||
auto const is_cached = false;
|
||||
flush_callbacks (lk_a, is_cached);
|
||||
}
|
||||
}
|
||||
|
||||
void nano::telemetry_impl::fire_request_messages (std::unordered_set<std::shared_ptr<nano::transport::channel>> const & channels)
|
||||
{
|
||||
uint64_t round_l;
|
||||
{
|
||||
nano::lock_guard<std::mutex> guard (mutex);
|
||||
++round;
|
||||
round_l = round;
|
||||
}
|
||||
|
||||
// Fire off a telemetry request to all passed in channels
|
||||
nano::telemetry_req message;
|
||||
for (auto & channel : channels)
|
||||
{
|
||||
assert (channel->get_network_version () >= network_params.protocol.telemetry_protocol_version_min);
|
||||
|
||||
std::weak_ptr<nano::telemetry_impl> this_w (shared_from_this ());
|
||||
channel->send (message, [this_w, endpoint = channel->get_endpoint ()](boost::system::error_code const & ec, size_t size_a) {
|
||||
if (auto this_l = this_w.lock ())
|
||||
{
|
||||
if (ec)
|
||||
{
|
||||
// Error sending the telemetry_req message
|
||||
nano::unique_lock<std::mutex> lk (this_l->mutex);
|
||||
this_l->all_received = false;
|
||||
this_l->channel_processed (lk, endpoint);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// If no response is seen after a certain period of time, remove it from the list of expected responses. However, only if it is part of the same round.
|
||||
alarm.add (std::chrono::steady_clock::now () + cache_cutoff, [this_w, endpoint = channel->get_endpoint (), round_l]() {
|
||||
if (auto this_l = this_w.lock ())
|
||||
{
|
||||
nano::unique_lock<std::mutex> lk (this_l->mutex);
|
||||
if (this_l->round == round_l && this_l->required_responses.find (endpoint) != this_l->required_responses.cend ())
|
||||
{
|
||||
this_l->all_received = false;
|
||||
this_l->channel_processed (lk, endpoint);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
size_t nano::telemetry_impl::telemetry_data_size ()
|
||||
{
|
||||
nano::lock_guard<std::mutex> guard (mutex);
|
||||
return current_telemetry_data_responses.size ();
|
||||
}
|
||||
|
||||
std::unique_ptr<nano::container_info_component> nano::collect_container_info (telemetry & telemetry, const std::string & name)
|
||||
{
|
||||
size_t single_requests_count;
|
||||
{
|
||||
nano::lock_guard<std::mutex> guard (telemetry.mutex);
|
||||
single_requests_count = telemetry.single_requests.size ();
|
||||
}
|
||||
|
||||
auto composite = std::make_unique<container_info_composite> (name);
|
||||
if (telemetry.batch_request)
|
||||
{
|
||||
composite->add_component (collect_container_info (*telemetry.batch_request, "batch_request"));
|
||||
}
|
||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "single_requests", single_requests_count, sizeof (decltype (telemetry.single_requests)::value_type) }));
|
||||
return composite;
|
||||
}
|
||||
|
||||
std::unique_ptr<nano::container_info_component> nano::collect_container_info (telemetry_impl & telemetry_impl, const std::string & name)
|
||||
{
|
||||
size_t callback_count;
|
||||
size_t all_telemetry_data_count;
|
||||
size_t cached_telemetry_data_count;
|
||||
size_t required_responses_count;
|
||||
{
|
||||
nano::lock_guard<std::mutex> guard (telemetry_impl.mutex);
|
||||
callback_count = telemetry_impl.callbacks.size ();
|
||||
all_telemetry_data_count = telemetry_impl.current_telemetry_data_responses.size ();
|
||||
cached_telemetry_data_count = telemetry_impl.cached_telemetry_data.size ();
|
||||
required_responses_count = telemetry_impl.required_responses.size ();
|
||||
}
|
||||
|
||||
auto composite = std::make_unique<container_info_composite> (name);
|
||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "callbacks", callback_count, sizeof (decltype (telemetry_impl.callbacks)::value_type) }));
|
||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "current_telemetry_data_responses", all_telemetry_data_count, sizeof (decltype (telemetry_impl.current_telemetry_data_responses)::value_type) }));
|
||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "cached_telemetry_data", cached_telemetry_data_count, sizeof (decltype (telemetry_impl.cached_telemetry_data)::value_type) }));
|
||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "required_responses", required_responses_count, sizeof (decltype (telemetry_impl.required_responses)::value_type) }));
|
||||
return composite;
|
||||
}
|
nano/node/telemetry.hpp (new file, 165 lines)
|
@ -0,0 +1,165 @@
|
|||
#pragma once
|
||||
|
||||
#include <nano/node/common.hpp>
|
||||
#include <nano/secure/common.hpp>
|
||||
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <unordered_set>
|
||||
|
||||
namespace nano
|
||||
{
|
||||
class network;
|
||||
class alarm;
|
||||
class worker;
|
||||
class telemetry;
|
||||
namespace transport
|
||||
{
|
||||
class channel;
|
||||
}
|
||||
|
||||
/*
|
||||
* Holds a response from a telemetry request
|
||||
*/
|
||||
class telemetry_data_response
|
||||
{
|
||||
public:
|
||||
nano::telemetry_data data;
|
||||
bool is_cached;
|
||||
bool error;
|
||||
};
|
||||
|
||||
/*
|
||||
* Holds many responses from telemetry requests
|
||||
*/
|
||||
class telemetry_data_responses
|
||||
{
|
||||
public:
|
||||
std::vector<nano::telemetry_data> data;
|
||||
bool is_cached;
|
||||
bool all_received;
|
||||
};
|
||||
|
||||
/*
|
||||
* This class requests node telemetry metrics and invokes any callbacks
|
||||
* which have been aggregated. Further calls to get_metrics_async may return cached telemetry metrics
|
||||
* if they are within cache_cutoff time from the latest request.
|
||||
*/
|
||||
class telemetry_impl : public std::enable_shared_from_this<telemetry_impl>
|
||||
{
|
||||
public:
|
||||
telemetry_impl (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a);
|
||||
|
||||
private:
|
||||
// Only accessible from the friend nano::telemetry class
|
||||
void get_metrics_async (std::unordered_set<std::shared_ptr<nano::transport::channel>> const & channels_a, std::function<void(telemetry_data_responses const &)> const & callback_a);
|
||||
void add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a);
|
||||
size_t telemetry_data_size ();
|
||||
|
||||
nano::network_params network_params;
|
||||
// Anything older than this requires requesting metrics from other nodes
|
||||
static std::chrono::milliseconds constexpr cache_cutoff{ 3000 };
|
||||
|
||||
// All data in this chunk is protected by this mutex
|
||||
std::mutex mutex;
|
||||
std::vector<std::function<void(telemetry_data_responses const &)>> callbacks;
|
||||
std::chrono::steady_clock::time_point last_time = std::chrono::steady_clock::now () - cache_cutoff;
|
||||
/* The responses received during this request round */
|
||||
std::vector<nano::telemetry_data> current_telemetry_data_responses;
|
||||
/* The metrics for the last request round */
|
||||
std::vector<nano::telemetry_data> cached_telemetry_data;
|
||||
std::unordered_set<nano::endpoint> required_responses;
|
||||
uint64_t round{ 0 };
|
||||
/* Currently executing callbacks */
|
||||
bool invoking{ false };
|
||||
|
||||
std::atomic<bool> all_received{ true };
|
||||
|
||||
nano::network & network;
|
||||
nano::alarm & alarm;
|
||||
nano::worker & worker;
|
||||
|
||||
void invoke_callbacks (bool cached_a);
|
||||
void channel_processed (nano::unique_lock<std::mutex> & lk_a, nano::endpoint const & endpoint_a);
|
||||
void flush_callbacks (nano::unique_lock<std::mutex> & lk_a, bool cached_a);
|
||||
void fire_request_messages (std::unordered_set<std::shared_ptr<nano::transport::channel>> const & channels);
|
||||
|
||||
friend std::unique_ptr<container_info_component> collect_container_info (telemetry_impl &, const std::string &);
|
||||
friend nano::telemetry;
|
||||
friend class node_telemetry_single_request_Test;
|
||||
friend class node_telemetry_basic_Test;
|
||||
};
|
||||
|
||||
std::unique_ptr<nano::container_info_component> collect_container_info (telemetry_impl & telemetry_impl, const std::string & name);
|
||||
|
||||
class telemetry
|
||||
{
|
||||
public:
|
||||
telemetry (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a);
|
||||
|
||||
/*
|
||||
* Add telemetry metrics received from this endpoint.
|
||||
* Should this be unsolicited, it will not be added.
|
||||
*/
|
||||
void add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a);
|
||||
|
||||
/*
|
||||
* Collects metrics from a square-root-sized sample of peers and invokes the callback when complete.
|
||||
*/
|
||||
void get_metrics_random_peers_async (std::function<void(telemetry_data_responses const &)> const & callback_a);
|
||||
|
||||
/*
|
||||
* A blocking version of get_metrics_random_peers_async ().
|
||||
*/
|
||||
telemetry_data_responses get_metrics_random_peers ();
|
||||
|
||||
/*
|
||||
* This makes a telemetry request to the specific channel
|
||||
*/
|
||||
void get_metrics_single_peer_async (std::shared_ptr<nano::transport::channel> const &, std::function<void(telemetry_data_response const &)> const & callback_a);
|
||||
|
||||
/*
|
||||
* A blocking version of get_metrics_single_peer_async
|
||||
*/
|
||||
telemetry_data_response get_metrics_single_peer (std::shared_ptr<nano::transport::channel> const &);
|
||||
|
||||
/*
|
||||
* Return the number of node metrics collected
|
||||
*/
|
||||
size_t telemetry_data_size ();
|
||||
|
||||
/*
|
||||
* Stop the telemetry processor
|
||||
*/
|
||||
void stop ();
|
||||
|
||||
private:
|
||||
nano::network & network;
|
||||
nano::alarm & alarm;
|
||||
nano::worker & worker;
|
||||
|
||||
nano::network_params network_params;
|
||||
|
||||
class single_request_data
|
||||
{
|
||||
public:
|
||||
std::shared_ptr<telemetry_impl> impl;
|
||||
std::chrono::steady_clock::time_point last_updated{ std::chrono::steady_clock::now () };
|
||||
};
|
||||
|
||||
std::mutex mutex;
|
||||
/* Requests telemetry data from a random selection of peers */
|
||||
std::shared_ptr<telemetry_impl> batch_request;
|
||||
/* Any requests to specific individual peers are maintained here */
|
||||
std::unordered_map<nano::endpoint, single_request_data> single_requests;
|
||||
bool stopped{ false };
|
||||
|
||||
void update_cleanup_data (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data & single_request_data_a, bool is_new_a);
|
||||
void ongoing_single_request_cleanup (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data const & single_request_data_a);
|
||||
|
||||
friend class node_telemetry_multiple_single_request_clearing_Test;
|
||||
friend std::unique_ptr<container_info_component> collect_container_info (telemetry &, const std::string &);
|
||||
};
|
||||
|
||||
std::unique_ptr<nano::container_info_component> collect_container_info (telemetry & telemetry, const std::string & name);
|
||||
}
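// Minimal usage sketch (assumes a running node and a connected realtime channel; not part of the diff):
//   node.telemetry.get_metrics_single_peer_async (channel, [] (nano::telemetry_data_response const & response_a) {
//       if (!response_a.error)
//       {
//           // response_a.data holds the peer's metrics; response_a.is_cached marks a cached result
//       }
//   });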
|
|
@ -152,7 +152,7 @@ std::shared_ptr<nano::transport::channel_tcp> nano::transport::tcp_channels::fin
|
|||
return result;
|
||||
}
|
||||
|
||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::transport::tcp_channels::random_set (size_t count_a) const
|
||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::transport::tcp_channels::random_set (size_t count_a, uint8_t min_version) const
|
||||
{
|
||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> result;
|
||||
result.reserve (count_a);
|
||||
|
@ -167,7 +167,12 @@ std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::transport::t
|
|||
for (auto i (0); i < random_cutoff && result.size () < count_a; ++i)
|
||||
{
|
||||
auto index (nano::random_pool::generate_word32 (0, static_cast<CryptoPP::word32> (peers_size - 1)));
|
||||
result.insert (channels.get<random_access_tag> ()[index].channel);
|
||||
|
||||
auto channel = channels.get<random_access_tag> ()[index].channel;
|
||||
if (channel->get_network_version () >= min_version && !channel->server)
|
||||
{
|
||||
result.insert (channel);
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
|
|
|
@ -84,7 +84,7 @@ namespace transport
|
|||
size_t size () const;
|
||||
std::shared_ptr<nano::transport::channel_tcp> find_channel (nano::tcp_endpoint const &) const;
|
||||
void random_fill (std::array<nano::endpoint, 8> &) const;
|
||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (size_t) const;
|
||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (size_t, uint8_t = 0) const;
|
||||
bool store_all (bool = true);
|
||||
std::shared_ptr<nano::transport::channel_tcp> find_node_id (nano::account const &);
|
||||
// Get the next peer for attempting a tcp connection
|
||||
|
|
|
@ -47,6 +47,14 @@ public:
|
|||
{
|
||||
result = nano::stat::detail::node_id_handshake;
|
||||
}
|
||||
void telemetry_req (nano::telemetry_req const & message_a) override
|
||||
{
|
||||
result = nano::stat::detail::telemetry_req;
|
||||
}
|
||||
void telemetry_ack (nano::telemetry_ack const & message_a) override
|
||||
{
|
||||
result = nano::stat::detail::telemetry_ack;
|
||||
}
|
||||
nano::stat::detail result;
|
||||
};
|
||||
}
|
||||
|
|
|
@ -147,7 +147,7 @@ std::shared_ptr<nano::transport::channel_udp> nano::transport::udp_channels::cha
|
|||
return result;
|
||||
}
|
||||
|
||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::transport::udp_channels::random_set (size_t count_a) const
|
||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::transport::udp_channels::random_set (size_t count_a, uint8_t min_version) const
|
||||
{
|
||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> result;
|
||||
result.reserve (count_a);
|
||||
|
@ -162,7 +162,11 @@ std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::transport::u
|
|||
for (auto i (0); i < random_cutoff && result.size () < count_a; ++i)
|
||||
{
|
||||
auto index (nano::random_pool::generate_word32 (0, static_cast<CryptoPP::word32> (peers_size - 1)));
|
||||
result.insert (channels.get<random_access_tag> ()[index].channel);
|
||||
auto channel = channels.get<random_access_tag> ()[index].channel;
|
||||
if (channel->get_network_version () >= min_version)
|
||||
{
|
||||
result.insert (channel);
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
|
@ -428,6 +432,14 @@ public:
|
|||
{
|
||||
assert (false);
|
||||
}
|
||||
void telemetry_req (nano::telemetry_req const & message_a) override
|
||||
{
|
||||
message (message_a);
|
||||
}
|
||||
void telemetry_ack (nano::telemetry_ack const & message_a) override
|
||||
{
|
||||
message (message_a);
|
||||
}
|
||||
void node_id_handshake (nano::node_id_handshake const & message_a) override
|
||||
{
|
||||
if (node.config.logging.network_node_id_handshake_logging ())
|
||||
|
@ -552,6 +564,12 @@ void nano::transport::udp_channels::receive_action (nano::message_buffer * data_
|
|||
case nano::message_parser::parse_status::invalid_node_id_handshake_message:
|
||||
node.stats.inc (nano::stat::type::udp, nano::stat::detail::invalid_node_id_handshake_message);
|
||||
break;
|
||||
case nano::message_parser::parse_status::invalid_telemetry_req_message:
|
||||
node.stats.inc (nano::stat::type::udp, nano::stat::detail::invalid_telemetry_req_message);
|
||||
break;
|
||||
case nano::message_parser::parse_status::invalid_telemetry_ack_message:
|
||||
node.stats.inc (nano::stat::type::udp, nano::stat::detail::invalid_telemetry_ack_message);
|
||||
break;
|
||||
case nano::message_parser::parse_status::outdated_version:
|
||||
node.stats.inc (nano::stat::type::udp, nano::stat::detail::outdated_version);
|
||||
break;
|
||||
|
|
|
@ -69,7 +69,7 @@ namespace transport
|
|||
size_t size () const;
|
||||
std::shared_ptr<nano::transport::channel_udp> channel (nano::endpoint const &) const;
|
||||
void random_fill (std::array<nano::endpoint, 8> &) const;
|
||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (size_t) const;
|
||||
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (size_t, uint8_t = 0) const;
|
||||
bool store_all (bool = true);
|
||||
std::shared_ptr<nano::transport::channel_udp> find_node_id (nano::account const &);
|
||||
void clean_node_id (nano::account const &);
|
||||
|
|
|
@ -7831,3 +7831,201 @@ TEST (rpc, receive_work_disabled)
|
|||
ASSERT_EQ (std::error_code (nano::error_common::disabled_work_generation).message (), response.json.get<std::string> ("error"));
|
||||
}
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
void compare_default_test_result_data (test_response & response, nano::node const & node_server_a)
|
||||
{
|
||||
ASSERT_EQ (200, response.status);
|
||||
ASSERT_FALSE (response.json.get<bool> ("cached"));
|
||||
ASSERT_EQ (1, response.json.get<uint64_t> ("block_count"));
|
||||
ASSERT_EQ (1, response.json.get<uint64_t> ("cemented_count"));
|
||||
ASSERT_EQ (0, response.json.get<uint64_t> ("unchecked_count"));
|
||||
ASSERT_EQ (1, response.json.get<uint64_t> ("account_count"));
|
||||
ASSERT_EQ (node_server_a.config.bandwidth_limit, response.json.get<uint64_t> ("bandwidth_cap"));
|
||||
ASSERT_EQ (1, response.json.get<uint32_t> ("peer_count"));
|
||||
ASSERT_EQ (node_server_a.network_params.protocol.protocol_version, response.json.get<uint8_t> ("protocol_version_number"));
|
||||
ASSERT_EQ (nano::get_major_node_version (), response.json.get<uint8_t> ("vendor_version"));
|
||||
ASSERT_GE (100, response.json.get<uint64_t> ("uptime"));
|
||||
ASSERT_EQ (nano::genesis ().hash ().to_string (), response.json.get<std::string> ("genesis_block"));
|
||||
}
|
||||
}
|
||||
|
||||
TEST (rpc, node_telemetry_single)
|
||||
{
|
||||
nano::system system (1);
|
||||
auto & node1 = *add_ipc_enabled_node (system);
|
||||
scoped_io_thread_name_change scoped_thread_name_io;
|
||||
nano::node_rpc_config node_rpc_config;
|
||||
nano::ipc::ipc_server ipc_server (node1, node_rpc_config);
|
||||
nano::rpc_config rpc_config (nano::get_available_port (), true);
|
||||
rpc_config.rpc_process.ipc_port = node1.config.ipc_config.transport_tcp.port;
|
||||
nano::ipc_rpc_processor ipc_rpc_processor (system.io_ctx, rpc_config);
|
||||
nano::rpc rpc (system.io_ctx, rpc_config, ipc_rpc_processor);
|
||||
rpc.start ();
|
||||
|
||||
// Wait until peers are stored, as this happens in the background
|
||||
auto peers_stored = false;
|
||||
while (!peers_stored)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
|
||||
auto transaction = system.nodes.back ()->store.tx_begin_read ();
|
||||
peers_stored = system.nodes.back ()->store.peer_count (transaction) != 0;
|
||||
}
|
||||
|
||||
// Missing port
|
||||
boost::property_tree::ptree request;
|
||||
auto node = system.nodes.front ();
|
||||
request.put ("action", "node_telemetry");
|
||||
request.put ("address", "not_a_valid_address");
|
||||
|
||||
{
|
||||
test_response response (request, rpc.config.port, system.io_ctx);
|
||||
system.deadline_set (10s);
|
||||
while (response.status == 0)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
ASSERT_EQ (200, response.status);
|
||||
ASSERT_EQ (std::error_code (nano::error_rpc::requires_port_and_address).message (), response.json.get<std::string> ("error"));
|
||||
}
|
||||
|
||||
// Missing address
|
||||
request.erase ("address");
|
||||
request.put ("port", 65);
|
||||
|
||||
{
|
||||
test_response response (request, rpc.config.port, system.io_ctx);
|
||||
system.deadline_set (10s);
|
||||
while (response.status == 0)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
ASSERT_EQ (200, response.status);
|
||||
ASSERT_EQ (std::error_code (nano::error_rpc::requires_port_and_address).message (), response.json.get<std::string> ("error"));
|
||||
}
|
||||
|
||||
// Try with invalid address
|
||||
request.put ("address", "not_a_valid_address");
|
||||
request.put ("port", 65);
|
||||
|
||||
{
|
||||
test_response response (request, rpc.config.port, system.io_ctx);
|
||||
system.deadline_set (10s);
|
||||
while (response.status == 0)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
ASSERT_EQ (200, response.status);
|
||||
ASSERT_EQ (std::error_code (nano::error_common::invalid_ip_address).message (), response.json.get<std::string> ("error"));
|
||||
}
|
||||
|
||||
// Then invalid port
|
||||
request.put ("address", (boost::format ("%1%") % node->network.endpoint ().address ()).str ());
|
||||
request.put ("port", "invalid port");
|
||||
{
|
||||
test_response response (request, rpc.config.port, system.io_ctx);
|
||||
system.deadline_set (10s);
|
||||
while (response.status == 0)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
ASSERT_EQ (200, response.status);
|
||||
ASSERT_EQ (std::error_code (nano::error_common::invalid_port).message (), response.json.get<std::string> ("error"));
|
||||
}
|
||||
|
||||
// Use correctly formed address and port
|
||||
request.put ("port", node->network.endpoint ().port ());
|
||||
{
|
||||
test_response response (request, rpc.config.port, system.io_ctx);
|
||||
system.deadline_set (10s);
|
||||
while (response.status == 0)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
ASSERT_EQ (200, response.status);
|
||||
compare_default_test_result_data (response, *node);
|
||||
}
|
||||
}
|
||||
|
||||
TEST (rpc, node_telemetry_random)
|
||||
{
|
||||
nano::system system (1);
|
||||
auto & node1 = *add_ipc_enabled_node (system);
|
||||
scoped_io_thread_name_change scoped_thread_name_io;
|
||||
nano::node_rpc_config node_rpc_config;
|
||||
nano::ipc::ipc_server ipc_server (node1, node_rpc_config);
|
||||
nano::rpc_config rpc_config (nano::get_available_port (), true);
|
||||
rpc_config.rpc_process.ipc_port = node1.config.ipc_config.transport_tcp.port;
|
||||
nano::ipc_rpc_processor ipc_rpc_processor (system.io_ctx, rpc_config);
|
||||
nano::rpc rpc (system.io_ctx, rpc_config, ipc_rpc_processor);
|
||||
rpc.start ();
|
||||
|
||||
// Wait until peers are stored, as this happens in the background
|
||||
auto peers_stored = false;
|
||||
while (!peers_stored)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
|
||||
auto transaction = system.nodes.back ()->store.tx_begin_read ();
|
||||
peers_stored = system.nodes.back ()->store.peer_count (transaction) != 0;
|
||||
}
|
||||
|
||||
boost::property_tree::ptree request;
|
||||
auto node = system.nodes.front ();
|
||||
request.put ("action", "node_telemetry");
|
||||
{
|
||||
test_response response (request, rpc.config.port, system.io_ctx);
|
||||
system.deadline_set (10s);
|
||||
while (response.status == 0)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
ASSERT_EQ (200, response.status);
|
||||
ASSERT_FALSE (response.json.get<bool> ("cached"));
|
||||
ASSERT_EQ (1, response.json.get<uint64_t> ("block_count"));
|
||||
ASSERT_EQ (1, response.json.get<uint64_t> ("cemented_count"));
|
||||
ASSERT_EQ (0, response.json.get<uint64_t> ("unchecked_count"));
|
||||
ASSERT_EQ (1, response.json.get<uint64_t> ("account_count"));
|
||||
ASSERT_EQ (node->config.bandwidth_limit, response.json.get<uint64_t> ("bandwidth_cap"));
|
||||
ASSERT_EQ (1, response.json.get<uint32_t> ("peer_count"));
|
||||
ASSERT_EQ (node->network_params.protocol.protocol_version, response.json.get<uint8_t> ("protocol_version_number"));
|
||||
ASSERT_EQ (nano::get_major_node_version (), response.json.get<uint8_t> ("vendor_version"));
|
||||
ASSERT_GE (100, response.json.get<uint64_t> ("uptime"));
|
||||
ASSERT_EQ (nano::genesis ().hash ().to_string (), response.json.get<std::string> ("genesis_block"));
|
||||
}
|
||||
|
||||
request.put ("raw", "true");
|
||||
test_response response (request, rpc.config.port, system.io_ctx);
|
||||
system.deadline_set (10s);
|
||||
while (response.status == 0)
|
||||
{
|
||||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
ASSERT_EQ (200, response.status);
|
||||
|
||||
// This may fail if the response has taken longer than the cache cutoff time.
|
||||
ASSERT_TRUE (response.json.get<bool> ("cached"));
|
||||
|
||||
auto & all_metrics = response.json.get_child ("metrics");
|
||||
std::vector<std::tuple<uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint32_t, uint8_t, uint8_t, uint64_t, std::string>> raw_metrics_json_l;
|
||||
for (auto & metrics_pair : all_metrics)
|
||||
{
|
||||
auto & metrics = metrics_pair.second;
|
||||
raw_metrics_json_l.emplace_back (metrics.get<uint64_t> ("block_count"), metrics.get<uint64_t> ("cemented_count"), metrics.get<uint64_t> ("unchecked_count"), metrics.get<uint64_t> ("account_count"), metrics.get<uint64_t> ("bandwidth_cap"), metrics.get<uint64_t> ("peer_count"), metrics.get<uint8_t> ("protocol_version_number"), metrics.get<uint8_t> ("vendor_version"), metrics.get<uint64_t> ("uptime"), metrics.get<std::string> ("genesis_block"));
|
||||
}
|
||||
|
||||
ASSERT_EQ (1, raw_metrics_json_l.size ());
|
||||
auto const & metrics = raw_metrics_json_l.front ();
|
||||
ASSERT_EQ (1, std::get<0> (metrics));
|
||||
ASSERT_EQ (1, std::get<1> (metrics));
|
||||
ASSERT_EQ (0, std::get<2> (metrics));
|
||||
ASSERT_EQ (1, std::get<3> (metrics));
|
||||
ASSERT_EQ (node->config.bandwidth_limit, std::get<4> (metrics));
|
||||
ASSERT_EQ (1, std::get<5> (metrics));
|
||||
ASSERT_EQ (node->network_params.protocol.protocol_version, std::get<6> (metrics));
|
||||
ASSERT_EQ (nano::get_major_node_version (), std::get<7> (metrics));
|
||||
ASSERT_GE (100, std::get<8> (metrics));
|
||||
ASSERT_EQ (nano::genesis ().hash ().to_string (), std::get<9> (metrics));
|
||||
}
|
||||
|
|
|
@ -161,7 +161,6 @@ nano::keypair const & nano::test_genesis_key (test_constants.test_genesis_key);
|
|||
nano::account const & nano::nano_test_account (test_constants.nano_test_account);
|
||||
std::string const & nano::nano_test_genesis (test_constants.nano_test_genesis);
|
||||
nano::account const & nano::genesis_account (test_constants.genesis_account);
|
||||
std::string const & nano::genesis_block (test_constants.genesis_block);
|
||||
nano::uint128_t const & nano::genesis_amount (test_constants.genesis_amount);
|
||||
nano::account const & nano::burn_account (test_constants.burn_account);
|
||||
|
||||
|
|
|
@ -347,7 +347,7 @@ public:
|
|||
protocol_constants (nano::nano_networks network_a);
|
||||
|
||||
/** Current protocol version */
|
||||
uint8_t protocol_version = 0x11;
|
||||
uint8_t protocol_version = 0x12;
|
||||
|
||||
/** Minimum accepted protocol version */
|
||||
uint8_t protocol_version_min = 0x10;
|
||||
|
@ -360,6 +360,9 @@ public:
|
|||
|
||||
/** Do not start TCP realtime network connections to nodes older than this version */
|
||||
uint8_t tcp_realtime_protocol_version_min = 0x11;
|
||||
|
||||
/** Do not request telemetry metrics to nodes older than this version */
|
||||
uint8_t telemetry_protocol_version_min = 0x12;
|
||||
};
|
||||
|
||||
/** Genesis keys and ledger constants for network variants */
|
||||
|
|