Global bandwidth limiter (#2454)

* Global bandwidth limiter

Changes from per-channel limits to a global limiter as originally intended, and fixes a few issues with the limiter

- Fix dropping condition not checking trended_rate
- Fix trended_rate calculation resulting in a 20x lower value
- Normalize data to the sampling period
- Update and simplify validation test
- Add new test using channels directly and ensuring the limit is global

* Increment next_trend rather than normalizing the data

* Fix tests

* Small optimization on the ring buffer

* Making the second test slightly easier for CI

* Tests fix

* Higher runtime rate

* Make sure non-droppable packets still increase the rate. Split into add() and should_filter()

* Disable some test assertions for OSX due to slow CI

* Revamp tests to not yield the threads, simpler and should pass CI

* Remove trivial condition check to test with less messages
This commit is contained in:
Guilherme Lawless 2020-01-01 15:14:41 +00:00 committed by GitHub
commit 0db5d39f99
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 142 additions and 75 deletions

View file

@ -871,56 +871,47 @@ TEST (network, replace_port)
node1->stop ();
}
// The test must be completed in less than 1 second
TEST (bandwidth_limiter, validate)
{
size_t const full_confirm_ack (488 + 8);
nano::system system;
size_t const message_size (1024);
nano::bandwidth_limiter limiter_0 (0);
auto message_limit = 3;
nano::bandwidth_limiter limiter_3 (message_size * message_limit);
ASSERT_FALSE (limiter_0.should_drop (message_size)); // never drops
auto start (std::chrono::steady_clock::now ());
for (unsigned i = 0; i < message_limit; ++i)
{
nano::bandwidth_limiter limiter_0 (0);
nano::bandwidth_limiter limiter_1 (1024);
nano::bandwidth_limiter limiter_256 (1024 * 256);
nano::bandwidth_limiter limiter_1024 (1024 * 1024);
nano::bandwidth_limiter limiter_1536 (1024 * 1536);
auto now (std::chrono::steady_clock::now ());
while (now + 1s >= std::chrono::steady_clock::now ())
{
ASSERT_FALSE (limiter_0.should_drop (full_confirm_ack)); // will never drop
ASSERT_TRUE (limiter_1.should_drop (full_confirm_ack)); // always drop as message > limit / rate_buffer.size ()
limiter_256.should_drop (full_confirm_ack);
limiter_1024.should_drop (full_confirm_ack);
limiter_1536.should_drop (full_confirm_ack);
std::this_thread::sleep_for (10ms);
}
ASSERT_FALSE (limiter_0.should_drop (full_confirm_ack)); // will never drop
ASSERT_TRUE (limiter_1.should_drop (full_confirm_ack)); // always drop as message > limit / rate_buffer.size ()
ASSERT_FALSE (limiter_256.should_drop (full_confirm_ack)); // as a second has passed counter is started and nothing is dropped
ASSERT_FALSE (limiter_1024.should_drop (full_confirm_ack)); // as a second has passed counter is started and nothing is dropped
ASSERT_FALSE (limiter_1536.should_drop (full_confirm_ack)); // as a second has passed counter is started and nothing is dropped
limiter_3.add (message_size);
ASSERT_FALSE (limiter_3.should_drop (message_size));
}
system.deadline_set (300ms);
// Wait for the trended rate to catch up
while (limiter_3.get_rate () < limiter_3.get_limit ())
{
nano::bandwidth_limiter limiter_0 (0);
nano::bandwidth_limiter limiter_1 (1024);
nano::bandwidth_limiter limiter_256 (1024 * 256);
nano::bandwidth_limiter limiter_1024 (1024 * 1024);
nano::bandwidth_limiter limiter_1536 (1024 * 1536);
auto now (std::chrono::steady_clock::now ());
//trend rate for 5 sec
while (now + 5s >= std::chrono::steady_clock::now ())
{
ASSERT_FALSE (limiter_0.should_drop (full_confirm_ack)); // will never drop
ASSERT_TRUE (limiter_1.should_drop (full_confirm_ack)); // always drop as message > limit / rate_buffer.size ()
limiter_256.should_drop (full_confirm_ack);
limiter_1024.should_drop (full_confirm_ack);
limiter_1536.should_drop (full_confirm_ack);
std::this_thread::sleep_for (50ms);
}
ASSERT_EQ (limiter_0.get_rate (), 0); //should be 0 as rate is not gathered if not needed
ASSERT_EQ (limiter_1.get_rate (), 0); //should be 0 since nothing is small enough to pass through is tracked
ASSERT_EQ (limiter_256.get_rate (), full_confirm_ack); //should be 0 since nothing is small enough to pass through is tracked
ASSERT_EQ (limiter_1024.get_rate (), full_confirm_ack); //should be 0 since nothing is small enough to pass through is tracked
ASSERT_EQ (limiter_1536.get_rate (), full_confirm_ack); //should be 0 since nothing is small enough to pass through is tracked
// Force an update
limiter_3.add (0);
ASSERT_NO_ERROR (system.poll (10ms));
}
ASSERT_EQ (limiter_3.get_rate (), limiter_3.get_limit ());
ASSERT_LT (std::chrono::steady_clock::now () - 1s, start);
// A new message would drop
ASSERT_TRUE (limiter_3.should_drop (message_size));
// So adding it will not increase the rate
limiter_3.add (message_size);
ASSERT_EQ (limiter_3.get_rate (), limiter_3.get_limit ());
// Unless the message is forced (e.g. non-droppable packets)
limiter_3.add (message_size, true);
// Limiter says it should drop, but the rate will have increased
// Wait for the trended rate to catch up
while (limiter_3.get_rate () < limiter_3.get_limit () + message_size)
{
// Force an update
limiter_3.add (0);
ASSERT_NO_ERROR (system.poll (10ms));
}
ASSERT_TRUE (limiter_3.should_drop (message_size));
ASSERT_EQ (limiter_3.get_rate (), limiter_3.get_limit () + message_size);
ASSERT_LT (std::chrono::steady_clock::now () - 1s, start);
}

View file

@ -3176,6 +3176,55 @@ TEST (node, bidirectional_tcp)
}
}
// The test must be completed in less than 1 second
TEST (node, bandwidth_limiter)
{
nano::system system;
nano::genesis genesis;
nano::publish message (genesis.open);
auto message_size = message.to_bytes ()->size();
auto message_limit = 4; // must be multiple of the number of channels
nano::node_config node_config (24000, system.logging);
node_config.bandwidth_limit = message_limit * message_size;
auto & node = *system.add_node (node_config);
auto channel1 (node.network.udp_channels.create (node.network.endpoint ()));
auto channel2 (node.network.udp_channels.create (node.network.endpoint ()));
auto start (std::chrono::steady_clock::now ());
for (unsigned i=0; i < message_limit; i+=2) // number of channels
{
channel1->send (message);
channel2->send (message);
}
ASSERT_LT (std::chrono::steady_clock::now () - 1s, start);
system.deadline_set (300ms);
// Wait for the trended rate to catch up
while (node.network.limiter.get_rate () < node.network.limiter.get_limit ())
{
// Force an update
node.network.limiter.add (0);
ASSERT_NO_ERROR (system.poll (10ms));
}
ASSERT_EQ (0, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));
ASSERT_LT (std::chrono::steady_clock::now () - 1s, start);
// Should be dropped and not increase the rate
channel1->send (message);
ASSERT_EQ (1, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));
ASSERT_EQ (node.network.limiter.get_rate (), node.network.limiter.get_limit ());
// Non-droppable, increases the rate
channel2->send (message, nullptr, false);
ASSERT_EQ (1, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out));
system.deadline_set (300ms);
// Wait for the trended rate to catch up
while (node.network.limiter.get_rate () < node.network.limiter.get_limit () + message_size)
{
// Force an update
node.network.limiter.add (0);
ASSERT_NO_ERROR (system.poll (10ms));
}
ASSERT_EQ (node.network.limiter.get_rate (), node.network.limiter.get_limit () + message_size);
ASSERT_LT (std::chrono::steady_clock::now () - 1s, start);
}
TEST (active_difficulty, recalculate_work)
{
nano::system system;

View file

@ -1,13 +1,13 @@
#include <nano/lib/utility.hpp>
#include <cassert>
#include <cstring>
#include <fcntl.h>
#include <link.h>
#include <sys/stat.h>
#include <unistd.h>
#include <cassert>
#include <cstring>
namespace
{
// This creates a file for the load address of an executable or shared library.

View file

@ -12,6 +12,7 @@
nano::network::network (nano::node & node_a, uint16_t port_a) :
buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receive buffer
resolver (node_a.io_ctx),
limiter (node_a.config.bandwidth_limit),
node (node_a),
udp_channels (node_a, port_a),
tcp_channels (node_a),

View file

@ -155,6 +155,7 @@ public:
nano::message_buffer_manager buffer_container;
boost::asio::ip::udp::resolver resolver;
std::vector<boost::thread> packet_processing_threads;
nano::bandwidth_limiter limiter;
nano::node & node;
nano::transport::udp_channels udp_channels;
nano::transport::tcp_channels tcp_channels;

View file

@ -9,6 +9,7 @@
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/random_access_index.hpp>
#include <boost/multi_index_container.hpp>
#include <unordered_set>
namespace mi = boost::multi_index;

View file

@ -72,7 +72,6 @@ nano::tcp_endpoint nano::transport::map_endpoint_to_tcp (nano::endpoint const &
}
nano::transport::channel::channel (nano::node & node_a) :
limiter (node_a.config.bandwidth_limit),
node (node_a)
{
set_network_version (node_a.network_params.protocol.protocol_version);
@ -84,7 +83,8 @@ void nano::transport::channel::send (nano::message const & message_a, std::funct
message_a.visit (visitor);
auto buffer (message_a.to_shared_const_buffer ());
auto detail (visitor.result);
if (!is_droppable_a || !limiter.should_drop (buffer.size ()))
node.network.limiter.add (buffer.size (), !is_droppable_a);
if (!is_droppable_a || !node.network.limiter.should_drop (buffer.size ()))
{
send_buffer (buffer, detail, callback_a);
node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out);
@ -211,37 +211,50 @@ using namespace std::chrono_literals;
nano::bandwidth_limiter::bandwidth_limiter (const size_t limit_a) :
next_trend (std::chrono::steady_clock::now () + 50ms),
limit (limit_a),
rate (0),
trended_rate (0)
limit (limit_a)
{
}
bool nano::bandwidth_limiter::should_drop (const size_t & message_size)
void nano::bandwidth_limiter::add (const size_t & message_size_a, bool force_a)
{
bool result (false);
if (limit == 0) //never drop if limit is 0
if (limit == 0)
{
return result;
return;
}
nano::lock_guard<std::mutex> lock (mutex);
if (message_size > limit / rate_buffer.size () || rate + message_size > limit)
auto now = std::chrono::steady_clock::now ();
if (next_trend < now)
{
result = true;
// Reset if too much time has passed
if (now - next_trend > period)
{
next_trend = now;
rate_buffer.clear ();
}
rate_buffer.push_back (rate);
rate = 0;
trended_rate = std::accumulate (rate_buffer.begin (), rate_buffer.end (), size_t{ 0 });
// Increment rather than setting to now + period, to account for fluctuations in sampling
next_trend += period;
}
// Unless forced, only add to the current rate if it will not go beyond the trended limit
if (force_a || !should_drop (message_size_a))
{
rate += message_size_a;
}
}
bool nano::bandwidth_limiter::should_drop (const size_t & message_size_a)
{
// Never drop if limit is 0
if (limit == 0)
{
return false;
}
else
{
rate = rate + message_size;
return (trended_rate + message_size_a > limit);
}
if (next_trend < std::chrono::steady_clock::now ())
{
next_trend = std::chrono::steady_clock::now () + 50ms;
rate_buffer.push_back (rate);
trended_rate = std::accumulate (rate_buffer.begin (), rate_buffer.end (), size_t{ 0 }) / rate_buffer.size ();
rate = 0;
}
return result;
}
size_t nano::bandwidth_limiter::get_rate ()
@ -249,3 +262,8 @@ size_t nano::bandwidth_limiter::get_rate ()
nano::lock_guard<std::mutex> lock (mutex);
return trended_rate;
}
size_t nano::bandwidth_limiter::get_limit () const
{
return limit;
}

View file

@ -10,24 +10,31 @@ namespace nano
class bandwidth_limiter final
{
public:
// initialize with rate 0 = unbounded
// initialize with limit 0 = unbounded
bandwidth_limiter (const size_t);
// force_a should be set for non-droppable packets
void add (const size_t &, bool const force_a = false);
bool should_drop (const size_t &);
size_t get_rate ();
size_t get_limit () const;
std::chrono::milliseconds const period{ 50 };
static constexpr unsigned buffer_size{ 20 };
private:
//last time rate was adjusted
std::chrono::steady_clock::time_point next_trend;
//trend rate over 20 poll periods
boost::circular_buffer<size_t> rate_buffer{ 20, 0 };
boost::circular_buffer<size_t> rate_buffer{ buffer_size };
//limit bandwidth to
const size_t limit;
//rate, increment if message_size + rate < rate
size_t rate;
size_t rate{ 0 };
//trended rate to even out spikes in traffic
size_t trended_rate;
std::atomic<size_t> trended_rate{ 0 };
std::mutex mutex;
};
namespace transport
{
class message;
@ -132,7 +139,6 @@ namespace transport
}
mutable std::mutex channel_mutex;
nano::bandwidth_limiter limiter;
private:
std::chrono::steady_clock::time_point last_bootstrap_attempt{ std::chrono::steady_clock::time_point () };