Improve telemetry request/response under load (#2669)

* Improve telemetry request/response under load

* Fix broken merge

* Extend response time to 10s on beta/live

* Some minor cleanup and extra comments

* Formatting

* Add missing break statement in stats
Wesley Shillingford 2020-04-09 20:59:27 +01:00 committed by GitHub
commit f7b21c2e8a
10 changed files with 183 additions and 63 deletions


@@ -15,41 +15,57 @@ TEST (socket, drop_policy)
auto node = inactivenode.node;
nano::thread_runner runner (node->io_ctx, 1);
auto server_port (nano::get_available_port ());
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v4::any (), server_port);
auto server_socket (std::make_shared<nano::server_socket> (node, endpoint, 1, nano::socket::concurrency::multi_writer));
boost::system::error_code ec;
server_socket->start (ec);
ASSERT_FALSE (ec);
std::vector<std::shared_ptr<nano::socket>> connections;
// Accept connection, but don't read so the writer will drop.
server_socket->on_connection ([&connections](std::shared_ptr<nano::socket> new_connection, boost::system::error_code const & ec_a) {
connections.push_back (new_connection);
return true;
});
auto client (std::make_shared<nano::socket> (node, boost::none, nano::socket::concurrency::multi_writer));
// We're going to write twice the queue size + 1, and the server isn't reading
// The total number of drops should thus be 1 (the socket allows doubling the queue size for no_socket_drop)
auto total_message_count (client->get_max_write_queue_size () * 2 + 1);
nano::util::counted_completion write_completion (total_message_count - 1);
size_t max_write_queue_size = 0;
{
auto client_dummy (std::make_shared<nano::socket> (node, boost::none, nano::socket::concurrency::multi_writer));
max_write_queue_size = client_dummy->get_max_write_queue_size ();
}
client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), server_port),
[client, total_message_count, node, &write_completion](boost::system::error_code const & ec_a) {
for (int i = 0; i < total_message_count; i++)
{
std::vector<uint8_t> buff (1);
client->async_write (
nano::shared_const_buffer (std::move (buff)), [&write_completion](boost::system::error_code const & ec, size_t size_a) {
write_completion.increment ();
},
nano::buffer_drop_policy::no_socket_drop);
}
});
write_completion.await_count_for (std::chrono::seconds (5));
auto func = [&](size_t total_message_count, nano::buffer_drop_policy drop_policy) {
auto server_port (nano::get_available_port ());
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v4::any (), server_port);
auto server_socket (std::make_shared<nano::server_socket> (node, endpoint, 1, nano::socket::concurrency::multi_writer));
boost::system::error_code ec;
server_socket->start (ec);
ASSERT_FALSE (ec);
// Accept connection, but don't read so the writer will drop.
server_socket->on_connection ([&connections](std::shared_ptr<nano::socket> new_connection, boost::system::error_code const & ec_a) {
connections.push_back (new_connection);
return true;
});
auto client (std::make_shared<nano::socket> (node, boost::none, nano::socket::concurrency::multi_writer));
nano::util::counted_completion write_completion (total_message_count - 1);
client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), server_port),
[client, total_message_count, node, &write_completion, &drop_policy](boost::system::error_code const & ec_a) {
for (int i = 0; i < total_message_count; i++)
{
std::vector<uint8_t> buff (1);
client->async_write (
nano::shared_const_buffer (std::move (buff)), [&write_completion](boost::system::error_code const & ec, size_t size_a) {
write_completion.increment ();
},
drop_policy);
}
});
write_completion.await_count_for (std::chrono::seconds (5));
};
func (max_write_queue_size * 2 + 1, nano::buffer_drop_policy::no_socket_drop);
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out));
ASSERT_EQ (0, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out));
func (max_write_queue_size + 1, nano::buffer_drop_policy::limiter);
// The stats are accumulated from before
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out));
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out));
node->stop ();
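The reworked test above wraps the original body in the func lambda so the same scenario runs once per drop policy: first no_socket_drop with max_write_queue_size * 2 + 1 writes (no_socket_drop permits doubling the queue, so exactly one tcp_write_no_socket_drop is expected), then limiter with max_write_queue_size + 1 writes (exactly one tcp_write_drop on top of the earlier stats).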


@@ -628,8 +628,12 @@ std::string nano::stat::detail_to_string (uint32_t key)
case nano::stat::detail::tcp_write_drop:
res = "tcp_write_drop";
break;
case nano::stat::detail::tcp_write_no_socket_drop:
res = "tcp_write_no_socket_drop";
break;
case nano::stat::detail::tcp_excluded:
res = "tcp_excluded";
break;
case nano::stat::detail::unreachable_host:
res = "unreachable_host";
break;
@@ -714,6 +718,18 @@ std::string nano::stat::detail_to_string (uint32_t key)
case nano::stat::detail::node_id_mismatch:
res = "node_id_mismatch";
break;
case nano::stat::detail::request_within_protection_cache_zone:
res = "request_within_protection_cache_zone";
break;
case nano::stat::detail::no_response_received:
res = "no_response_received";
break;
case nano::stat::detail::unsolicited_telemetry_ack:
res = "unsolicited_telemetry_ack";
break;
case nano::stat::detail::failed_send_telemetry_req:
res = "failed_send_telemetry_req";
break;
}
return res;
}


@@ -294,6 +294,7 @@ public:
tcp_accept_success,
tcp_accept_failure,
tcp_write_drop,
tcp_write_no_socket_drop,
tcp_excluded,
// ipc
@@ -325,7 +326,11 @@ public:
// telemetry
invalid_signature,
different_genesis_hash,
node_id_mismatch
node_id_mismatch,
request_within_protection_cache_zone,
no_response_received,
unsolicited_telemetry_ack,
failed_send_telemetry_req
};
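These counters are driven through the existing stats interface; the calls below mirror how the rest of this commit uses them:

// Record a telemetry request that timed out without an ack:
stats.inc (nano::stat::type::telemetry, nano::stat::detail::no_response_received);
// Read back the accumulated total, e.g. from a test:
auto timeouts (node->stats.count (nano::stat::type::telemetry, nano::stat::detail::no_response_received));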
/** Direction of the stat. If the direction is irrelevant, use in */


@@ -258,6 +258,10 @@ void nano::bootstrap_server::receive_header_action (boost::system::error_code co
last_telemetry_req = std::chrono::steady_clock::now ();
add_request (std::make_unique<nano::telemetry_req> (header));
}
else
{
node->stats.inc (nano::stat::type::telemetry, nano::stat::detail::request_within_protection_cache_zone);
}
}
receive ();
break;
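The condition guarding this branch sits above the hunk; presumably it checks that the previous telemetry_req is older than the network's telemetry cache cutoff, roughly as sketched below (the exact expression may differ):

if (std::chrono::steady_clock::now () - last_telemetry_req >= nano::telemetry_cache_cutoffs::network_to_time (node->network_params.network))
{
	last_telemetry_req = std::chrono::steady_clock::now ();
	add_request (std::make_unique<nano::telemetry_req> (header));
}
else
{
	node->stats.inc (nano::stat::type::telemetry, nano::stat::detail::request_within_protection_cache_zone);
}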


@@ -88,7 +88,19 @@ void nano::socket::async_write (nano::shared_const_buffer const & buffer_a, std:
}
else if (auto node_l = this_l->node.lock ())
{
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out);
if (drop_policy_a == nano::buffer_drop_policy::no_socket_drop)
{
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out);
}
else
{
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out);
}
if (callback_a)
{
callback_a (boost::system::errc::make_error_code (boost::system::errc::no_buffer_space), 0);
}
}
if (!write_in_progress)
{

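Since a dropped buffer now completes its callback with no_buffer_space, callers can detect the drop instead of waiting indefinitely; a minimal handler sketch (the handler body is hypothetical):

socket->async_write (nano::shared_const_buffer (std::move (buffer)),
[](boost::system::error_code const & ec, size_t size_a) {
	if (ec == boost::system::errc::no_buffer_space)
	{
		// Queue full (even with the doubled no_socket_drop allowance); nothing reached the wire.
	}
},
nano::buffer_drop_policy::no_socket_drop);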

@@ -52,12 +52,12 @@ void nano::telemetry::set (nano::telemetry_ack const & message_a, nano::transpor
if (it == recent_or_initial_request_telemetry_data.cend () || !it->undergoing_request)
{
// Not requesting telemetry data from this peer so ignore it
stats.inc (nano::stat::type::telemetry, nano::stat::detail::unsolicited_telemetry_ack);
return;
}
recent_or_initial_request_telemetry_data.modify (it, [&message_a](nano::telemetry_info & telemetry_info_a) {
telemetry_info_a.data = message_a.data;
telemetry_info_a.undergoing_request = false;
});
// This can also remove the peer
@@ -65,7 +65,7 @@ void nano::telemetry::set (nano::telemetry_ack const & message_a, nano::transpor
if (!error)
{
// Received telemetry data from a peer which hasn't disabled providing telemetry metrics and there no errors with the data
// Received telemetry data from a peer which hasn't disabled providing telemetry metrics and there are no errors with the data
lk.unlock ();
observers.notify (message_a.data, endpoint);
lk.lock ();
@@ -163,16 +163,18 @@ void nano::telemetry::ongoing_req_all_peers (std::chrono::milliseconds next_requ
}
};
// clang-format off
namespace mi = boost::multi_index;
boost::multi_index_container<channel_wrapper,
mi::indexed_by<
mi::hashed_unique<mi::tag<tag_endpoint>,
mi::const_mem_fun<channel_wrapper, nano::endpoint, &channel_wrapper::endpoint>>,
mi::hashed_unique<mi::tag<tag_channel>,
mi::member<channel_wrapper, std::shared_ptr<nano::transport::channel>, &channel_wrapper::channel>>>>
peers;
mi::hashed_unique<mi::tag<tag_endpoint>,
mi::const_mem_fun<channel_wrapper, nano::endpoint, &channel_wrapper::endpoint>>,
mi::hashed_unique<mi::tag<tag_channel>,
mi::member<channel_wrapper, std::shared_ptr<nano::transport::channel>, &channel_wrapper::channel>>>> peers;
// clang-format on
{
// Copy peers to the multi index container so we can get better asymptotic complexity in future operations
auto temp_peers = this_l->network.list (std::numeric_limits<size_t>::max (), this_l->network_params.protocol.telemetry_protocol_version_min);
peers.insert (temp_peers.begin (), temp_peers.end ());
}
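With both hashed_unique indices in place, the membership tests in the loop below run in O(1) on average; a sketch of the lookups the two tags allow (some_endpoint is hypothetical):

auto & by_endpoint (peers.get<tag_endpoint> ());
bool known (by_endpoint.find (some_endpoint) != by_endpoint.end ()); // same as peers.count (some_endpoint) != 0
auto & by_channel (peers.get<tag_channel> ()); // identical entries, keyed by channel pointer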
@@ -215,23 +217,23 @@ void nano::telemetry::ongoing_req_all_peers (std::chrono::milliseconds next_requ
});
}
// Schedule the next request; use the default request time unless a telemetry request cache expires sooner
nano::lock_guard<std::mutex> guard (this_l->mutex);
long long next_round = std::chrono::duration_cast<std::chrono::milliseconds> (this_l->cache_cutoff + this_l->response_time_cutoff).count ();
if (!this_l->recent_or_initial_request_telemetry_data.empty ())
{
// Use the default request time unless a telemetry request cache expires sooner
// Find the closest time which doesn't
auto range = boost::make_iterator_range (this_l->recent_or_initial_request_telemetry_data.get<tag_last_updated> ());
for (auto i : range)
for (auto telemetry_info : range)
{
if (peers.count (i.endpoint) == 0)
if (!telemetry_info.undergoing_request && peers.count (telemetry_info.endpoint) == 0)
{
auto const last_response = i.last_response;
auto const last_response = telemetry_info.last_response;
auto now = std::chrono::steady_clock::now ();
if (now > last_response + this_l->cache_cutoff)
{
next_round = std::min<long long> (next_round, std::chrono::duration_cast<std::chrono::milliseconds> (now - (last_response + this_l->cache_cutoff)).count ());
}
// We are iterating in sorted order from last_updated, so we can break once we have found the first valid one.
break;
}
}
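Worked example with hypothetical values: with cache_cutoff = 60s and response_time_cutoff = 10s, next_round starts at 70000 ms; if the least recently updated eligible entry last responded 65s ago, it passed the cutoff 5s ago and next_round shrinks to min (70000, 5000) = 5000 ms.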
@@ -302,6 +304,7 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr<nano::trans
{
if (it == recent_or_initial_request_telemetry_data.cend ())
{
// Insert dummy values; it's important not to use "last_response" time here without first checking that awaiting_first_response () returns false.
recent_or_initial_request_telemetry_data.emplace (channel_a->get_endpoint (), nano::telemetry_data (), std::chrono::steady_clock::now (), true);
it = recent_or_initial_request_telemetry_data.find (channel_a->get_endpoint ());
}
@@ -354,32 +357,36 @@ void nano::telemetry::fire_request_message (std::shared_ptr<nano::transport::cha
std::weak_ptr<nano::telemetry> this_w (shared_from_this ());
nano::telemetry_req message;
// clang-format off
channel_a->send (message, [this_w, endpoint = channel_a->get_endpoint ()](boost::system::error_code const & ec, size_t size_a) {
channel_a->send (message, [this_w, endpoint = channel_a->get_endpoint (), round_l](boost::system::error_code const & ec, size_t size_a) {
if (auto this_l = this_w.lock ())
{
if (ec)
{
// Error sending the telemetry_req message
this_l->stats.inc (nano::stat::type::telemetry, nano::stat::detail::failed_send_telemetry_req);
nano::lock_guard<std::mutex> guard (this_l->mutex);
this_l->channel_processed (endpoint, true);
}
else
{
// If no response is seen after a certain period of time, remove it
this_l->alarm.add (std::chrono::steady_clock::now () + this_l->response_time_cutoff, [round_l, this_w, endpoint]() {
if (auto this_l = this_w.lock ())
{
nano::lock_guard<std::mutex> guard (this_l->mutex);
auto it = this_l->recent_or_initial_request_telemetry_data.find (endpoint);
if (it != this_l->recent_or_initial_request_telemetry_data.cend () && it->undergoing_request && round_l == it->round)
{
this_l->stats.inc (nano::stat::type::telemetry, nano::stat::detail::no_response_received);
this_l->channel_processed (endpoint, true);
}
}
});
}
}
},
nano::buffer_drop_policy::no_socket_drop);
// clang-format on
// If no response is seen after a certain period of time, remove it
alarm.add (std::chrono::steady_clock::now () + response_time_cutoff, [round_l, this_w, endpoint = channel_a->get_endpoint ()]() {
if (auto this_l = this_w.lock ())
{
nano::lock_guard<std::mutex> guard (this_l->mutex);
auto it = this_l->recent_or_initial_request_telemetry_data.find (endpoint);
if (it != this_l->recent_or_initial_request_telemetry_data.cend () && it->undergoing_request && round_l == it->round)
{
this_l->channel_processed (endpoint, true);
}
}
});
}
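Note that the response timeout is now armed inside the send completion handler rather than unconditionally after initiating the send: the response_time_cutoff clock starts only once the telemetry_req has actually been written, and a failed send settles the channel immediately via failed_send_telemetry_req instead of waiting out the alarm.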
void nano::telemetry::channel_processed (nano::endpoint const & endpoint_a, bool error_a)
@@ -387,10 +394,14 @@ void nano::telemetry::channel_processed (nano::endpoint const & endpoint_a, bool
auto it = recent_or_initial_request_telemetry_data.find (endpoint_a);
if (it != recent_or_initial_request_telemetry_data.end ())
{
recent_or_initial_request_telemetry_data.modify (it, [](nano::telemetry_info & telemetry_info_a) {
telemetry_info_a.last_response = std::chrono::steady_clock::now ();
});
if (error_a)
if (!error_a)
{
recent_or_initial_request_telemetry_data.modify (it, [](nano::telemetry_info & telemetry_info_a) {
telemetry_info_a.last_response = std::chrono::steady_clock::now ();
telemetry_info_a.undergoing_request = false;
});
}
else
{
recent_or_initial_request_telemetry_data.erase (endpoint_a);
}


@@ -129,7 +129,7 @@ private:
std::chrono::seconds const cache_cutoff{ nano::telemetry_cache_cutoffs::network_to_time (network_params.network) };
// The maximum time spent waiting for a response to a telemetry request
std::chrono::seconds const response_time_cutoff{ is_sanitizer_build || nano::running_within_valgrind () ? 6 : 3 };
std::chrono::seconds const response_time_cutoff{ network_params.network.is_test_network () ? (is_sanitizer_build || nano::running_within_valgrind () ? 6 : 3) : 10 };
std::unordered_map<nano::endpoint, std::vector<std::function<void(telemetry_data_response const &)>>> callbacks;
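Per the commit message, only test networks keep the short cutoff (3 seconds, or 6 under sanitizer or Valgrind builds); beta and live networks now allow 10 seconds for a telemetry response.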


@@ -50,7 +50,7 @@ public:
nano::alarm alarm{ io_ctx };
std::vector<std::shared_ptr<nano::node>> nodes;
nano::logging logging;
nano::work_pool work{ 1 };
nano::work_pool work{ std::max (std::thread::hardware_concurrency (), 1u) };
std::chrono::time_point<std::chrono::steady_clock, std::chrono::duration<double>> deadline{ std::chrono::steady_clock::time_point::max () };
double deadline_scaling_factor{ 1.0 };
unsigned node_sequence{ 0 };
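std::thread::hardware_concurrency () can return 0 when the core count is not computable, so clamping with std::max against 1u guarantees at least one work thread while letting test work generation use every available core.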


@@ -100,6 +100,11 @@ void nano::transport::channel::send (nano::message const & message_a, std::funct
}
else
{
if (callback_a)
{
callback_a (boost::system::errc::make_error_code (boost::system::errc::not_supported), 0);
}
node.stats.inc (nano::stat::type::drop, detail, nano::stat::dir::out);
if (node.config.logging.network_packet_logging ())
{

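With this addition the callback passed to channel::send is always invoked: when the message is dropped by whichever condition guards this else branch (not shown in the hunk), the caller receives errc::not_supported, so components such as telemetry can settle the request immediately rather than relying on a timeout.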

@@ -1071,6 +1071,57 @@ namespace transport
}
}
TEST (node_telemetry, under_load)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto node = system.add_node (node_config);
node_config.peering_port = nano::get_available_port ();
nano::node_flags node_flags;
node_flags.disable_ongoing_telemetry_requests = true;
auto node1 = system.add_node (node_config, node_flags);
nano::genesis genesis;
nano::keypair key;
nano::keypair key1;
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
system.wallet (0)->insert_adhoc (key.prv);
auto latest_genesis = node->latest (nano::test_genesis_key.pub);
auto num_blocks = 150000;
auto send (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, latest_genesis, nano::test_genesis_key.pub, nano::genesis_amount - num_blocks, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (latest_genesis)));
node->process_active (send);
latest_genesis = send->hash ();
auto open (std::make_shared<nano::state_block> (key.pub, 0, key.pub, num_blocks, send->hash (), key.prv, key.pub, *system.work.generate (key.pub)));
node->process_active (open);
auto latest_key = open->hash ();
auto thread_func = [key1, &system, node, num_blocks](nano::keypair const & keypair, nano::block_hash const & latest, nano::uint128_t const initial_amount) {
auto latest_l = latest;
for (int i = 0; i < num_blocks; ++i)
{
auto send (std::make_shared<nano::state_block> (keypair.pub, latest_l, keypair.pub, initial_amount - i - 1, key1.pub, keypair.prv, keypair.pub, *system.work.generate (latest_l)));
latest_l = send->hash ();
node->process_active (send);
}
};
std::thread thread1 (thread_func, nano::test_genesis_key, latest_genesis, nano::genesis_amount - num_blocks);
std::thread thread2 (thread_func, key, latest_key, num_blocks);
ASSERT_TIMELY (200s, node1->ledger.cache.block_count == num_blocks * 2 + 3);
thread1.join ();
thread2.join ();
for (auto const & node : system.nodes)
{
ASSERT_EQ (0, node->stats.count (nano::stat::type::telemetry, nano::stat::detail::failed_send_telemetry_req));
ASSERT_EQ (0, node->stats.count (nano::stat::type::telemetry, nano::stat::detail::request_within_protection_cache_zone));
ASSERT_EQ (0, node->stats.count (nano::stat::type::telemetry, nano::stat::detail::unsolicited_telemetry_ack));
ASSERT_EQ (0, node->stats.count (nano::stat::type::telemetry, nano::stat::detail::no_response_received));
}
}
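The test floods the pair of nodes with 300,000 state blocks (150,000 per writer thread) while telemetry requests keep flowing (only node1's own ongoing requests are disabled), then verifies that none of the four new telemetry failure counters fired on either node.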
// Similar to signature_checker.boundary_checks but more exhaustive. Can take up to 1 minute
TEST (signature_checker, mass_boundary_checks)
{