Convert node-wide request rate limiter to a per-channel outstanding request limiter. (#4211)

Adding prioritizing for selecting channels, picking most responsive channel first.
This commit is contained in:
clemahieu 2023-04-17 11:41:26 +01:00 committed by GitHub
commit d3fcc4dcc6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 249 additions and 41 deletions

View file

@ -61,6 +61,8 @@ add_library(
bootstrap_ascending/account_sets.cpp
bootstrap_ascending/iterators.hpp
bootstrap_ascending/iterators.cpp
bootstrap_ascending/peer_scoring.hpp
bootstrap_ascending/peer_scoring.cpp
bootstrap_ascending/service.hpp
bootstrap_ascending/service.cpp
cli.hpp

View file

@ -26,7 +26,8 @@ public:
nano::error deserialize (nano::tomlconfig & toml);
nano::error serialize (nano::tomlconfig & toml) const;
std::size_t requests_limit{ 1024 };
// Maximum number of un-responded requests per channel
std::size_t requests_limit{ 4 };
std::size_t database_requests_limit{ 1024 };
std::size_t pull_count{ nano::bootstrap_server::max_blocks };
nano::millis_t timeout{ 1000 * 3 };

View file

@ -0,0 +1,135 @@
#include <nano/node/bootstrap/bootstrap_config.hpp>
#include <nano/node/bootstrap_ascending/peer_scoring.hpp>
#include <nano/node/transport/channel.hpp>
/*
* peer_scoring
*/
nano::bootstrap_ascending::peer_scoring::peer_scoring (nano::bootstrap_ascending_config & config, nano::network_constants const & network_constants) :
network_constants{ network_constants },
config{ config }
{
}
bool nano::bootstrap_ascending::peer_scoring::try_send_message (std::shared_ptr<nano::transport::channel> channel)
{
auto & index = scoring.get<tag_channel> ();
auto existing = index.find (channel.get ());
if (existing == index.end ())
{
index.emplace (channel, 1, 1, 0);
}
else
{
if (existing->outstanding < config.requests_limit)
{
[[maybe_unused]] auto success = index.modify (existing, [] (auto & score) {
++score.outstanding;
++score.request_count_total;
});
debug_assert (success);
}
else
{
return true;
}
}
return false;
}
void nano::bootstrap_ascending::peer_scoring::received_message (std::shared_ptr<nano::transport::channel> channel)
{
auto & index = scoring.get<tag_channel> ();
auto existing = index.find (channel.get ());
if (existing != index.end ())
{
if (existing->outstanding > 1)
{
[[maybe_unused]] auto success = index.modify (existing, [] (auto & score) {
--score.outstanding;
++score.response_count_total;
});
debug_assert (success);
}
}
}
std::shared_ptr<nano::transport::channel> nano::bootstrap_ascending::peer_scoring::channel ()
{
auto & index = scoring.get<tag_outstanding> ();
for (auto const & score : index)
{
if (auto channel = score.shared ())
{
if (!channel->max ())
{
if (!try_send_message (channel))
{
return channel;
}
}
}
}
return nullptr;
}
std::size_t nano::bootstrap_ascending::peer_scoring::size () const
{
return scoring.size ();
}
void nano::bootstrap_ascending::peer_scoring::timeout ()
{
auto & index = scoring.get<tag_channel> ();
for (auto score = index.begin (), n = index.end (); score != n;)
{
if (auto channel = score->shared ())
{
if (channel->alive ())
{
++score;
continue;
}
}
score = index.erase (score);
}
for (auto score = scoring.begin (), n = scoring.end (); score != n; ++score)
{
scoring.modify (score, [] (auto & score_a) {
score_a.decay ();
});
}
}
void nano::bootstrap_ascending::peer_scoring::sync (std::deque<std::shared_ptr<nano::transport::channel>> const & list)
{
auto & index = scoring.get<tag_channel> ();
for (auto const & channel : list)
{
if (channel->get_network_version () >= network_constants.bootstrap_protocol_version_min)
{
if (index.find (channel.get ()) == index.end ())
{
if (!channel->max (nano::transport::traffic_type::bootstrap))
{
index.emplace (channel, 1, 1, 0);
}
}
}
}
}
/*
* peer_score
*/
nano::bootstrap_ascending::peer_scoring::peer_score::peer_score (
std::shared_ptr<nano::transport::channel> const & channel_a, uint64_t outstanding_a, uint64_t request_count_total_a, uint64_t response_count_total_a) :
channel{ channel_a },
channel_ptr{ channel_a.get () },
outstanding{ outstanding_a },
request_count_total{ request_count_total_a },
response_count_total{ response_count_total_a }
{
}

View file

@ -0,0 +1,87 @@
#pragma once
#include <nano/node/common.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index_container.hpp>
#include <deque>
#include <memory>
namespace mi = boost::multi_index;
namespace nano
{
class bootstrap_ascending_config;
class network_constants;
namespace transport
{
class channel;
}
namespace bootstrap_ascending
{
// Container for tracking and scoring peers with respect to bootstrapping
class peer_scoring
{
public:
peer_scoring (nano::bootstrap_ascending_config & config, nano::network_constants const & network_constants);
// Returns true if channel limit has been exceeded
bool try_send_message (std::shared_ptr<nano::transport::channel> channel);
void received_message (std::shared_ptr<nano::transport::channel> channel);
std::shared_ptr<nano::transport::channel> channel ();
[[nodiscard]] std::size_t size () const;
// Cleans up scores for closed channels
// Decays scores which become inaccurate over time due to message drops
void timeout ();
void sync (std::deque<std::shared_ptr<nano::transport::channel>> const & list);
private:
class peer_score
{
public:
explicit peer_score (std::shared_ptr<nano::transport::channel> const &, uint64_t, uint64_t, uint64_t);
std::weak_ptr<nano::transport::channel> channel;
// std::weak_ptr does not provide ordering so the naked pointer is also tracked and used for ordering channels
// This pointer may be invalid if the channel has been destroyed
nano::transport::channel * channel_ptr;
// Acquire reference to the shared channel object if it is still valid
[[nodiscard]] std::shared_ptr<nano::transport::channel> shared () const
{
auto result = channel.lock ();
if (result)
{
debug_assert (result.get () == channel_ptr);
}
return result;
}
void decay ()
{
outstanding = outstanding > 0 ? outstanding - 1 : 0;
}
// Number of outstanding requests to a peer
uint64_t outstanding{ 0 };
uint64_t request_count_total{ 0 };
uint64_t response_count_total{ 0 };
};
nano::network_constants const & network_constants;
nano::bootstrap_ascending_config & config;
// clang-format off
// Indexes scores by their shared channel pointer
class tag_channel {};
// Indexes scores by the number of outstanding requests in ascending order
class tag_outstanding {};
using scoring_t = boost::multi_index_container<peer_score,
mi::indexed_by<
mi::hashed_unique<mi::tag<tag_channel>,
mi::member<peer_score, nano::transport::channel *, &peer_score::channel_ptr>>,
mi::ordered_non_unique<mi::tag<tag_outstanding>,
mi::member<peer_score, uint64_t, &peer_score::outstanding>>>>;
// clang-format on
scoring_t scoring;
};
}
}

View file

@ -24,7 +24,7 @@ nano::bootstrap_ascending::service::service (nano::node_config & config_a, nano:
accounts{ stats },
iterator{ ledger.store },
throttle{ config.bootstrap_ascending.throttle_count },
limiter{ config.bootstrap_ascending.requests_limit, 1.0 },
scoring{ config.bootstrap_ascending, config.network_params.network },
database_limiter{ config.bootstrap_ascending.database_requests_limit, 1.0 }
{
// TODO: This is called from a very congested blockprocessor thread. Offload this work to a dedicated processing thread
@ -114,6 +114,12 @@ size_t nano::bootstrap_ascending::service::blocked_size () const
return accounts.blocked_size ();
}
std::size_t nano::bootstrap_ascending::service::score_size () const
{
nano::lock_guard<nano::mutex> lock{ mutex };
return scoring.size ();
}
/** Inspects a block that has been processed by the block processor
- Marks an account as blocked if the result code is gap source as there is no reason request additional blocks for this account until the dependency is resolved
- Marks an account as forwarded if it has been recently referenced by a block that has been inserted.
@ -193,33 +199,11 @@ void nano::bootstrap_ascending::service::wait_blockprocessor ()
}
}
void nano::bootstrap_ascending::service::wait_available_request ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped && !limiter.should_pass (1))
{
condition.wait_for (lock, 50ms, [this] () { return stopped; }); // Give it at least some time to cooldown to avoid hitting the limit too frequently
}
}
std::shared_ptr<nano::transport::channel> nano::bootstrap_ascending::service::available_channel ()
{
auto channels = network.random_set (32, network_consts.bootstrap_protocol_version_min, /* include temporary channels */ true);
for (auto & channel : channels)
{
if (!channel->max (nano::transport::traffic_type::bootstrap))
{
return channel;
}
}
return nullptr;
}
std::shared_ptr<nano::transport::channel> nano::bootstrap_ascending::service::wait_available_channel ()
{
std::shared_ptr<nano::transport::channel> channel;
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped && !(channel = available_channel ()))
while (!stopped && !(channel = scoring.channel ()))
{
condition.wait_for (lock, 100ms, [this] () { return stopped; });
}
@ -303,19 +287,16 @@ bool nano::bootstrap_ascending::service::run_one ()
// Ensure there is enough space in blockprocessor for queuing new blocks
wait_blockprocessor ();
// Do not do too many requests in parallel, impose throttling
wait_available_request ();
// Waits for channel that is not full
auto channel = wait_available_channel ();
if (!channel)
// Waits for account either from priority queue or database
auto account = wait_available_account ();
if (account.is_zero ())
{
return false;
}
// Waits for account either from priority queue or database
auto account = wait_available_account ();
if (account.is_zero ())
// Waits for channel that is not full
auto channel = wait_available_channel ();
if (!channel)
{
return false;
}
@ -352,6 +333,8 @@ void nano::bootstrap_ascending::service::run_timeouts ()
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
scoring.sync (network.list ());
scoring.timeout ();
auto & tags_by_order = tags.get<tag_sequenced> ();
while (!tags_by_order.empty () && nano::time_difference (tags_by_order.front ().time, nano::milliseconds_since_epoch ()) > config.bootstrap_ascending.timeout)
{
@ -364,7 +347,7 @@ void nano::bootstrap_ascending::service::run_timeouts ()
}
}
void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack & message)
void nano::bootstrap_ascending::service::process (nano::asc_pull_ack const & message, std::shared_ptr<nano::transport::channel> channel)
{
nano::unique_lock<nano::mutex> lock{ mutex };
@ -375,6 +358,7 @@ void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack & mes
auto iterator = tags_by_id.find (message.id);
auto tag = *iterator;
tags_by_id.erase (iterator);
scoring.received_message (channel);
lock.unlock ();

View file

@ -9,6 +9,7 @@
#include <nano/node/bootstrap_ascending/account_sets.hpp>
#include <nano/node/bootstrap_ascending/common.hpp>
#include <nano/node/bootstrap_ascending/iterators.hpp>
#include <nano/node/bootstrap_ascending/peer_scoring.hpp>
#include <nano/node/bootstrap_ascending/throttle.hpp>
#include <boost/multi_index/hashed_index.hpp>
@ -50,12 +51,13 @@ namespace bootstrap_ascending
/**
* Process `asc_pull_ack` message coming from network
*/
void process (nano::asc_pull_ack const & message);
void process (nano::asc_pull_ack const & message, std::shared_ptr<nano::transport::channel> channel);
public: // Container info
std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name);
size_t blocked_size () const;
size_t priority_size () const;
size_t score_size () const;
private: // Dependencies
nano::node_config & config;
@ -97,13 +99,10 @@ namespace bootstrap_ascending
bool run_one ();
void run_timeouts ();
/* Limits the number of requests per second we make */
void wait_available_request ();
/* Throttles requesting new blocks, not to overwhelm blockprocessor */
void wait_blockprocessor ();
/* Waits for channel with free capacity for bootstrap messages */
std::shared_ptr<nano::transport::channel> wait_available_channel ();
std::shared_ptr<nano::transport::channel> available_channel ();
/* Waits until a suitable account outside of cool down period is available */
nano::account available_account ();
nano::account wait_available_account ();
@ -155,7 +154,7 @@ namespace bootstrap_ascending
// clang-format on
ordered_tags tags;
nano::bandwidth_limiter limiter;
nano::bootstrap_ascending::peer_scoring scoring;
// Requests for accounts from database have much lower hitrate and could introduce strain on the network
// A separate (lower) limiter ensures that we always reserve resources for querying accounts from priority queue
nano::bandwidth_limiter database_limiter;

View file

@ -510,7 +510,7 @@ public:
void asc_pull_ack (nano::asc_pull_ack const & message) override
{
node.ascendboot.process (message);
node.ascendboot.process (message, channel);
}
private: