Merge pull request #4785 from pwojcikdev/traffic-info

Per traffic type stats
Piotr Wójcik 2024-11-12 10:45:36 +01:00 committed by GitHub
commit d12bd4a61a
14 changed files with 180 additions and 124 deletions
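
In short: each queued socket write is tagged with its nano::transport::traffic_type, and when the write completes its byte count is now recorded twice, once under the existing aggregate traffic_tcp stat and once under the new per-type traffic_tcp_type stat. A minimal sketch of that recording pattern, using the identifiers that appear in the diff below; the wrapper function itself is illustrative, in the real change this happens inside the tcp_socket write completion handler.

#include <nano/node/node.hpp>
#include <nano/node/transport/traffic_type.hpp>

// Illustrative wrapper only; not part of the diff
void record_traffic (nano::node & node, nano::transport::traffic_type type, std::size_t bytes)
{
	// Existing aggregate counter (unchanged behaviour)
	node.stats.add (nano::stat::type::traffic_tcp, nano::stat::detail::all, nano::stat::dir::out, bytes, /* aggregate all */ true);
	// New: the same bytes attributed to the specific traffic type (generic, bootstrap, ...)
	node.stats.add (nano::stat::type::traffic_tcp_type, nano::transport::to_stat_detail (type), nano::stat::dir::out, bytes);
}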

View file

@@ -42,6 +42,7 @@ add_executable(
  processor_service.cpp
  random.cpp
  random_pool.cpp
+ rate_limiting.cpp
  rep_crawler.cpp
  receivable.cpp
  peer_history.cpp

View file

@@ -0,0 +1,113 @@
#include <nano/lib/rate_limiting.hpp>
#include <nano/lib/utility.hpp>
#include <gtest/gtest.h>
#include <fstream>
#include <future>
using namespace std::chrono_literals;
TEST (rate, basic)
{
nano::rate::token_bucket bucket (10, 10);
// Initial burst
ASSERT_TRUE (bucket.try_consume (10));
ASSERT_FALSE (bucket.try_consume (10));
// With a fill rate of 10 tokens/sec, await 1/3 sec and get 3 tokens
std::this_thread::sleep_for (300ms);
ASSERT_TRUE (bucket.try_consume (3));
ASSERT_FALSE (bucket.try_consume (10));
// Allow time for the bucket to completely refill and do a full burst
std::this_thread::sleep_for (1s);
ASSERT_TRUE (bucket.try_consume (10));
ASSERT_EQ (bucket.largest_burst (), 10);
}
TEST (rate, network)
{
// For the purpose of the test, one token represents 1MB instead of one byte.
// Allow for 10 mb/s bursts (max bucket size), 5 mb/s long term rate
nano::rate::token_bucket bucket (10, 5);
// Initial burst of 10 mb/s over two calls
ASSERT_TRUE (bucket.try_consume (5));
ASSERT_EQ (bucket.largest_burst (), 5);
ASSERT_TRUE (bucket.try_consume (5));
ASSERT_EQ (bucket.largest_burst (), 10);
ASSERT_FALSE (bucket.try_consume (5));
// After 200 ms, the 5 mb/s fillrate means we have 1 mb available
std::this_thread::sleep_for (200ms);
ASSERT_TRUE (bucket.try_consume (1));
ASSERT_FALSE (bucket.try_consume (1));
}
TEST (rate, reset)
{
nano::rate::token_bucket bucket (0, 0);
// consume lots of tokens, buckets should be unlimited
ASSERT_TRUE (bucket.try_consume (1000000));
ASSERT_TRUE (bucket.try_consume (1000000));
// set bucket to be limited
bucket.reset (1000, 1000);
ASSERT_FALSE (bucket.try_consume (1001));
ASSERT_TRUE (bucket.try_consume (1000));
ASSERT_FALSE (bucket.try_consume (1000));
std::this_thread::sleep_for (2ms);
ASSERT_TRUE (bucket.try_consume (2));
// reduce the limit
bucket.reset (100, 100 * 1000);
ASSERT_FALSE (bucket.try_consume (101));
ASSERT_TRUE (bucket.try_consume (100));
std::this_thread::sleep_for (1ms);
ASSERT_TRUE (bucket.try_consume (100));
// increase the limit
bucket.reset (2000, 1);
ASSERT_FALSE (bucket.try_consume (2001));
ASSERT_TRUE (bucket.try_consume (2000));
// back to unlimited
bucket.reset (0, 0);
ASSERT_TRUE (bucket.try_consume (1000000));
ASSERT_TRUE (bucket.try_consume (1000000));
}
TEST (rate, unlimited)
{
nano::rate::token_bucket bucket (0, 0);
ASSERT_TRUE (bucket.try_consume (5));
ASSERT_EQ (bucket.largest_burst (), 5);
ASSERT_TRUE (bucket.try_consume (static_cast<size_t> (1e9)));
ASSERT_EQ (bucket.largest_burst (), static_cast<size_t> (1e9));
// With unlimited tokens, consuming always succeed
ASSERT_TRUE (bucket.try_consume (static_cast<size_t> (1e9)));
ASSERT_EQ (bucket.largest_burst (), static_cast<size_t> (1e9));
}
TEST (rate, busy_spin)
{
// Bucket should refill at a rate of 1 token per second
nano::rate::token_bucket bucket (1, 1);
// Run a very tight loop for 5 seconds + a bit of wiggle room
int counter = 0;
for (auto start = std::chrono::steady_clock::now (), now = start; now < start + 5500ms; now = std::chrono::steady_clock::now ())
{
if (bucket.try_consume ())
{
++counter;
}
}
// Bucket starts fully refilled, therefore we see 1 additional request
ASSERT_EQ (counter, 6);
}
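
The tests above pin down the token-bucket semantics: the bucket is created full (so an immediate burst up to capacity succeeds), it refills at refill_rate tokens per second up to its capacity, and largest_burst () reports the deepest drain observed. A minimal sketch with those semantics, for orientation only; this is not nano::rate::token_bucket, which also treats a capacity of 0 as unlimited and is used behind a mutex by the limiter.

#include <algorithm>
#include <chrono>
#include <cstddef>

// Toy bucket with the behaviour the tests above rely on
class toy_token_bucket
{
public:
	toy_token_bucket (std::size_t max_tokens_a, std::size_t refill_rate_a) :
		max_tokens{ max_tokens_a },
		refill_rate{ refill_rate_a },
		tokens{ max_tokens_a }, // starts full, so the initial burst succeeds
		smallest{ max_tokens_a },
		last_refill{ std::chrono::steady_clock::now () }
	{
	}

	bool try_consume (std::size_t required)
	{
		refill ();
		if (tokens < required)
		{
			return false;
		}
		tokens -= required;
		smallest = std::min (smallest, tokens);
		return true;
	}

	// Deepest drain observed, measured from capacity (same formula as the diff below)
	std::size_t largest_burst () const
	{
		return max_tokens - smallest;
	}

private:
	void refill ()
	{
		auto now = std::chrono::steady_clock::now ();
		std::chrono::duration<double> elapsed = now - last_refill;
		// refill_rate tokens accrue per second, capped at the bucket capacity;
		// last_refill only advances once at least one whole token has accrued
		auto new_tokens = static_cast<std::size_t> (elapsed.count () * refill_rate);
		if (new_tokens > 0)
		{
			tokens = std::min (max_tokens, tokens + new_tokens);
			last_refill = now;
		}
	}

	std::size_t max_tokens;
	std::size_t refill_rate;
	std::size_t tokens;
	std::size_t smallest;
	std::chrono::steady_clock::time_point last_refill;
};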

View file

@@ -15,110 +15,6 @@
  using namespace std::chrono_literals;
TEST (rate, basic)
{
nano::rate::token_bucket bucket (10, 10);
// Initial burst
ASSERT_TRUE (bucket.try_consume (10));
ASSERT_FALSE (bucket.try_consume (10));
// With a fill rate of 10 tokens/sec, await 1/3 sec and get 3 tokens
std::this_thread::sleep_for (300ms);
ASSERT_TRUE (bucket.try_consume (3));
ASSERT_FALSE (bucket.try_consume (10));
// Allow time for the bucket to completely refill and do a full burst
std::this_thread::sleep_for (1s);
ASSERT_TRUE (bucket.try_consume (10));
ASSERT_EQ (bucket.largest_burst (), 10);
}
TEST (rate, network)
{
// For the purpose of the test, one token represents 1MB instead of one byte.
// Allow for 10 mb/s bursts (max bucket size), 5 mb/s long term rate
nano::rate::token_bucket bucket (10, 5);
// Initial burst of 10 mb/s over two calls
ASSERT_TRUE (bucket.try_consume (5));
ASSERT_EQ (bucket.largest_burst (), 5);
ASSERT_TRUE (bucket.try_consume (5));
ASSERT_EQ (bucket.largest_burst (), 10);
ASSERT_FALSE (bucket.try_consume (5));
// After 200 ms, the 5 mb/s fillrate means we have 1 mb available
std::this_thread::sleep_for (200ms);
ASSERT_TRUE (bucket.try_consume (1));
ASSERT_FALSE (bucket.try_consume (1));
}
TEST (rate, reset)
{
nano::rate::token_bucket bucket (0, 0);
// consume lots of tokens, buckets should be unlimited
ASSERT_TRUE (bucket.try_consume (1000000));
ASSERT_TRUE (bucket.try_consume (1000000));
// set bucket to be limited
bucket.reset (1000, 1000);
ASSERT_FALSE (bucket.try_consume (1001));
ASSERT_TRUE (bucket.try_consume (1000));
ASSERT_FALSE (bucket.try_consume (1000));
std::this_thread::sleep_for (2ms);
ASSERT_TRUE (bucket.try_consume (2));
// reduce the limit
bucket.reset (100, 100 * 1000);
ASSERT_FALSE (bucket.try_consume (101));
ASSERT_TRUE (bucket.try_consume (100));
std::this_thread::sleep_for (1ms);
ASSERT_TRUE (bucket.try_consume (100));
// increase the limit
bucket.reset (2000, 1);
ASSERT_FALSE (bucket.try_consume (2001));
ASSERT_TRUE (bucket.try_consume (2000));
// back to unlimited
bucket.reset (0, 0);
ASSERT_TRUE (bucket.try_consume (1000000));
ASSERT_TRUE (bucket.try_consume (1000000));
}
TEST (rate, unlimited)
{
nano::rate::token_bucket bucket (0, 0);
ASSERT_TRUE (bucket.try_consume (5));
ASSERT_EQ (bucket.largest_burst (), 5);
ASSERT_TRUE (bucket.try_consume (static_cast<size_t> (1e9)));
ASSERT_EQ (bucket.largest_burst (), static_cast<size_t> (1e9));
// With unlimited tokens, consuming always succeed
ASSERT_TRUE (bucket.try_consume (static_cast<size_t> (1e9)));
ASSERT_EQ (bucket.largest_burst (), static_cast<size_t> (1e9));
}
TEST (rate, busy_spin)
{
// Bucket should refill at a rate of 1 token per second
nano::rate::token_bucket bucket (1, 1);
// Run a very tight loop for 5 seconds + a bit of wiggle room
int counter = 0;
for (auto start = std::chrono::steady_clock::now (), now = start; now < start + std::chrono::milliseconds{ 5500 }; now = std::chrono::steady_clock::now ())
{
if (bucket.try_consume ())
{
++counter;
}
}
// Bucket starts fully refilled, therefore we see 1 additional request
ASSERT_EQ (counter, 6);
}
  TEST (optional_ptr, basic)
  {
  struct valtype

View file

@@ -45,11 +45,6 @@ void nano::rate::token_bucket::refill ()
  }
  }
- std::size_t nano::rate::token_bucket::largest_burst () const
- {
- return max_token_count - smallest_size;
- }
  void nano::rate::token_bucket::reset (std::size_t max_token_count_a, std::size_t refill_rate_a)
  {
  // A token count of 0 indicates unlimited capacity. We use 1e9 as
@@ -63,6 +58,16 @@ void nano::rate::token_bucket::reset (std::size_t max_token_count_a, std::size_t
  last_refill = std::chrono::steady_clock::now ();
  }
+ std::size_t nano::rate::token_bucket::largest_burst () const
+ {
+ return max_token_count - smallest_size;
+ }
+ std::size_t nano::rate::token_bucket::size () const
+ {
+ return current_size;
+ }
  /*
  * rate_limiter
  */
@@ -82,4 +87,10 @@ void nano::rate_limiter::reset (std::size_t limit_a, double burst_ratio_a)
  {
  nano::lock_guard<nano::mutex> guard{ mutex };
  bucket.reset (static_cast<std::size_t> (limit_a * burst_ratio_a), limit_a);
+ }
+ std::size_t nano::rate_limiter::size () const
+ {
+ nano::lock_guard<nano::mutex> guard{ mutex };
+ return bucket.size ();
  }

View file

@@ -38,12 +38,13 @@ public:
  */
  bool try_consume (unsigned tokens_required = 1);
- /** Returns the largest burst observed */
- std::size_t largest_burst () const;
  /** Update the max_token_count and/or refill_rate_a parameters */
  void reset (std::size_t max_token_count, std::size_t refill_rate);
+ /** Returns the largest burst observed */
+ std::size_t largest_burst () const;
+ std::size_t size () const;
  private:
  void refill ();
@@ -71,6 +72,8 @@ public:
  bool should_pass (std::size_t buffer_size);
  void reset (std::size_t limit, double burst_ratio = 1.0);
+ std::size_t size () const;
  private:
  nano::rate::token_bucket bucket;
  mutable nano::mutex mutex;
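
For orientation, a hedged sketch of how a caller uses the limiter interface declared above; try_send and its parameters are illustrative and not part of this change, only should_pass and the new size accessor come from the header.

#include <nano/lib/rate_limiting.hpp>

// Hypothetical caller, shown only to illustrate the API above
bool try_send (nano::rate_limiter & limiter, std::size_t buffer_size)
{
	if (!limiter.should_pass (buffer_size))
	{
		return false; // over the configured rate; the caller drops or retries later
	}
	// The new size () accessor reports the tokens still available; it is what
	// bandwidth_limiter::container_info () publishes per traffic type further down.
	auto remaining = limiter.size ();
	(void) remaining; // e.g. useful for logging or diagnostics
	return true;
}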

View file

@@ -13,14 +13,12 @@ enum class type
  _invalid = 0, // Default value, should not be used
  test,
- traffic_tcp,
  error,
  message,
  block,
  ledger,
  rollback,
  network,
- tcp_server,
  vote,
  vote_processor,
  vote_processor_tier,
@@ -31,11 +29,14 @@ enum class type
  http_callback,
  ipc,
  tcp,
+ tcp_server,
  tcp_channels,
  tcp_channels_rejected,
  tcp_channels_purge,
  tcp_listener,
  tcp_listener_rejected,
+ traffic_tcp,
+ traffic_tcp_type,
  channel,
  socket,
  confirmation_height,
@@ -294,6 +295,9 @@ enum class detail
  reachout_live,
  reachout_cached,
+ // traffic
+ generic,
  // tcp
  tcp_write_drop,
  tcp_write_no_socket_drop,

View file

@@ -166,6 +166,8 @@ add_library(
  transport/tcp_server.cpp
  transport/tcp_socket.hpp
  transport/tcp_socket.cpp
+ transport/traffic_type.hpp
+ transport/traffic_type.cpp
  transport/transport.hpp
  transport/transport.cpp
  unchecked_map.cpp

View file

@@ -41,6 +41,14 @@ void nano::bandwidth_limiter::reset (std::size_t limit, double burst_ratio, nano
  limiter.reset (limit, burst_ratio);
  }
+ nano::container_info nano::bandwidth_limiter::container_info () const
+ {
+ nano::container_info info;
+ info.put ("generic", limiter_generic.size ());
+ info.put ("bootstrap", limiter_bootstrap.size ());
+ return info;
+ }
  /*
  * bandwidth_limiter_config
  */

View file

@@ -37,6 +37,8 @@ public:
  */
  void reset (std::size_t limit, double burst_ratio, nano::transport::traffic_type type = nano::transport::traffic_type::generic);
+ nano::container_info container_info () const;
  private:
  /**
  * Returns reference to limiter corresponding to the limit type

View file

@@ -1217,6 +1217,7 @@ nano::container_info nano::node::container_info () const
  info.add ("local_block_broadcaster", local_block_broadcaster.container_info ());
  info.add ("rep_tiers", rep_tiers.container_info ());
  info.add ("message_processor", message_processor.container_info ());
+ info.add ("bandwidth", outbound_limiter.container_info ());
  return info;
  }

View file

@@ -186,17 +186,18 @@ void nano::transport::tcp_socket::write_queued_messages ()
  return;
  }
- auto next = send_queue.pop ();
+ auto maybe_next = send_queue.pop ();
- if (!next)
+ if (!maybe_next)
  {
  return;
  }
+ auto const & [next, type] = *maybe_next;
  set_default_timeout ();
  write_in_progress = true;
- nano::async_write (raw_socket, next->buffer,
+ nano::async_write (raw_socket, next.buffer,
- boost::asio::bind_executor (strand, [this_l = shared_from_this (), next /* `next` object keeps buffer in scope */] (boost::system::error_code ec, std::size_t size) {
+ boost::asio::bind_executor (strand, [this_l = shared_from_this (), next /* `next` object keeps buffer in scope */, type] (boost::system::error_code ec, std::size_t size) {
  debug_assert (this_l->strand.running_in_this_thread ());
  auto node_l = this_l->node_w.lock ();
@@ -214,12 +215,13 @@ void nano::transport::tcp_socket::write_queued_messages ()
  else
  {
  node_l->stats.add (nano::stat::type::traffic_tcp, nano::stat::detail::all, nano::stat::dir::out, size, /* aggregate all */ true);
+ node_l->stats.add (nano::stat::type::traffic_tcp_type, to_stat_detail (type), nano::stat::dir::out, size);
  this_l->set_last_completion ();
  }
- if (next->callback)
+ if (next.callback)
  {
- next->callback (ec, size);
+ next.callback (ec, size);
  }
  if (!ec)
@@ -436,17 +438,17 @@ bool nano::transport::socket_queue::insert (const buffer_t & buffer, callback_t
  return false; // Not queued
  }
- std::optional<nano::transport::socket_queue::entry> nano::transport::socket_queue::pop ()
+ auto nano::transport::socket_queue::pop () -> std::optional<result_t>
  {
  nano::lock_guard<nano::mutex> guard{ mutex };
- auto try_pop = [this] (nano::transport::traffic_type type) -> std::optional<entry> {
+ auto try_pop = [this] (nano::transport::traffic_type type) -> std::optional<result_t> {
  auto & que = queues[type];
  if (!que.empty ())
  {
  auto item = que.front ();
  que.pop ();
- return item;
+ return std::make_pair (item, type);
  }
  return std::nullopt;
  };
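
The queue change above is what makes per-type accounting possible: entries are kept in one FIFO per traffic_type, and pop () now returns the entry together with the type it was enqueued under, so the write completion handler can attribute the bytes. A stripped-down sketch of that idea; this is not the nano class, which adds locking, a size limit per type and its own preference order.

#include <map>
#include <optional>
#include <queue>
#include <utility>
#include <vector>

// Toy per-type queue; Entry stands in for socket_queue::entry
template <typename Entry, typename Type>
class per_type_queue
{
public:
	using result_t = std::pair<Entry, Type>; // mirrors socket_queue::result_t

	void push (Entry entry, Type type)
	{
		queues[type].push (std::move (entry));
	}

	// Pops from the first non-empty queue in the given preference order and
	// reports which type the entry belonged to
	std::optional<result_t> pop (std::vector<Type> const & order)
	{
		for (auto type : order)
		{
			auto & que = queues[type];
			if (!que.empty ())
			{
				auto item = std::move (que.front ());
				que.pop ();
				return std::make_pair (std::move (item), type);
			}
		}
		return std::nullopt;
	}

private:
	std::map<Type, std::queue<Entry>> queues;
};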

View file

@@ -42,10 +42,12 @@ public:
  };
  public:
+ using result_t = std::pair<entry, nano::transport::traffic_type>;
  explicit socket_queue (std::size_t max_size);
  bool insert (buffer_t const &, callback_t, nano::transport::traffic_type);
- std::optional<entry> pop ();
+ std::optional<result_t> pop ();
  void clear ();
  std::size_t size (nano::transport::traffic_type) const;
  bool empty () const;

View file

@@ -0,0 +1,7 @@
#include <nano/lib/enum_util.hpp>
#include <nano/node/transport/traffic_type.hpp>
nano::stat::detail nano::transport::to_stat_detail (nano::transport::traffic_type type)
{
return nano::enum_util::cast<nano::stat::detail> (type);
}

View file

@@ -1,5 +1,7 @@
  #pragma once
+ #include <nano/lib/stats.hpp>
  namespace nano::transport
  {
  /**
@@ -10,4 +12,6 @@ enum class traffic_type
  generic,
  bootstrap, // Ascending bootstrap (asc_pull_ack, asc_pull_req) traffic
  };
+ nano::stat::detail to_stat_detail (traffic_type);
  }
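
The conversion declared above is implemented with nano::enum_util::cast (see traffic_type.cpp earlier in the diff), which appears to match enumerators by name; that is why stat::detail gains a generic entry in this commit, and presumably already contains a bootstrap one. A hand-written equivalent, purely to make the correspondence explicit; the detail::bootstrap entry is an assumption, not something shown in this diff.

// Illustrative equivalent only; the real code uses nano::enum_util::cast
nano::stat::detail to_stat_detail_sketch (nano::transport::traffic_type type)
{
	switch (type)
	{
		case nano::transport::traffic_type::generic:
			return nano::stat::detail::generic; // added to stat::detail by this commit
		case nano::transport::traffic_type::bootstrap:
			return nano::stat::detail::bootstrap; // assumed to pre-exist
		default:
			return nano::stat::detail::all;
	}
}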