diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index eddab44f..1003fbb9 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -871,56 +871,47 @@ TEST (network, replace_port) node1->stop (); } +// The test must be completed in less than 1 second TEST (bandwidth_limiter, validate) { - size_t const full_confirm_ack (488 + 8); + nano::system system; + size_t const message_size (1024); + nano::bandwidth_limiter limiter_0 (0); + auto message_limit = 3; + nano::bandwidth_limiter limiter_3 (message_size * message_limit); + ASSERT_FALSE (limiter_0.should_drop (message_size)); // never drops + auto start (std::chrono::steady_clock::now ()); + for (unsigned i = 0; i < message_limit; ++i) { - nano::bandwidth_limiter limiter_0 (0); - nano::bandwidth_limiter limiter_1 (1024); - nano::bandwidth_limiter limiter_256 (1024 * 256); - nano::bandwidth_limiter limiter_1024 (1024 * 1024); - nano::bandwidth_limiter limiter_1536 (1024 * 1536); - - auto now (std::chrono::steady_clock::now ()); - - while (now + 1s >= std::chrono::steady_clock::now ()) - { - ASSERT_FALSE (limiter_0.should_drop (full_confirm_ack)); // will never drop - ASSERT_TRUE (limiter_1.should_drop (full_confirm_ack)); // always drop as message > limit / rate_buffer.size () - limiter_256.should_drop (full_confirm_ack); - limiter_1024.should_drop (full_confirm_ack); - limiter_1536.should_drop (full_confirm_ack); - std::this_thread::sleep_for (10ms); - } - ASSERT_FALSE (limiter_0.should_drop (full_confirm_ack)); // will never drop - ASSERT_TRUE (limiter_1.should_drop (full_confirm_ack)); // always drop as message > limit / rate_buffer.size () - ASSERT_FALSE (limiter_256.should_drop (full_confirm_ack)); // as a second has passed counter is started and nothing is dropped - ASSERT_FALSE (limiter_1024.should_drop (full_confirm_ack)); // as a second has passed counter is started and nothing is dropped - ASSERT_FALSE (limiter_1536.should_drop (full_confirm_ack)); // as a second has passed counter is started and nothing is dropped + limiter_3.add (message_size); + ASSERT_FALSE (limiter_3.should_drop (message_size)); } - + system.deadline_set (300ms); + // Wait for the trended rate to catch up + while (limiter_3.get_rate () < limiter_3.get_limit ()) { - nano::bandwidth_limiter limiter_0 (0); - nano::bandwidth_limiter limiter_1 (1024); - nano::bandwidth_limiter limiter_256 (1024 * 256); - nano::bandwidth_limiter limiter_1024 (1024 * 1024); - nano::bandwidth_limiter limiter_1536 (1024 * 1536); - - auto now (std::chrono::steady_clock::now ()); - //trend rate for 5 sec - while (now + 5s >= std::chrono::steady_clock::now ()) - { - ASSERT_FALSE (limiter_0.should_drop (full_confirm_ack)); // will never drop - ASSERT_TRUE (limiter_1.should_drop (full_confirm_ack)); // always drop as message > limit / rate_buffer.size () - limiter_256.should_drop (full_confirm_ack); - limiter_1024.should_drop (full_confirm_ack); - limiter_1536.should_drop (full_confirm_ack); - std::this_thread::sleep_for (50ms); - } - ASSERT_EQ (limiter_0.get_rate (), 0); //should be 0 as rate is not gathered if not needed - ASSERT_EQ (limiter_1.get_rate (), 0); //should be 0 since nothing is small enough to pass through is tracked - ASSERT_EQ (limiter_256.get_rate (), full_confirm_ack); //should be 0 since nothing is small enough to pass through is tracked - ASSERT_EQ (limiter_1024.get_rate (), full_confirm_ack); //should be 0 since nothing is small enough to pass through is tracked - ASSERT_EQ (limiter_1536.get_rate (), full_confirm_ack); //should be 0 since nothing is small enough to pass through is tracked + // Force an update + limiter_3.add (0); + ASSERT_NO_ERROR (system.poll (10ms)); } + ASSERT_EQ (limiter_3.get_rate (), limiter_3.get_limit ()); + ASSERT_LT (std::chrono::steady_clock::now () - 1s, start); + // A new message would drop + ASSERT_TRUE (limiter_3.should_drop (message_size)); + // So adding it will not increase the rate + limiter_3.add (message_size); + ASSERT_EQ (limiter_3.get_rate (), limiter_3.get_limit ()); + // Unless the message is forced (e.g. non-droppable packets) + limiter_3.add (message_size, true); + // Limiter says it should drop, but the rate will have increased + // Wait for the trended rate to catch up + while (limiter_3.get_rate () < limiter_3.get_limit () + message_size) + { + // Force an update + limiter_3.add (0); + ASSERT_NO_ERROR (system.poll (10ms)); + } + ASSERT_TRUE (limiter_3.should_drop (message_size)); + ASSERT_EQ (limiter_3.get_rate (), limiter_3.get_limit () + message_size); + ASSERT_LT (std::chrono::steady_clock::now () - 1s, start); } diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index d9120f7f..4f1842da 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -3176,6 +3176,55 @@ TEST (node, bidirectional_tcp) } } +// The test must be completed in less than 1 second +TEST (node, bandwidth_limiter) +{ + nano::system system; + nano::genesis genesis; + nano::publish message (genesis.open); + auto message_size = message.to_bytes ()->size(); + auto message_limit = 4; // must be multiple of the number of channels + nano::node_config node_config (24000, system.logging); + node_config.bandwidth_limit = message_limit * message_size; + auto & node = *system.add_node (node_config); + auto channel1 (node.network.udp_channels.create (node.network.endpoint ())); + auto channel2 (node.network.udp_channels.create (node.network.endpoint ())); + auto start (std::chrono::steady_clock::now ()); + for (unsigned i=0; i < message_limit; i+=2) // number of channels + { + channel1->send (message); + channel2->send (message); + } + ASSERT_LT (std::chrono::steady_clock::now () - 1s, start); + system.deadline_set (300ms); + // Wait for the trended rate to catch up + while (node.network.limiter.get_rate () < node.network.limiter.get_limit ()) + { + // Force an update + node.network.limiter.add (0); + ASSERT_NO_ERROR (system.poll (10ms)); + } + ASSERT_EQ (0, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out)); + ASSERT_LT (std::chrono::steady_clock::now () - 1s, start); + // Should be dropped and not increase the rate + channel1->send (message); + ASSERT_EQ (1, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out)); + ASSERT_EQ (node.network.limiter.get_rate (), node.network.limiter.get_limit ()); + // Non-droppable, increases the rate + channel2->send (message, nullptr, false); + ASSERT_EQ (1, node.stats.count (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::out)); + system.deadline_set (300ms); + // Wait for the trended rate to catch up + while (node.network.limiter.get_rate () < node.network.limiter.get_limit () + message_size) + { + // Force an update + node.network.limiter.add (0); + ASSERT_NO_ERROR (system.poll (10ms)); + } + ASSERT_EQ (node.network.limiter.get_rate (), node.network.limiter.get_limit () + message_size); + ASSERT_LT (std::chrono::steady_clock::now () - 1s, start); +} + TEST (active_difficulty, recalculate_work) { nano::system system; diff --git a/nano/lib/plat/linux/debugging.cpp b/nano/lib/plat/linux/debugging.cpp index cbfb5e27..1e9b4a9a 100644 --- a/nano/lib/plat/linux/debugging.cpp +++ b/nano/lib/plat/linux/debugging.cpp @@ -1,13 +1,13 @@ #include +#include +#include + #include #include #include #include -#include -#include - namespace { // This creates a file for the load address of an executable or shared library. diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 9fbf3985..4a5a1075 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -12,6 +12,7 @@ nano::network::network (nano::node & node_a, uint16_t port_a) : buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receive buffer resolver (node_a.io_ctx), +limiter (node_a.config.bandwidth_limit), node (node_a), udp_channels (node_a, port_a), tcp_channels (node_a), diff --git a/nano/node/network.hpp b/nano/node/network.hpp index c9d61335..2f1aa40e 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -155,6 +155,7 @@ public: nano::message_buffer_manager buffer_container; boost::asio::ip::udp::resolver resolver; std::vector packet_processing_threads; + nano::bandwidth_limiter limiter; nano::node & node; nano::transport::udp_channels udp_channels; nano::transport::tcp_channels tcp_channels; diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index db5ec655..0826df20 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -9,6 +9,7 @@ #include #include #include + #include namespace mi = boost::multi_index; diff --git a/nano/node/transport/transport.cpp b/nano/node/transport/transport.cpp index b9a222aa..37f1731a 100644 --- a/nano/node/transport/transport.cpp +++ b/nano/node/transport/transport.cpp @@ -72,7 +72,6 @@ nano::tcp_endpoint nano::transport::map_endpoint_to_tcp (nano::endpoint const & } nano::transport::channel::channel (nano::node & node_a) : -limiter (node_a.config.bandwidth_limit), node (node_a) { set_network_version (node_a.network_params.protocol.protocol_version); @@ -84,7 +83,8 @@ void nano::transport::channel::send (nano::message const & message_a, std::funct message_a.visit (visitor); auto buffer (message_a.to_shared_const_buffer ()); auto detail (visitor.result); - if (!is_droppable_a || !limiter.should_drop (buffer.size ())) + node.network.limiter.add (buffer.size (), !is_droppable_a); + if (!is_droppable_a || !node.network.limiter.should_drop (buffer.size ())) { send_buffer (buffer, detail, callback_a); node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out); @@ -211,37 +211,50 @@ using namespace std::chrono_literals; nano::bandwidth_limiter::bandwidth_limiter (const size_t limit_a) : next_trend (std::chrono::steady_clock::now () + 50ms), -limit (limit_a), -rate (0), -trended_rate (0) +limit (limit_a) { } -bool nano::bandwidth_limiter::should_drop (const size_t & message_size) +void nano::bandwidth_limiter::add (const size_t & message_size_a, bool force_a) { - bool result (false); - if (limit == 0) //never drop if limit is 0 + if (limit == 0) { - return result; + return; } nano::lock_guard lock (mutex); - - if (message_size > limit / rate_buffer.size () || rate + message_size > limit) + auto now = std::chrono::steady_clock::now (); + if (next_trend < now) { - result = true; + // Reset if too much time has passed + if (now - next_trend > period) + { + next_trend = now; + rate_buffer.clear (); + } + rate_buffer.push_back (rate); + rate = 0; + trended_rate = std::accumulate (rate_buffer.begin (), rate_buffer.end (), size_t{ 0 }); + // Increment rather than setting to now + period, to account for fluctuations in sampling + next_trend += period; + } + // Unless forced, only add to the current rate if it will not go beyond the trended limit + if (force_a || !should_drop (message_size_a)) + { + rate += message_size_a; + } +} + +bool nano::bandwidth_limiter::should_drop (const size_t & message_size_a) +{ + // Never drop if limit is 0 + if (limit == 0) + { + return false; } else { - rate = rate + message_size; + return (trended_rate + message_size_a > limit); } - if (next_trend < std::chrono::steady_clock::now ()) - { - next_trend = std::chrono::steady_clock::now () + 50ms; - rate_buffer.push_back (rate); - trended_rate = std::accumulate (rate_buffer.begin (), rate_buffer.end (), size_t{ 0 }) / rate_buffer.size (); - rate = 0; - } - return result; } size_t nano::bandwidth_limiter::get_rate () @@ -249,3 +262,8 @@ size_t nano::bandwidth_limiter::get_rate () nano::lock_guard lock (mutex); return trended_rate; } + +size_t nano::bandwidth_limiter::get_limit () const +{ + return limit; +} diff --git a/nano/node/transport/transport.hpp b/nano/node/transport/transport.hpp index ae200aed..7c3dd751 100644 --- a/nano/node/transport/transport.hpp +++ b/nano/node/transport/transport.hpp @@ -10,24 +10,31 @@ namespace nano class bandwidth_limiter final { public: - // initialize with rate 0 = unbounded + // initialize with limit 0 = unbounded bandwidth_limiter (const size_t); + // force_a should be set for non-droppable packets + void add (const size_t &, bool const force_a = false); bool should_drop (const size_t &); size_t get_rate (); + size_t get_limit () const; + + std::chrono::milliseconds const period{ 50 }; + static constexpr unsigned buffer_size{ 20 }; private: //last time rate was adjusted std::chrono::steady_clock::time_point next_trend; //trend rate over 20 poll periods - boost::circular_buffer rate_buffer{ 20, 0 }; + boost::circular_buffer rate_buffer{ buffer_size }; //limit bandwidth to const size_t limit; //rate, increment if message_size + rate < rate - size_t rate; + size_t rate{ 0 }; //trended rate to even out spikes in traffic - size_t trended_rate; + std::atomic trended_rate{ 0 }; std::mutex mutex; }; + namespace transport { class message; @@ -132,7 +139,6 @@ namespace transport } mutable std::mutex channel_mutex; - nano::bandwidth_limiter limiter; private: std::chrono::steady_clock::time_point last_bootstrap_attempt{ std::chrono::steady_clock::time_point () };