diff --git a/nano/core_test/request_aggregator.cpp b/nano/core_test/request_aggregator.cpp index 9192f080..81714cb6 100644 --- a/nano/core_test/request_aggregator.cpp +++ b/nano/core_test/request_aggregator.cpp @@ -51,6 +51,7 @@ TEST (request_aggregator, one) ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored)); ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated)); ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached)); + ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped)); ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); } @@ -87,6 +88,7 @@ TEST (request_aggregator, one_update) ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored)); ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated)); ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached)); + ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped)); ASSERT_EQ (1, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); } @@ -130,6 +132,7 @@ TEST (request_aggregator, two) ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored)); ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated)); ASSERT_EQ (1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached)); + ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped)); ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); // Make sure the cached vote is for both hashes auto vote1 (node.votes_cache.find (send1->hash ())); @@ -170,6 +173,7 @@ TEST (request_aggregator, two_endpoints) ASSERT_EQ (0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored)); ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated)); ASSERT_EQ (1, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached)); + ASSERT_EQ (0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped)); } TEST (request_aggregator, split) @@ -218,6 +222,7 @@ TEST (request_aggregator, split) ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_ignored)); ASSERT_EQ (2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated)); ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cached)); + ASSERT_EQ (0, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped)); ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out)); auto transaction (node.store.tx_begin_read ()); auto pre_last_hash (node.store.block_get (transaction, previous)->previous ()); @@ -285,3 +290,26 @@ TEST (request_aggregator, channel_update) ASSERT_NO_ERROR (system.poll ()); } } + +TEST (request_aggregator, channel_max_queue) +{ + nano::system system; + nano::node_config node_config (nano::get_available_port (), system.logging); + node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; + node_config.max_queued_requests = 1; + auto & node (*system.add_node (node_config)); + nano::genesis genesis; + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); + auto send1 (std::make_shared (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node.work_generate_blocking (genesis.hash ()))); + ASSERT_EQ (nano::process_result::progress, node.ledger.process (node.store.tx_begin_write (), *send1).code); + std::vector> request; + request.emplace_back (send1->hash (), send1->root ()); + auto channel (node.network.udp_channels.create (node.network.endpoint ())); + node.aggregator.add (channel, request); + node.aggregator.add (channel, request); + system.deadline_set (3s); + while (node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_dropped) < 1) + { + ASSERT_NO_ERROR (system.poll ()); + } +} \ No newline at end of file diff --git a/nano/core_test/toml.cpp b/nano/core_test/toml.cpp index e403d0b4..d9c98be9 100644 --- a/nano/core_test/toml.cpp +++ b/nano/core_test/toml.cpp @@ -180,6 +180,7 @@ TEST (toml, daemon_config_deserialize_defaults) ASSERT_EQ (conf.node.vote_minimum, defaults.node.vote_minimum); ASSERT_EQ (conf.node.work_peers, defaults.node.work_peers); ASSERT_EQ (conf.node.work_threads, defaults.node.work_threads); + ASSERT_EQ (conf.node.max_queued_requests, defaults.node.max_queued_requests); ASSERT_EQ (conf.node.logging.bulk_pull_logging_value, defaults.node.logging.bulk_pull_logging_value); ASSERT_EQ (conf.node.logging.flush, defaults.node.logging.flush); @@ -412,6 +413,7 @@ TEST (toml, daemon_config_deserialize_no_defaults) work_threads = 999 work_watcher_period = 999 max_work_generate_multiplier = 1.0 + max_queued_requests = 999 frontiers_confirmation = "always" [node.diagnostics.txn_tracking] enable = true @@ -566,6 +568,7 @@ TEST (toml, daemon_config_deserialize_no_defaults) ASSERT_NE (conf.node.vote_minimum, defaults.node.vote_minimum); ASSERT_NE (conf.node.work_peers, defaults.node.work_peers); ASSERT_NE (conf.node.work_threads, defaults.node.work_threads); + ASSERT_NE (conf.node.max_queued_requests, defaults.node.max_queued_requests); ASSERT_NE (conf.node.logging.bulk_pull_logging_value, defaults.node.logging.bulk_pull_logging_value); ASSERT_NE (conf.node.logging.flush, defaults.node.logging.flush); diff --git a/nano/lib/stats.cpp b/nano/lib/stats.cpp index 64fe6d77..07b2077f 100644 --- a/nano/lib/stats.cpp +++ b/nano/lib/stats.cpp @@ -652,6 +652,9 @@ std::string nano::stat::detail_to_string (uint32_t key) case nano::stat::detail::requests_ignored: res = "requests_votes_ignored"; break; + case nano::stat::detail::requests_dropped: + res = "requests_dropped"; + break; } return res; } diff --git a/nano/lib/stats.hpp b/nano/lib/stats.hpp index d5f7c5ff..304b2adf 100644 --- a/nano/lib/stats.hpp +++ b/nano/lib/stats.hpp @@ -298,7 +298,8 @@ public: // requests requests_cached, requests_generated, - requests_ignored + requests_ignored, + requests_dropped }; /** Direction of the stat. If the direction is irrelevant, use in */ diff --git a/nano/node/node.cpp b/nano/node/node.cpp index eacbcf55..a9a44ea7 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -141,7 +141,7 @@ online_reps (ledger, network_params, config.online_weight_minimum.number ()), votes_cache (wallets), vote_uniquer (block_uniquer), active (*this), -aggregator (stats, network_params.network, votes_cache, store, wallets), +aggregator (network_params.network, config, stats, votes_cache, store, wallets), confirmation_height_processor (pending_confirmation_height, ledger, active, write_database_queue, config.conf_height_processor_batch_min_time, logger), payment_observer_processor (observers.blocks), wallets (wallets_store.init_error (), *this), diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index 175e9303..45fb4295 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -101,6 +101,7 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const toml.put ("work_watcher_period", work_watcher_period.count (), "Time between checks for confirmation and re-generating higher difficulty work if unconfirmed, for blocks in the work watcher.\ntype:seconds"); toml.put ("max_work_generate_multiplier", max_work_generate_multiplier, "Maximum allowed difficulty multiplier for work generation.\ntype:double,[1..]"); toml.put ("frontiers_confirmation", serialize_frontiers_confirmation (frontiers_confirmation), "Mode controlling frontier confirmation rate.\ntype:string,{auto,always,disabled}"); + toml.put ("max_queued_requests", max_queued_requests, "Limit for number of queued confirmation requests for one channel, after which new requests are dropped until the queue drops below this value.\ntype:uint32"); auto work_peers_l (toml.create_array ("work_peers", "A list of \"address:port\" entries to identify work peers.")); for (auto i (work_peers.begin ()), n (work_peers.end ()); i != n; ++i) @@ -333,6 +334,8 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml) toml.get ("max_work_generate_multiplier", max_work_generate_multiplier); max_work_generate_difficulty = nano::difficulty::from_multiplier (max_work_generate_multiplier, network.publish_threshold); + toml.get ("max_queued_requests", max_queued_requests); + if (toml.has_key ("frontiers_confirmation")) { auto frontiers_confirmation_l (toml.get ("frontiers_confirmation")); diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 03e4d441..a7d3d3c0 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -93,6 +93,7 @@ public: std::chrono::seconds work_watcher_period{ std::chrono::seconds (5) }; double max_work_generate_multiplier{ 64. }; uint64_t max_work_generate_difficulty{ nano::network_constants::publish_full_threshold }; + uint32_t max_queued_requests{ 512 }; nano::rocksdb_config rocksdb_config; nano::frontiers_confirmation_mode frontiers_confirmation{ nano::frontiers_confirmation_mode::automatic }; std::string serialize_frontiers_confirmation (nano::frontiers_confirmation_mode) const; diff --git a/nano/node/request_aggregator.cpp b/nano/node/request_aggregator.cpp index fcb5525c..6cba4f23 100644 --- a/nano/node/request_aggregator.cpp +++ b/nano/node/request_aggregator.cpp @@ -2,15 +2,17 @@ #include #include #include +#include #include #include #include #include #include -nano::request_aggregator::request_aggregator (nano::stat & stats_a, nano::network_constants const & network_constants_a, nano::votes_cache & cache_a, nano::block_store & store_a, nano::wallets & wallets_a) : +nano::request_aggregator::request_aggregator (nano::network_constants const & network_constants_a, nano::node_config const & config_a, nano::stat & stats_a, nano::votes_cache & cache_a, nano::block_store & store_a, nano::wallets & wallets_a) : max_delay (network_constants_a.is_test_network () ? 50 : 300), small_delay (network_constants_a.is_test_network () ? 10 : 50), +max_channel_requests (config_a.max_queued_requests), stats (stats_a), votes_cache (cache_a), store (store_a), @@ -24,25 +26,39 @@ thread ([this]() { run (); }) void nano::request_aggregator::add (std::shared_ptr & channel_a, std::vector> const & hashes_roots_a) { assert (wallets.rep_counts ().voting > 0); + bool error = true; auto const endpoint (nano::transport::map_endpoint_to_v6 (channel_a->get_endpoint ())); nano::unique_lock lock (mutex); - auto & requests_by_endpoint (requests.get ()); - auto existing (requests_by_endpoint.find (endpoint)); - if (existing == requests_by_endpoint.end ()) + // Protecting from ever-increasing memory usage when request are consumed slower than generated + // Reject request if the oldest request has not yet been processed after its deadline + a modest margin + if (requests.empty () || (requests.get ().begin ()->deadline + 2 * this->max_delay > std::chrono::steady_clock::now ())) { - existing = requests_by_endpoint.emplace (channel_a).first; + auto & requests_by_endpoint (requests.get ()); + auto existing (requests_by_endpoint.find (endpoint)); + if (existing == requests_by_endpoint.end ()) + { + existing = requests_by_endpoint.emplace (channel_a).first; + } + requests_by_endpoint.modify (existing, [&hashes_roots_a, &channel_a, &error, this](channel_pool & pool_a) { + // This extends the lifetime of the channel, which is acceptable up to max_delay + pool_a.channel = channel_a; + if (pool_a.hashes_roots.size () + hashes_roots_a.size () <= this->max_channel_requests) + { + error = false; + auto new_deadline (std::min (pool_a.start + this->max_delay, std::chrono::steady_clock::now () + this->small_delay)); + pool_a.deadline = new_deadline; + pool_a.hashes_roots.insert (pool_a.hashes_roots.begin (), hashes_roots_a.begin (), hashes_roots_a.end ()); + } + }); + if (requests.size () == 1) + { + lock.unlock (); + condition.notify_all (); + } } - requests_by_endpoint.modify (existing, [&hashes_roots_a, &channel_a, this](channel_pool & pool_a) { - // This extends the lifetime of the channel, which is acceptable up to max_delay - pool_a.channel = channel_a; - auto new_deadline (std::min (pool_a.start + this->max_delay, std::chrono::steady_clock::now () + this->small_delay)); - pool_a.deadline = new_deadline; - pool_a.hashes_roots.insert (pool_a.hashes_roots.begin (), hashes_roots_a.begin (), hashes_roots_a.end ()); - }); - if (requests.size () == 1) + if (error) { - lock.unlock (); - condition.notify_all (); + stats.inc (nano::stat::type::requests, nano::stat::detail::requests_dropped); } } diff --git a/nano/node/request_aggregator.hpp b/nano/node/request_aggregator.hpp index ceee0f78..471355fe 100644 --- a/nano/node/request_aggregator.hpp +++ b/nano/node/request_aggregator.hpp @@ -21,6 +21,7 @@ class votes_cache; class block_store; class wallets; class stat; +class node_config; /** * Pools together confirmation requests, separately for each endpoint. * Requests are added from network messages, and aggregated to minimize bandwidth and vote generation. Example: @@ -57,7 +58,7 @@ class request_aggregator final public: request_aggregator () = delete; - request_aggregator (nano::stat &, nano::network_constants const &, nano::votes_cache &, nano::block_store &, nano::wallets &); + request_aggregator (nano::network_constants const &, nano::node_config const & config, nano::stat & stats_a, nano::votes_cache &, nano::block_store &, nano::wallets &); /** Add a new request by \p channel_a for hashes \p hashes_roots_a */ void add (std::shared_ptr & channel_a, std::vector> const & hashes_roots_a); @@ -68,6 +69,7 @@ public: const std::chrono::milliseconds max_delay; const std::chrono::milliseconds small_delay; + const size_t max_channel_requests; private: void run ();