Remove peers with different genesis block or invalid telemetry signature (#2603)
* Remove peers with different genesis block after tcp node handshake * Formatting * Handle case of valid signature but not matching node_id * Add UDP channel removal too * Check node_id for mismatch with channel to save sig check (Serg review) * Update from merge * Move erase code to network class
This commit is contained in:
parent
d4cf6f23e6
commit
e6a2f73c2f
15 changed files with 234 additions and 50 deletions
|
@ -886,6 +886,7 @@ TEST (network, replace_port)
|
|||
nano::node_flags node_flags;
|
||||
node_flags.disable_udp = false;
|
||||
node_flags.disable_ongoing_telemetry_requests = true;
|
||||
node_flags.disable_initial_telemetry_requests = true;
|
||||
auto node0 = system.add_node (node_flags);
|
||||
ASSERT_EQ (0, node0->network.size ());
|
||||
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work, node_flags));
|
||||
|
|
|
@ -208,8 +208,11 @@ TEST (node_telemetry, no_peers)
|
|||
TEST (node_telemetry, basic)
|
||||
{
|
||||
nano::system system;
|
||||
auto node_client = system.add_node ();
|
||||
auto node_server = system.add_node ();
|
||||
nano::node_flags node_flags;
|
||||
node_flags.disable_ongoing_telemetry_requests = true;
|
||||
node_flags.disable_initial_telemetry_requests = true;
|
||||
auto node_client = system.add_node (node_flags);
|
||||
auto node_server = system.add_node (node_flags);
|
||||
|
||||
wait_peer_connections (system);
|
||||
|
||||
|
@ -272,10 +275,11 @@ TEST (node_telemetry, basic)
|
|||
TEST (node_telemetry, many_nodes)
|
||||
{
|
||||
nano::system system;
|
||||
// The telemetry responses can timeout if using a large number of nodes under sanitizers, so lower the number.
|
||||
const auto num_nodes = (is_sanitizer_build || nano::running_within_valgrind ()) ? 4 : 10;
|
||||
nano::node_flags node_flags;
|
||||
node_flags.disable_ongoing_telemetry_requests = true;
|
||||
node_flags.disable_initial_telemetry_requests = true;
|
||||
// The telemetry responses can timeout if using a large number of nodes under sanitizers, so lower the number.
|
||||
const auto num_nodes = (is_sanitizer_build || nano::running_within_valgrind ()) ? 4 : 10;
|
||||
for (auto i = 0; i < num_nodes; ++i)
|
||||
{
|
||||
nano::node_config node_config (nano::get_available_port (), system.logging);
|
||||
|
@ -473,10 +477,11 @@ TEST (node_telemetry, blocking_request)
|
|||
|
||||
TEST (node_telemetry, disconnects)
|
||||
{
|
||||
nano::system system (2);
|
||||
|
||||
auto node_client = system.nodes.front ();
|
||||
auto node_server = system.nodes.back ();
|
||||
nano::system system;
|
||||
nano::node_flags node_flags;
|
||||
node_flags.disable_initial_telemetry_requests = true;
|
||||
auto node_client = system.add_node (node_flags);
|
||||
auto node_server = system.add_node (node_flags);
|
||||
|
||||
wait_peer_connections (system);
|
||||
|
||||
|
@ -503,6 +508,7 @@ TEST (node_telemetry, all_peers_use_single_request_cache)
|
|||
nano::system system;
|
||||
nano::node_flags node_flags;
|
||||
node_flags.disable_ongoing_telemetry_requests = true;
|
||||
node_flags.disable_initial_telemetry_requests = true;
|
||||
auto node_client = system.add_node (node_flags);
|
||||
auto node_server = system.add_node (node_flags);
|
||||
|
||||
|
@ -571,10 +577,12 @@ TEST (node_telemetry, all_peers_use_single_request_cache)
|
|||
TEST (node_telemetry, dos_tcp)
|
||||
{
|
||||
// Confirm that telemetry_reqs are not processed
|
||||
nano::system system (2);
|
||||
|
||||
auto node_client = system.nodes.front ();
|
||||
auto node_server = system.nodes.back ();
|
||||
nano::system system;
|
||||
nano::node_flags node_flags;
|
||||
node_flags.disable_initial_telemetry_requests = true;
|
||||
node_flags.disable_ongoing_telemetry_requests = true;
|
||||
auto node_client = system.add_node (node_flags);
|
||||
auto node_server = system.add_node (node_flags);
|
||||
|
||||
wait_peer_connections (system);
|
||||
|
||||
|
@ -618,15 +626,19 @@ TEST (node_telemetry, dos_tcp)
|
|||
TEST (node_telemetry, dos_udp)
|
||||
{
|
||||
// Confirm that telemetry_reqs are not processed
|
||||
nano::system system (2);
|
||||
|
||||
auto node_client = system.nodes.front ();
|
||||
auto node_server = system.nodes.back ();
|
||||
nano::system system;
|
||||
nano::node_flags node_flags;
|
||||
node_flags.disable_udp = false;
|
||||
node_flags.disable_tcp_realtime = true;
|
||||
node_flags.disable_initial_telemetry_requests = true;
|
||||
node_flags.disable_ongoing_telemetry_requests = true;
|
||||
auto node_client = system.add_node (node_flags);
|
||||
auto node_server = system.add_node (node_flags);
|
||||
|
||||
wait_peer_connections (system);
|
||||
|
||||
nano::telemetry_req message;
|
||||
auto channel (node_server->network.udp_channels.create (node_server->network.endpoint ()));
|
||||
auto channel (node_client->network.udp_channels.create (node_server->network.endpoint ()));
|
||||
channel->send (message, [](boost::system::error_code const & ec, size_t size_a) {
|
||||
ASSERT_FALSE (ec);
|
||||
});
|
||||
|
@ -665,9 +677,10 @@ TEST (node_telemetry, dos_udp)
|
|||
|
||||
TEST (node_telemetry, disable_metrics)
|
||||
{
|
||||
nano::system system (1);
|
||||
auto node_client = system.nodes.front ();
|
||||
nano::system system;
|
||||
nano::node_flags node_flags;
|
||||
node_flags.disable_initial_telemetry_requests = true;
|
||||
auto node_client = system.add_node (node_flags);
|
||||
node_flags.disable_providing_telemetry_metrics = true;
|
||||
auto node_server = system.add_node (node_flags);
|
||||
|
||||
|
@ -704,3 +717,76 @@ TEST (node_telemetry, disable_metrics)
|
|||
ASSERT_NO_ERROR (system.poll ());
|
||||
}
|
||||
}
|
||||
|
||||
TEST (node_telemetry, remove_peer_different_genesis)
|
||||
{
|
||||
nano::system system (1);
|
||||
auto node0 (system.nodes[0]);
|
||||
ASSERT_EQ (0, node0->network.size ());
|
||||
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
|
||||
// Change genesis block to something else in this test (this is the reference telemetry processing uses).
|
||||
// Possible TSAN issue in the future if something else uses this, but will only appear in tests.
|
||||
node1->network_params.ledger.genesis_hash = nano::block_hash ("0");
|
||||
node1->start ();
|
||||
system.nodes.push_back (node1);
|
||||
node0->network.merge_peer (node1->network.endpoint ());
|
||||
node1->network.merge_peer (node0->network.endpoint ());
|
||||
ASSERT_TIMELY (10s, node0->stats.count (nano::stat::type::telemetry, nano::stat::detail::different_genesis_hash) != 0 && node1->stats.count (nano::stat::type::telemetry, nano::stat::detail::different_genesis_hash) != 0);
|
||||
|
||||
ASSERT_EQ (0, node0->network.size ());
|
||||
ASSERT_EQ (0, node1->network.size ());
|
||||
|
||||
ASSERT_EQ (node0->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::out), 1);
|
||||
ASSERT_EQ (node1->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::out), 1);
|
||||
}
|
||||
|
||||
TEST (node_telemetry, remove_peer_different_genesis_udp)
|
||||
{
|
||||
nano::node_flags node_flags;
|
||||
node_flags.disable_udp = false;
|
||||
node_flags.disable_tcp_realtime = true;
|
||||
nano::system system (1, nano::transport::transport_type::udp, node_flags);
|
||||
auto node0 (system.nodes[0]);
|
||||
ASSERT_EQ (0, node0->network.size ());
|
||||
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work, node_flags));
|
||||
node1->network_params.ledger.genesis_hash = nano::block_hash ("0");
|
||||
node1->start ();
|
||||
system.nodes.push_back (node1);
|
||||
node0->network.send_keepalive (std::make_shared<nano::transport::channel_udp> (node0->network.udp_channels, node1->network.endpoint (), node1->network_params.protocol.protocol_version));
|
||||
node1->network.send_keepalive (std::make_shared<nano::transport::channel_udp> (node1->network.udp_channels, node0->network.endpoint (), node0->network_params.protocol.protocol_version));
|
||||
|
||||
ASSERT_TIMELY (10s, node0->network.udp_channels.size () != 0 && node1->network.udp_channels.size () != 0);
|
||||
ASSERT_EQ (node0->network.tcp_channels.size (), 0);
|
||||
ASSERT_EQ (node1->network.tcp_channels.size (), 0);
|
||||
|
||||
ASSERT_TIMELY (10s, node0->stats.count (nano::stat::type::telemetry, nano::stat::detail::different_genesis_hash) != 0 && node1->stats.count (nano::stat::type::telemetry, nano::stat::detail::different_genesis_hash) != 0);
|
||||
|
||||
ASSERT_EQ (0, node0->network.size ());
|
||||
ASSERT_EQ (0, node1->network.size ());
|
||||
}
|
||||
|
||||
namespace nano
|
||||
{
|
||||
TEST (node_telemetry, remove_peer_invalid_signature)
|
||||
{
|
||||
nano::system system;
|
||||
nano::node_flags node_flags;
|
||||
node_flags.disable_udp = false;
|
||||
node_flags.disable_initial_telemetry_requests = true;
|
||||
node_flags.disable_ongoing_telemetry_requests = true;
|
||||
auto node = system.add_node (node_flags);
|
||||
|
||||
auto channel = node->network.udp_channels.create (node->network.endpoint ());
|
||||
channel->set_node_id (node->node_id.pub);
|
||||
// (Implementation detail) So that messages are not just discarded when requests were not sent.
|
||||
node->telemetry->recent_or_initial_request_telemetry_data.emplace (channel->get_endpoint (), nano::telemetry_data (), std::chrono::steady_clock::now (), true);
|
||||
|
||||
auto telemetry_data = nano::local_telemetry_data (node->ledger.cache, node->network, node->config.bandwidth_limit, node->network_params, node->startup_time, node->node_id);
|
||||
// Change anything so that the signed message is incorrect
|
||||
telemetry_data.block_count = 0;
|
||||
auto telemetry_ack = nano::telemetry_ack (telemetry_data);
|
||||
node->network.process_message (telemetry_ack, channel);
|
||||
|
||||
ASSERT_TIMELY (10s, node->stats.count (nano::stat::type::telemetry, nano::stat::detail::invalid_signature) > 0);
|
||||
}
|
||||
}
|
|
@ -863,6 +863,7 @@ TEST (websocket, telemetry)
|
|||
config.websocket_config.enabled = true;
|
||||
config.websocket_config.port = nano::get_available_port ();
|
||||
nano::node_flags node_flags;
|
||||
node_flags.disable_initial_telemetry_requests = true;
|
||||
node_flags.disable_ongoing_telemetry_requests = true;
|
||||
auto node1 (system.add_node (config, node_flags));
|
||||
config.peering_port = nano::get_available_port ();
|
||||
|
|
|
@ -447,6 +447,9 @@ std::string nano::stat::type_to_string (uint32_t key)
|
|||
case nano::stat::type::filter:
|
||||
res = "filter";
|
||||
break;
|
||||
case nano::stat::type::telemetry:
|
||||
res = "telemetry";
|
||||
break;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
@ -700,6 +703,15 @@ std::string nano::stat::detail_to_string (uint32_t key)
|
|||
case nano::stat::detail::duplicate_publish:
|
||||
res = "duplicate_publish";
|
||||
break;
|
||||
case nano::stat::detail::different_genesis_hash:
|
||||
res = "different_genesis_hash";
|
||||
break;
|
||||
case nano::stat::detail::invalid_signature:
|
||||
res = "invalid_signature";
|
||||
break;
|
||||
case nano::stat::detail::node_id_mismatch:
|
||||
res = "node_id_mismatch";
|
||||
break;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
|
|
@ -201,6 +201,7 @@ public:
|
|||
aggregator,
|
||||
requests,
|
||||
filter,
|
||||
telemetry
|
||||
};
|
||||
|
||||
/** Optional detail type */
|
||||
|
@ -318,7 +319,12 @@ public:
|
|||
requests_unknown,
|
||||
|
||||
// duplicate
|
||||
duplicate_publish
|
||||
duplicate_publish,
|
||||
|
||||
// telemetry
|
||||
invalid_signature,
|
||||
different_genesis_hash,
|
||||
node_id_mismatch
|
||||
};
|
||||
|
||||
/** Direction of the stat. If the direction is irrelevant, use in */
|
||||
|
|
|
@ -733,6 +733,19 @@ bool nano::network::empty () const
|
|||
return size () == 0;
|
||||
}
|
||||
|
||||
void nano::network::erase (nano::transport::channel const & channel_a)
|
||||
{
|
||||
if (channel_a.get_type () == nano::transport::transport_type::tcp)
|
||||
{
|
||||
tcp_channels.erase (channel_a.get_tcp_endpoint ());
|
||||
}
|
||||
else
|
||||
{
|
||||
udp_channels.erase (channel_a.get_endpoint ());
|
||||
udp_channels.clean_node_id (channel_a.get_node_id ());
|
||||
}
|
||||
}
|
||||
|
||||
nano::message_buffer_manager::message_buffer_manager (nano::stat & stats_a, size_t size, size_t count) :
|
||||
stats (stats_a),
|
||||
free (count),
|
||||
|
|
|
@ -149,6 +149,7 @@ public:
|
|||
size_t size () const;
|
||||
float size_sqrt () const;
|
||||
bool empty () const;
|
||||
void erase (nano::transport::channel const & channel_a);
|
||||
nano::message_buffer_manager buffer_container;
|
||||
boost::asio::ip::udp::resolver resolver;
|
||||
std::vector<boost::thread> packet_processing_threads;
|
||||
|
|
|
@ -106,7 +106,7 @@ gap_cache (*this),
|
|||
ledger (store, stats, flags_a.generate_cache),
|
||||
checker (config.signature_checker_threads),
|
||||
network (*this, config.peering_port),
|
||||
telemetry (std::make_shared<nano::telemetry> (network, alarm, worker, observers.telemetry, flags.disable_ongoing_telemetry_requests)),
|
||||
telemetry (std::make_shared<nano::telemetry> (network, alarm, worker, observers.telemetry, stats, network_params, flags.disable_ongoing_telemetry_requests)),
|
||||
bootstrap_initiator (*this),
|
||||
bootstrap (config.peering_port, *this),
|
||||
application_path (application_path_a),
|
||||
|
|
|
@ -129,9 +129,10 @@ public:
|
|||
bool disable_unchecked_cleanup{ false };
|
||||
bool disable_unchecked_drop{ true };
|
||||
bool disable_providing_telemetry_metrics{ false };
|
||||
bool disable_ongoing_telemetry_requests{ false };
|
||||
bool disable_initial_telemetry_requests{ false };
|
||||
bool disable_block_processor_unchecked_deletion{ false };
|
||||
bool disable_block_processor_republishing{ false };
|
||||
bool disable_ongoing_telemetry_requests{ false };
|
||||
bool allow_bootstrap_peers_duplicates{ false };
|
||||
bool disable_max_peers_per_ip{ false }; // For testing only
|
||||
bool fast_bootstrap{ false };
|
||||
|
|
|
@ -175,7 +175,7 @@ void nano::rep_crawler::query (std::vector<std::shared_ptr<nano::transport::chan
|
|||
});
|
||||
}
|
||||
|
||||
void nano::rep_crawler::query (std::shared_ptr<nano::transport::channel> & channel_a)
|
||||
void nano::rep_crawler::query (std::shared_ptr<nano::transport::channel> const & channel_a)
|
||||
{
|
||||
std::vector<std::shared_ptr<nano::transport::channel>> peers;
|
||||
peers.emplace_back (channel_a);
|
||||
|
|
|
@ -87,7 +87,7 @@ public:
|
|||
void query (std::vector<std::shared_ptr<nano::transport::channel>> const & channels_a);
|
||||
|
||||
/** Attempt to determine if the peer manages one or more representative accounts */
|
||||
void query (std::shared_ptr<nano::transport::channel> & channel_a);
|
||||
void query (std::shared_ptr<nano::transport::channel> const & channel_a);
|
||||
|
||||
/** Query if a peer manages a principle representative */
|
||||
bool is_pr (nano::transport::channel const &) const;
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
#include <nano/lib/alarm.hpp>
|
||||
#include <nano/lib/stats.hpp>
|
||||
#include <nano/lib/worker.hpp>
|
||||
#include <nano/node/network.hpp>
|
||||
#include <nano/node/nodeconfig.hpp>
|
||||
#include <nano/node/telemetry.hpp>
|
||||
#include <nano/node/transport/transport.hpp>
|
||||
#include <nano/secure/buffer.hpp>
|
||||
|
@ -15,11 +17,13 @@
|
|||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
nano::telemetry::telemetry (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a, nano::observer_set<nano::telemetry_data const &, nano::endpoint const &> & observers_a, bool disable_ongoing_requests_a) :
|
||||
nano::telemetry::telemetry (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a, nano::observer_set<nano::telemetry_data const &, nano::endpoint const &> & observers_a, nano::stat & stats_a, nano::network_params & network_params_a, bool disable_ongoing_requests_a) :
|
||||
network (network_a),
|
||||
alarm (alarm_a),
|
||||
worker (worker_a),
|
||||
observers (observers_a),
|
||||
stats (stats_a),
|
||||
network_params (network_params_a),
|
||||
disable_ongoing_requests (disable_ongoing_requests_a)
|
||||
{
|
||||
}
|
||||
|
@ -56,24 +60,63 @@ void nano::telemetry::set (nano::telemetry_ack const & message_a, nano::transpor
|
|||
telemetry_info_a.undergoing_request = false;
|
||||
});
|
||||
|
||||
auto error = false;
|
||||
if (!message_a.is_empty_payload ())
|
||||
{
|
||||
error = !message_a.data.validate_signature (message_a.size ()) && (channel_a.get_node_id () != message_a.data.node_id);
|
||||
}
|
||||
// This can also remove the peer
|
||||
auto error = verify_message (message_a, channel_a);
|
||||
|
||||
if (!error && !message_a.is_empty_payload ())
|
||||
if (!error)
|
||||
{
|
||||
// Received telemetry data from a peer which hasn't disabled providing telemetry metrics and there no errors with the data
|
||||
lk.unlock ();
|
||||
observers.notify (message_a.data, endpoint);
|
||||
lk.lock ();
|
||||
}
|
||||
|
||||
channel_processed (endpoint, error || message_a.is_empty_payload ());
|
||||
channel_processed (endpoint, error);
|
||||
}
|
||||
}
|
||||
|
||||
bool nano::telemetry::verify_message (nano::telemetry_ack const & message_a, nano::transport::channel const & channel_a)
|
||||
{
|
||||
if (message_a.is_empty_payload ())
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
auto remove_channel = false;
|
||||
// We want to ensure that the node_id of the channel matches that in the message before attempting to
|
||||
// use the data to remove any peers.
|
||||
auto node_id_mismatch = (channel_a.get_node_id () != message_a.data.node_id);
|
||||
if (!node_id_mismatch)
|
||||
{
|
||||
// The data could be correctly signed but for a different node id
|
||||
remove_channel = message_a.data.validate_signature (message_a.size ());
|
||||
if (!remove_channel)
|
||||
{
|
||||
// Check for different genesis blocks
|
||||
remove_channel = (message_a.data.genesis_block != network_params.ledger.genesis_hash);
|
||||
if (remove_channel)
|
||||
{
|
||||
stats.inc (nano::stat::type::telemetry, nano::stat::detail::different_genesis_hash);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
stats.inc (nano::stat::type::telemetry, nano::stat::detail::invalid_signature);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
stats.inc (nano::stat::type::telemetry, nano::stat::detail::node_id_mismatch);
|
||||
}
|
||||
|
||||
if (remove_channel)
|
||||
{
|
||||
// Disconnect from peer with incorrect telemetry data
|
||||
network.erase (channel_a);
|
||||
}
|
||||
|
||||
return remove_channel || node_id_mismatch;
|
||||
}
|
||||
|
||||
std::chrono::milliseconds nano::telemetry::cache_plus_buffer_cutoff_time () const
|
||||
{
|
||||
// This include the waiting time for the response as well as a buffer (1 second) waiting for the alarm operation to be scheduled and completed
|
||||
|
@ -291,14 +334,14 @@ nano::telemetry_data_response nano::telemetry::get_metrics_single_peer (std::sha
|
|||
return promise.get_future ().get ();
|
||||
}
|
||||
|
||||
void nano::telemetry::fire_request_message (std::shared_ptr<nano::transport::channel> const & channel)
|
||||
void nano::telemetry::fire_request_message (std::shared_ptr<nano::transport::channel> const & channel_a)
|
||||
{
|
||||
// Fire off a telemetry request to all passed in channels
|
||||
debug_assert (channel->get_network_version () >= network_params.protocol.telemetry_protocol_version_min);
|
||||
debug_assert (channel_a->get_network_version () >= network_params.protocol.telemetry_protocol_version_min);
|
||||
|
||||
uint64_t round_l;
|
||||
{
|
||||
auto it = recent_or_initial_request_telemetry_data.find (channel->get_endpoint ());
|
||||
auto it = recent_or_initial_request_telemetry_data.find (channel_a->get_endpoint ());
|
||||
recent_or_initial_request_telemetry_data.modify (it, [](nano::telemetry_info & telemetry_info_a) {
|
||||
++telemetry_info_a.round;
|
||||
});
|
||||
|
@ -308,7 +351,7 @@ void nano::telemetry::fire_request_message (std::shared_ptr<nano::transport::cha
|
|||
std::weak_ptr<nano::telemetry> this_w (shared_from_this ());
|
||||
nano::telemetry_req message;
|
||||
// clang-format off
|
||||
channel->send (message, [this_w, endpoint = channel->get_endpoint ()](boost::system::error_code const & ec, size_t size_a) {
|
||||
channel_a->send (message, [this_w, endpoint = channel_a->get_endpoint ()](boost::system::error_code const & ec, size_t size_a) {
|
||||
if (auto this_l = this_w.lock ())
|
||||
{
|
||||
if (ec)
|
||||
|
@ -323,7 +366,7 @@ void nano::telemetry::fire_request_message (std::shared_ptr<nano::transport::cha
|
|||
// clang-format on
|
||||
|
||||
// If no response is seen after a certain period of time remove it
|
||||
alarm.add (std::chrono::steady_clock::now () + response_time_cutoff, [round_l, this_w, endpoint = channel->get_endpoint ()]() {
|
||||
alarm.add (std::chrono::steady_clock::now () + response_time_cutoff, [round_l, this_w, endpoint = channel_a->get_endpoint ()]() {
|
||||
if (auto this_l = this_w.lock ())
|
||||
{
|
||||
nano::lock_guard<std::mutex> guard (this_l->mutex);
|
||||
|
|
|
@ -19,6 +19,7 @@ namespace nano
|
|||
class network;
|
||||
class alarm;
|
||||
class worker;
|
||||
class stat;
|
||||
namespace transport
|
||||
{
|
||||
class channel;
|
||||
|
@ -59,7 +60,7 @@ public:
|
|||
class telemetry : public std::enable_shared_from_this<telemetry>
|
||||
{
|
||||
public:
|
||||
telemetry (nano::network &, nano::alarm &, nano::worker &, nano::observer_set<nano::telemetry_data const &, nano::endpoint const &> &, bool);
|
||||
telemetry (nano::network &, nano::alarm &, nano::worker &, nano::observer_set<nano::telemetry_data const &, nano::endpoint const &> &, nano::stat &, nano::network_params &, bool);
|
||||
void start ();
|
||||
void stop ();
|
||||
|
||||
|
@ -74,7 +75,8 @@ public:
|
|||
std::unordered_map<nano::endpoint, nano::telemetry_data> get_metrics ();
|
||||
|
||||
/*
|
||||
* This makes a telemetry request to the specific channel
|
||||
* This makes a telemetry request to the specific channel.
|
||||
* Error is set for: no response received, no payload received, invalid signature or unsound metrics in message (e.g different genesis block)
|
||||
*/
|
||||
void get_metrics_single_peer_async (std::shared_ptr<nano::transport::channel> const &, std::function<void(telemetry_data_response const &)> const &);
|
||||
|
||||
|
@ -105,10 +107,12 @@ private:
|
|||
nano::alarm & alarm;
|
||||
nano::worker & worker;
|
||||
nano::observer_set<nano::telemetry_data const &, nano::endpoint const &> & observers;
|
||||
nano::stat & stats;
|
||||
/* Important that this is a reference to the node network_params for tests which want to modify genesis block */
|
||||
nano::network_params & network_params;
|
||||
bool disable_ongoing_requests;
|
||||
|
||||
std::atomic<bool> stopped{ false };
|
||||
nano::network_params network_params;
|
||||
bool disable_ongoing_requests;
|
||||
|
||||
std::mutex mutex;
|
||||
// clang-format off
|
||||
|
@ -131,14 +135,16 @@ private:
|
|||
|
||||
void ongoing_req_all_peers (std::chrono::milliseconds);
|
||||
|
||||
void fire_request_message (std::shared_ptr<nano::transport::channel> const & channel);
|
||||
void fire_request_message (std::shared_ptr<nano::transport::channel> const &);
|
||||
void channel_processed (nano::endpoint const &, bool);
|
||||
void flush_callbacks_async (nano::endpoint const &, bool);
|
||||
void invoke_callbacks (nano::endpoint const &, bool);
|
||||
|
||||
bool within_cache_cutoff (nano::telemetry_info const &) const;
|
||||
bool within_cache_plus_buffer_cutoff (telemetry_info const & telemetry_info) const;
|
||||
friend std::unique_ptr<nano::container_info_component> collect_container_info (telemetry & telemetry, const std::string & name);
|
||||
bool within_cache_plus_buffer_cutoff (telemetry_info const &) const;
|
||||
bool verify_message (nano::telemetry_ack const &, nano::transport::channel const &);
|
||||
friend std::unique_ptr<nano::container_info_component> collect_container_info (telemetry &, const std::string &);
|
||||
friend class node_telemetry_remove_peer_invalid_signature_Test;
|
||||
};
|
||||
|
||||
std::unique_ptr<nano::container_info_component> collect_container_info (telemetry & telemetry, const std::string & name);
|
||||
|
|
|
@ -649,6 +649,13 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n
|
|||
response_server->remote_node_id = channel_a->get_node_id ();
|
||||
response_server->receive ();
|
||||
node_l->network.tcp_channels.remove_node_id_handshake_socket (socket_l);
|
||||
|
||||
if (!node_l->flags.disable_initial_telemetry_requests)
|
||||
{
|
||||
node_l->telemetry->get_metrics_single_peer_async (channel_a, [](nano::telemetry_data_response /* unused */) {
|
||||
// Intentionally empty, starts the telemetry request cycle to more quickly disconnect from invalid peers
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
|
|
|
@ -872,10 +872,11 @@ void callback_process (shared_data & shared_data_a, data & data, T & all_node_da
|
|||
|
||||
TEST (node_telemetry, ongoing_requests)
|
||||
{
|
||||
nano::system system (2);
|
||||
|
||||
auto node_client = system.nodes.front ();
|
||||
auto node_server = system.nodes.back ();
|
||||
nano::system system;
|
||||
nano::node_flags node_flags;
|
||||
node_flags.disable_initial_telemetry_requests = true;
|
||||
auto node_client = system.add_node (node_flags);
|
||||
auto node_server = system.add_node (node_flags);
|
||||
|
||||
wait_peer_connections (system);
|
||||
|
||||
|
@ -911,8 +912,14 @@ namespace transport
|
|||
{
|
||||
TEST (node_telemetry, simultaneous_requests)
|
||||
{
|
||||
nano::system system;
|
||||
nano::node_flags node_flags;
|
||||
node_flags.disable_initial_telemetry_requests = true;
|
||||
const auto num_nodes = 4;
|
||||
nano::system system (num_nodes);
|
||||
for (int i = 0; i < num_nodes; ++i)
|
||||
{
|
||||
system.add_node (node_flags);
|
||||
}
|
||||
|
||||
wait_peer_connections (system);
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue