diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 58041028..30c373c3 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -61,6 +61,8 @@ add_library( bootstrap_ascending/account_sets.cpp bootstrap_ascending/iterators.hpp bootstrap_ascending/iterators.cpp + bootstrap_ascending/peer_scoring.hpp + bootstrap_ascending/peer_scoring.cpp bootstrap_ascending/service.hpp bootstrap_ascending/service.cpp cli.hpp diff --git a/nano/node/bootstrap/bootstrap_config.hpp b/nano/node/bootstrap/bootstrap_config.hpp index b11e5677..0861abe6 100644 --- a/nano/node/bootstrap/bootstrap_config.hpp +++ b/nano/node/bootstrap/bootstrap_config.hpp @@ -26,7 +26,8 @@ public: nano::error deserialize (nano::tomlconfig & toml); nano::error serialize (nano::tomlconfig & toml) const; - std::size_t requests_limit{ 1024 }; + // Maximum number of un-responded requests per channel + std::size_t requests_limit{ 4 }; std::size_t database_requests_limit{ 1024 }; std::size_t pull_count{ nano::bootstrap_server::max_blocks }; nano::millis_t timeout{ 1000 * 3 }; diff --git a/nano/node/bootstrap_ascending/peer_scoring.cpp b/nano/node/bootstrap_ascending/peer_scoring.cpp new file mode 100644 index 00000000..faeb4fe3 --- /dev/null +++ b/nano/node/bootstrap_ascending/peer_scoring.cpp @@ -0,0 +1,135 @@ +#include +#include +#include + +/* + * peer_scoring + */ + +nano::bootstrap_ascending::peer_scoring::peer_scoring (nano::bootstrap_ascending_config & config, nano::network_constants const & network_constants) : + network_constants{ network_constants }, + config{ config } +{ +} + +bool nano::bootstrap_ascending::peer_scoring::try_send_message (std::shared_ptr channel) +{ + auto & index = scoring.get (); + auto existing = index.find (channel.get ()); + if (existing == index.end ()) + { + index.emplace (channel, 1, 1, 0); + } + else + { + if (existing->outstanding < config.requests_limit) + { + [[maybe_unused]] auto success = index.modify (existing, [] (auto & score) { + ++score.outstanding; + ++score.request_count_total; + }); + debug_assert (success); + } + else + { + return true; + } + } + return false; +} + +void nano::bootstrap_ascending::peer_scoring::received_message (std::shared_ptr channel) +{ + auto & index = scoring.get (); + auto existing = index.find (channel.get ()); + if (existing != index.end ()) + { + if (existing->outstanding > 1) + { + [[maybe_unused]] auto success = index.modify (existing, [] (auto & score) { + --score.outstanding; + ++score.response_count_total; + }); + debug_assert (success); + } + } +} + +std::shared_ptr nano::bootstrap_ascending::peer_scoring::channel () +{ + auto & index = scoring.get (); + for (auto const & score : index) + { + if (auto channel = score.shared ()) + { + if (!channel->max ()) + { + if (!try_send_message (channel)) + { + return channel; + } + } + } + } + return nullptr; +} + +std::size_t nano::bootstrap_ascending::peer_scoring::size () const +{ + return scoring.size (); +} + +void nano::bootstrap_ascending::peer_scoring::timeout () +{ + auto & index = scoring.get (); + for (auto score = index.begin (), n = index.end (); score != n;) + { + if (auto channel = score->shared ()) + { + if (channel->alive ()) + { + ++score; + continue; + } + } + score = index.erase (score); + } + for (auto score = scoring.begin (), n = scoring.end (); score != n; ++score) + { + scoring.modify (score, [] (auto & score_a) { + score_a.decay (); + }); + } +} + +void nano::bootstrap_ascending::peer_scoring::sync (std::deque> const & list) +{ + auto & index = scoring.get (); + for (auto const & channel : list) + { + if (channel->get_network_version () >= network_constants.bootstrap_protocol_version_min) + { + if (index.find (channel.get ()) == index.end ()) + { + if (!channel->max (nano::transport::traffic_type::bootstrap)) + { + index.emplace (channel, 1, 1, 0); + } + } + } + } +} + +/* + * peer_score + */ + +nano::bootstrap_ascending::peer_scoring::peer_score::peer_score ( +std::shared_ptr const & channel_a, uint64_t outstanding_a, uint64_t request_count_total_a, uint64_t response_count_total_a) : + channel{ channel_a }, + channel_ptr{ channel_a.get () }, + outstanding{ outstanding_a }, + request_count_total{ request_count_total_a }, + response_count_total{ response_count_total_a } +{ +} diff --git a/nano/node/bootstrap_ascending/peer_scoring.hpp b/nano/node/bootstrap_ascending/peer_scoring.hpp new file mode 100644 index 00000000..19252010 --- /dev/null +++ b/nano/node/bootstrap_ascending/peer_scoring.hpp @@ -0,0 +1,87 @@ +#pragma once + +#include + +#include +#include +#include +#include + +#include +#include + +namespace mi = boost::multi_index; + +namespace nano +{ +class bootstrap_ascending_config; +class network_constants; +namespace transport +{ + class channel; +} +namespace bootstrap_ascending +{ + // Container for tracking and scoring peers with respect to bootstrapping + class peer_scoring + { + public: + peer_scoring (nano::bootstrap_ascending_config & config, nano::network_constants const & network_constants); + // Returns true if channel limit has been exceeded + bool try_send_message (std::shared_ptr channel); + void received_message (std::shared_ptr channel); + std::shared_ptr channel (); + [[nodiscard]] std::size_t size () const; + // Cleans up scores for closed channels + // Decays scores which become inaccurate over time due to message drops + void timeout (); + void sync (std::deque> const & list); + + private: + class peer_score + { + public: + explicit peer_score (std::shared_ptr const &, uint64_t, uint64_t, uint64_t); + std::weak_ptr channel; + // std::weak_ptr does not provide ordering so the naked pointer is also tracked and used for ordering channels + // This pointer may be invalid if the channel has been destroyed + nano::transport::channel * channel_ptr; + // Acquire reference to the shared channel object if it is still valid + [[nodiscard]] std::shared_ptr shared () const + { + auto result = channel.lock (); + if (result) + { + debug_assert (result.get () == channel_ptr); + } + return result; + } + void decay () + { + outstanding = outstanding > 0 ? outstanding - 1 : 0; + } + // Number of outstanding requests to a peer + uint64_t outstanding{ 0 }; + uint64_t request_count_total{ 0 }; + uint64_t response_count_total{ 0 }; + }; + nano::network_constants const & network_constants; + nano::bootstrap_ascending_config & config; + + // clang-format off + // Indexes scores by their shared channel pointer + class tag_channel {}; + // Indexes scores by the number of outstanding requests in ascending order + class tag_outstanding {}; + + using scoring_t = boost::multi_index_container, + mi::member>, + mi::ordered_non_unique, + mi::member>>>; + // clang-format on + scoring_t scoring; + }; +} +} diff --git a/nano/node/bootstrap_ascending/service.cpp b/nano/node/bootstrap_ascending/service.cpp index f9331cd8..4fda8937 100644 --- a/nano/node/bootstrap_ascending/service.cpp +++ b/nano/node/bootstrap_ascending/service.cpp @@ -24,7 +24,7 @@ nano::bootstrap_ascending::service::service (nano::node_config & config_a, nano: accounts{ stats }, iterator{ ledger.store }, throttle{ config.bootstrap_ascending.throttle_count }, - limiter{ config.bootstrap_ascending.requests_limit, 1.0 }, + scoring{ config.bootstrap_ascending, config.network_params.network }, database_limiter{ config.bootstrap_ascending.database_requests_limit, 1.0 } { // TODO: This is called from a very congested blockprocessor thread. Offload this work to a dedicated processing thread @@ -114,6 +114,12 @@ size_t nano::bootstrap_ascending::service::blocked_size () const return accounts.blocked_size (); } +std::size_t nano::bootstrap_ascending::service::score_size () const +{ + nano::lock_guard lock{ mutex }; + return scoring.size (); +} + /** Inspects a block that has been processed by the block processor - Marks an account as blocked if the result code is gap source as there is no reason request additional blocks for this account until the dependency is resolved - Marks an account as forwarded if it has been recently referenced by a block that has been inserted. @@ -193,33 +199,11 @@ void nano::bootstrap_ascending::service::wait_blockprocessor () } } -void nano::bootstrap_ascending::service::wait_available_request () -{ - nano::unique_lock lock{ mutex }; - while (!stopped && !limiter.should_pass (1)) - { - condition.wait_for (lock, 50ms, [this] () { return stopped; }); // Give it at least some time to cooldown to avoid hitting the limit too frequently - } -} - -std::shared_ptr nano::bootstrap_ascending::service::available_channel () -{ - auto channels = network.random_set (32, network_consts.bootstrap_protocol_version_min, /* include temporary channels */ true); - for (auto & channel : channels) - { - if (!channel->max (nano::transport::traffic_type::bootstrap)) - { - return channel; - } - } - return nullptr; -} - std::shared_ptr nano::bootstrap_ascending::service::wait_available_channel () { std::shared_ptr channel; nano::unique_lock lock{ mutex }; - while (!stopped && !(channel = available_channel ())) + while (!stopped && !(channel = scoring.channel ())) { condition.wait_for (lock, 100ms, [this] () { return stopped; }); } @@ -303,19 +287,16 @@ bool nano::bootstrap_ascending::service::run_one () // Ensure there is enough space in blockprocessor for queuing new blocks wait_blockprocessor (); - // Do not do too many requests in parallel, impose throttling - wait_available_request (); - - // Waits for channel that is not full - auto channel = wait_available_channel (); - if (!channel) + // Waits for account either from priority queue or database + auto account = wait_available_account (); + if (account.is_zero ()) { return false; } - // Waits for account either from priority queue or database - auto account = wait_available_account (); - if (account.is_zero ()) + // Waits for channel that is not full + auto channel = wait_available_channel (); + if (!channel) { return false; } @@ -352,6 +333,8 @@ void nano::bootstrap_ascending::service::run_timeouts () nano::unique_lock lock{ mutex }; while (!stopped) { + scoring.sync (network.list ()); + scoring.timeout (); auto & tags_by_order = tags.get (); while (!tags_by_order.empty () && nano::time_difference (tags_by_order.front ().time, nano::milliseconds_since_epoch ()) > config.bootstrap_ascending.timeout) { @@ -364,7 +347,7 @@ void nano::bootstrap_ascending::service::run_timeouts () } } -void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack & message) +void nano::bootstrap_ascending::service::process (nano::asc_pull_ack const & message, std::shared_ptr channel) { nano::unique_lock lock{ mutex }; @@ -375,6 +358,7 @@ void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack & mes auto iterator = tags_by_id.find (message.id); auto tag = *iterator; tags_by_id.erase (iterator); + scoring.received_message (channel); lock.unlock (); diff --git a/nano/node/bootstrap_ascending/service.hpp b/nano/node/bootstrap_ascending/service.hpp index bf549b8c..ba392585 100644 --- a/nano/node/bootstrap_ascending/service.hpp +++ b/nano/node/bootstrap_ascending/service.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -50,12 +51,13 @@ namespace bootstrap_ascending /** * Process `asc_pull_ack` message coming from network */ - void process (nano::asc_pull_ack const & message); + void process (nano::asc_pull_ack const & message, std::shared_ptr channel); public: // Container info std::unique_ptr collect_container_info (std::string const & name); size_t blocked_size () const; size_t priority_size () const; + size_t score_size () const; private: // Dependencies nano::node_config & config; @@ -97,13 +99,10 @@ namespace bootstrap_ascending bool run_one (); void run_timeouts (); - /* Limits the number of requests per second we make */ - void wait_available_request (); /* Throttles requesting new blocks, not to overwhelm blockprocessor */ void wait_blockprocessor (); /* Waits for channel with free capacity for bootstrap messages */ std::shared_ptr wait_available_channel (); - std::shared_ptr available_channel (); /* Waits until a suitable account outside of cool down period is available */ nano::account available_account (); nano::account wait_available_account (); @@ -155,7 +154,7 @@ namespace bootstrap_ascending // clang-format on ordered_tags tags; - nano::bandwidth_limiter limiter; + nano::bootstrap_ascending::peer_scoring scoring; // Requests for accounts from database have much lower hitrate and could introduce strain on the network // A separate (lower) limiter ensures that we always reserve resources for querying accounts from priority queue nano::bandwidth_limiter database_limiter; diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 4f9a8c7e..a612697e 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -510,7 +510,7 @@ public: void asc_pull_ack (nano::asc_pull_ack const & message) override { - node.ascendboot.process (message); + node.ascendboot.process (message, channel); } private: