Bounded request aggregator (#2501)

Limit the number of queued requests per channel, and stop accepting new requests when they are being queued faster than they can be processed
This commit is contained in:
Guilherme Lawless 2020-01-21 14:35:37 +00:00 committed by GitHub
commit c1bf474a6a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 75 additions and 18 deletions

View file

@ -51,6 +51,7 @@ TEST (request_aggregator, one)
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped));
ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
}
@ -87,6 +88,7 @@ TEST (request_aggregator, one_update)
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped));
ASSERT_EQ (1, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
}
@ -130,6 +132,7 @@ TEST (request_aggregator, two)
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped));
ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
// Make sure the cached vote is for both hashes
auto vote1 (node.votes_cache.find (send1->hash ()));
@ -170,6 +173,7 @@ TEST (request_aggregator, two_endpoints)
ASSERT_EQ (0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached));
ASSERT_EQ (0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped));
}
TEST (request_aggregator, split)
@ -218,6 +222,7 @@ TEST (request_aggregator, split)
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored));
ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached));
ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped));
ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
auto transaction (node.store.tx_begin_read ());
auto pre_last_hash (node.store.block_get (transaction, previous)->previous ());
@ -285,3 +290,26 @@ TEST (request_aggregator, channel_update)
ASSERT_NO_ERROR (system.poll ());
}
}
// Checks that the aggregator drops requests once a channel's queue is full:
// with max_queued_requests set to 1, queueing the same single-hash request twice
// on one channel must eventually be reported through the requests_dropped stat.
TEST (request_aggregator, channel_max_queue)
{
	nano::system system;
	nano::node_config config (nano::get_available_port (), system.logging);
	config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
	// Allow only one queued hash per channel so that the second add below goes over the limit
	config.max_queued_requests = 1;
	auto & node = *system.add_node (config);
	nano::genesis genesis;
	// The aggregator asserts that at least one voting representative is present in the wallets
	system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
	auto send = std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node.work_generate_blocking (genesis.hash ()));
	ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send).code);
	// A request holding a single (hash, root) pair
	std::vector<std::pair<nano::block_hash, nano::root>> hashes_roots;
	hashes_roots.emplace_back (send->hash (), send->root ());
	auto requester = node.network.udp_channels.create (node.network.endpoint ());
	// Same channel, same request, twice in a row
	node.aggregator.add (requester, hashes_roots);
	node.aggregator.add (requester, hashes_roots);
	auto const dropped_count = [&node]() {
		return node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped);
	};
	// Poll until a drop is recorded, failing if the deadline expires first
	system.deadline_set (3s);
	while (dropped_count () < 1)
	{
		ASSERT_NO_ERROR (system.poll ());
	}
}

View file

@ -180,6 +180,7 @@ TEST (toml, daemon_config_deserialize_defaults)
ASSERT_EQ (conf.node.vote_minimum, defaults.node.vote_minimum);
ASSERT_EQ (conf.node.work_peers, defaults.node.work_peers);
ASSERT_EQ (conf.node.work_threads, defaults.node.work_threads);
ASSERT_EQ (conf.node.max_queued_requests, defaults.node.max_queued_requests);
ASSERT_EQ (conf.node.logging.bulk_pull_logging_value, defaults.node.logging.bulk_pull_logging_value);
ASSERT_EQ (conf.node.logging.flush, defaults.node.logging.flush);
@ -412,6 +413,7 @@ TEST (toml, daemon_config_deserialize_no_defaults)
work_threads = 999
work_watcher_period = 999
max_work_generate_multiplier = 1.0
max_queued_requests = 999
frontiers_confirmation = "always"
[node.diagnostics.txn_tracking]
enable = true
@ -566,6 +568,7 @@ TEST (toml, daemon_config_deserialize_no_defaults)
ASSERT_NE (conf.node.vote_minimum, defaults.node.vote_minimum);
ASSERT_NE (conf.node.work_peers, defaults.node.work_peers);
ASSERT_NE (conf.node.work_threads, defaults.node.work_threads);
ASSERT_NE (conf.node.max_queued_requests, defaults.node.max_queued_requests);
ASSERT_NE (conf.node.logging.bulk_pull_logging_value, defaults.node.logging.bulk_pull_logging_value);
ASSERT_NE (conf.node.logging.flush, defaults.node.logging.flush);

View file

@ -652,6 +652,9 @@ std::string nano::stat::detail_to_string (uint32_t key)
case nano::stat::detail::requests_ignored:
res = "requests_votes_ignored";
break;
case nano::stat::detail::requests_dropped:
res = "requests_dropped";
break;
}
return res;
}

View file

@ -298,7 +298,8 @@ public:
// requests
requests_cached,
requests_generated,
requests_ignored
requests_ignored,
requests_dropped
};
/** Direction of the stat. If the direction is irrelevant, use in */

View file

@ -141,7 +141,7 @@ online_reps (ledger, network_params, config.online_weight_minimum.number ()),
votes_cache (wallets),
vote_uniquer (block_uniquer),
active (*this),
aggregator (stats, network_params.network, votes_cache, store, wallets),
aggregator (network_params.network, config, stats, votes_cache, store, wallets),
confirmation_height_processor (pending_confirmation_height, ledger, active, write_database_queue, config.conf_height_processor_batch_min_time, logger),
payment_observer_processor (observers.blocks),
wallets (wallets_store.init_error (), *this),

View file

@ -101,6 +101,7 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const
toml.put ("work_watcher_period", work_watcher_period.count (), "Time between checks for confirmation and re-generating higher difficulty work if unconfirmed, for blocks in the work watcher.\ntype:seconds");
toml.put ("max_work_generate_multiplier", max_work_generate_multiplier, "Maximum allowed difficulty multiplier for work generation.\ntype:double,[1..]");
toml.put ("frontiers_confirmation", serialize_frontiers_confirmation (frontiers_confirmation), "Mode controlling frontier confirmation rate.\ntype:string,{auto,always,disabled}");
toml.put ("max_queued_requests", max_queued_requests, "Limit for number of queued confirmation requests for one channel, after which new requests are dropped until the queue drops below this value.\ntype:uint32");
auto work_peers_l (toml.create_array ("work_peers", "A list of \"address:port\" entries to identify work peers."));
for (auto i (work_peers.begin ()), n (work_peers.end ()); i != n; ++i)
@ -333,6 +334,8 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml)
toml.get<double> ("max_work_generate_multiplier", max_work_generate_multiplier);
max_work_generate_difficulty = nano::difficulty::from_multiplier (max_work_generate_multiplier, network.publish_threshold);
toml.get<uint32_t> ("max_queued_requests", max_queued_requests);
if (toml.has_key ("frontiers_confirmation"))
{
auto frontiers_confirmation_l (toml.get<std::string> ("frontiers_confirmation"));

View file

@ -93,6 +93,7 @@ public:
std::chrono::seconds work_watcher_period{ std::chrono::seconds (5) };
double max_work_generate_multiplier{ 64. };
uint64_t max_work_generate_difficulty{ nano::network_constants::publish_full_threshold };
uint32_t max_queued_requests{ 512 };
nano::rocksdb_config rocksdb_config;
nano::frontiers_confirmation_mode frontiers_confirmation{ nano::frontiers_confirmation_mode::automatic };
std::string serialize_frontiers_confirmation (nano::frontiers_confirmation_mode) const;

View file

@ -2,15 +2,17 @@
#include <nano/lib/threading.hpp>
#include <nano/node/common.hpp>
#include <nano/node/network.hpp>
#include <nano/node/nodeconfig.hpp>
#include <nano/node/request_aggregator.hpp>
#include <nano/node/transport/udp.hpp>
#include <nano/node/voting.hpp>
#include <nano/node/wallet.hpp>
#include <nano/secure/blockstore.hpp>
nano::request_aggregator::request_aggregator (nano::stat & stats_a, nano::network_constants const & network_constants_a, nano::votes_cache & cache_a, nano::block_store & store_a, nano::wallets & wallets_a) :
nano::request_aggregator::request_aggregator (nano::network_constants const & network_constants_a, nano::node_config const & config_a, nano::stat & stats_a, nano::votes_cache & cache_a, nano::block_store & store_a, nano::wallets & wallets_a) :
max_delay (network_constants_a.is_test_network () ? 50 : 300),
small_delay (network_constants_a.is_test_network () ? 10 : 50),
max_channel_requests (config_a.max_queued_requests),
stats (stats_a),
votes_cache (cache_a),
store (store_a),
@ -24,25 +26,39 @@ thread ([this]() { run (); })
void nano::request_aggregator::add (std::shared_ptr<nano::transport::channel> & channel_a, std::vector<std::pair<nano::block_hash, nano::root>> const & hashes_roots_a)
{
assert (wallets.rep_counts ().voting > 0);
bool error = true;
auto const endpoint (nano::transport::map_endpoint_to_v6 (channel_a->get_endpoint ()));
nano::unique_lock<std::mutex> lock (mutex);
auto & requests_by_endpoint (requests.get<tag_endpoint> ());
auto existing (requests_by_endpoint.find (endpoint));
if (existing == requests_by_endpoint.end ())
// Protect against ever-increasing memory usage when requests are consumed more slowly than they are generated
// Reject request if the oldest request has not yet been processed after its deadline + a modest margin
if (requests.empty () || (requests.get<tag_deadline> ().begin ()->deadline + 2 * this->max_delay > std::chrono::steady_clock::now ()))
{
existing = requests_by_endpoint.emplace (channel_a).first;
auto & requests_by_endpoint (requests.get<tag_endpoint> ());
auto existing (requests_by_endpoint.find (endpoint));
if (existing == requests_by_endpoint.end ())
{
existing = requests_by_endpoint.emplace (channel_a).first;
}
requests_by_endpoint.modify (existing, [&hashes_roots_a, &channel_a, &error, this](channel_pool & pool_a) {
// This extends the lifetime of the channel, which is acceptable up to max_delay
pool_a.channel = channel_a;
if (pool_a.hashes_roots.size () + hashes_roots_a.size () <= this->max_channel_requests)
{
error = false;
auto new_deadline (std::min (pool_a.start + this->max_delay, std::chrono::steady_clock::now () + this->small_delay));
pool_a.deadline = new_deadline;
pool_a.hashes_roots.insert (pool_a.hashes_roots.begin (), hashes_roots_a.begin (), hashes_roots_a.end ());
}
});
if (requests.size () == 1)
{
lock.unlock ();
condition.notify_all ();
}
}
requests_by_endpoint.modify (existing, [&hashes_roots_a, &channel_a, this](channel_pool & pool_a) {
// This extends the lifetime of the channel, which is acceptable up to max_delay
pool_a.channel = channel_a;
auto new_deadline (std::min (pool_a.start + this->max_delay, std::chrono::steady_clock::now () + this->small_delay));
pool_a.deadline = new_deadline;
pool_a.hashes_roots.insert (pool_a.hashes_roots.begin (), hashes_roots_a.begin (), hashes_roots_a.end ());
});
if (requests.size () == 1)
if (error)
{
lock.unlock ();
condition.notify_all ();
stats.inc (nano::stat::type::requests, nano::stat::detail::requests_dropped);
}
}

View file

@ -21,6 +21,7 @@ class votes_cache;
class block_store;
class wallets;
class stat;
class node_config;
/**
* Pools together confirmation requests, separately for each endpoint.
* Requests are added from network messages, and aggregated to minimize bandwidth and vote generation. Example:
@ -57,7 +58,7 @@ class request_aggregator final
public:
request_aggregator () = delete;
request_aggregator (nano::stat &, nano::network_constants const &, nano::votes_cache &, nano::block_store &, nano::wallets &);
request_aggregator (nano::network_constants const &, nano::node_config const & config, nano::stat & stats_a, nano::votes_cache &, nano::block_store &, nano::wallets &);
/** Add a new request by \p channel_a for hashes \p hashes_roots_a */
void add (std::shared_ptr<nano::transport::channel> & channel_a, std::vector<std::pair<nano::block_hash, nano::root>> const & hashes_roots_a);
@ -68,6 +69,7 @@ public:
const std::chrono::milliseconds max_delay;
const std::chrono::milliseconds small_delay;
const size_t max_channel_requests;
private:
void run ();