Merge pull request #4476 from pwojcikdev/fair-queuing-3

Fair queuing for block processor
Piotr Wójcik 2024-04-04 12:08:51 +02:00 committed by GitHub
commit ec6207f435
17 changed files with 832 additions and 98 deletions


@@ -21,6 +21,7 @@ add_executable(
election_scheduler.cpp
enums.cpp
epochs.cpp
fair_queue.cpp
frontiers_confirmation.cpp
ipc.cpp
ledger.cpp


@@ -0,0 +1,276 @@
#include <nano/node/fair_queue.hpp>
#include <nano/test_common/system.hpp>
#include <nano/test_common/testutil.hpp>
#include <gtest/gtest.h>
#include <ranges>
using namespace std::chrono_literals;
namespace
{
enum class source_enum
{
unknown = 0,
live,
bootstrap,
bootstrap_legacy,
unchecked,
local,
forced,
};
}
TEST (fair_queue, construction)
{
nano::fair_queue<int, source_enum> queue;
ASSERT_EQ (queue.total_size (), 0);
ASSERT_TRUE (queue.empty ());
}
TEST (fair_queue, process_one)
{
nano::fair_queue<int, source_enum> queue;
queue.priority_query = [] (auto const &) { return 1; };
queue.max_size_query = [] (auto const &) { return 1; };
queue.push (7, { source_enum::live });
ASSERT_EQ (queue.total_size (), 1);
ASSERT_EQ (queue.queues_size (), 1);
ASSERT_EQ (queue.size ({ source_enum::live }), 1);
ASSERT_EQ (queue.size ({ source_enum::bootstrap }), 0);
auto [result, origin] = queue.next ();
ASSERT_EQ (result, 7);
ASSERT_EQ (origin.source, source_enum::live);
ASSERT_EQ (origin.channel, nullptr);
ASSERT_TRUE (queue.empty ());
}
TEST (fair_queue, fifo)
{
nano::fair_queue<int, source_enum> queue;
queue.priority_query = [] (auto const &) { return 1; };
queue.max_size_query = [] (auto const &) { return 999; };
queue.push (7, { source_enum::live });
queue.push (8, { source_enum::live });
queue.push (9, { source_enum::live });
ASSERT_EQ (queue.total_size (), 3);
ASSERT_EQ (queue.queues_size (), 1);
ASSERT_EQ (queue.size ({ source_enum::live }), 3);
{
auto [result, origin] = queue.next ();
ASSERT_EQ (result, 7);
ASSERT_EQ (origin.source, source_enum::live);
}
{
auto [result, origin] = queue.next ();
ASSERT_EQ (result, 8);
ASSERT_EQ (origin.source, source_enum::live);
}
{
auto [result, origin] = queue.next ();
ASSERT_EQ (result, 9);
ASSERT_EQ (origin.source, source_enum::live);
}
ASSERT_TRUE (queue.empty ());
}
TEST (fair_queue, process_many)
{
nano::fair_queue<int, source_enum> queue;
queue.priority_query = [] (auto const &) { return 1; };
queue.max_size_query = [] (auto const &) { return 1; };
queue.push (7, { source_enum::live });
queue.push (8, { source_enum::bootstrap });
queue.push (9, { source_enum::unchecked });
ASSERT_EQ (queue.total_size (), 3);
ASSERT_EQ (queue.queues_size (), 3);
ASSERT_EQ (queue.size ({ source_enum::live }), 1);
ASSERT_EQ (queue.size ({ source_enum::bootstrap }), 1);
ASSERT_EQ (queue.size ({ source_enum::unchecked }), 1);
{
auto [result, origin] = queue.next ();
ASSERT_EQ (result, 7);
ASSERT_EQ (origin.source, source_enum::live);
}
{
auto [result, origin] = queue.next ();
ASSERT_EQ (result, 8);
ASSERT_EQ (origin.source, source_enum::bootstrap);
}
{
auto [result, origin] = queue.next ();
ASSERT_EQ (result, 9);
ASSERT_EQ (origin.source, source_enum::unchecked);
}
ASSERT_TRUE (queue.empty ());
}
TEST (fair_queue, max_queue_size)
{
nano::fair_queue<int, source_enum> queue;
queue.priority_query = [] (auto const &) { return 1; };
queue.max_size_query = [] (auto const &) { return 2; };
queue.push (7, { source_enum::live });
queue.push (8, { source_enum::live });
queue.push (9, { source_enum::live });
ASSERT_EQ (queue.total_size (), 2);
ASSERT_EQ (queue.queues_size (), 1);
ASSERT_EQ (queue.size ({ source_enum::live }), 2);
{
auto [result, origin] = queue.next ();
ASSERT_EQ (result, 7);
ASSERT_EQ (origin.source, source_enum::live);
}
{
auto [result, origin] = queue.next ();
ASSERT_EQ (result, 8);
ASSERT_EQ (origin.source, source_enum::live);
}
ASSERT_TRUE (queue.empty ());
}
TEST (fair_queue, round_robin_with_priority)
{
nano::fair_queue<int, source_enum> queue;
queue.priority_query = [] (auto const & origin) {
switch (origin.source)
{
case source_enum::live:
return 1;
case source_enum::bootstrap:
return 2;
case source_enum::unchecked:
return 3;
default:
return 0;
}
};
queue.max_size_query = [] (auto const &) { return 999; };
queue.push (7, { source_enum::live });
queue.push (8, { source_enum::live });
queue.push (9, { source_enum::live });
queue.push (10, { source_enum::bootstrap });
queue.push (11, { source_enum::bootstrap });
queue.push (12, { source_enum::bootstrap });
queue.push (13, { source_enum::unchecked });
queue.push (14, { source_enum::unchecked });
queue.push (15, { source_enum::unchecked });
ASSERT_EQ (queue.total_size (), 9);
// Processing 1x live, 2x bootstrap, 3x unchecked before moving to the next source
ASSERT_EQ (queue.next ().second.source, source_enum::live);
ASSERT_EQ (queue.next ().second.source, source_enum::bootstrap);
ASSERT_EQ (queue.next ().second.source, source_enum::bootstrap);
ASSERT_EQ (queue.next ().second.source, source_enum::unchecked);
ASSERT_EQ (queue.next ().second.source, source_enum::unchecked);
ASSERT_EQ (queue.next ().second.source, source_enum::unchecked);
ASSERT_EQ (queue.next ().second.source, source_enum::live);
ASSERT_EQ (queue.next ().second.source, source_enum::bootstrap);
ASSERT_EQ (queue.next ().second.source, source_enum::live);
ASSERT_TRUE (queue.empty ());
}
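The order asserted above follows from the scheduling rule in fair_queue::next () (see nano/node/fair_queue.hpp later in this diff): up to priority requests are taken from the current queue before the iterator advances, wrapping around, to the next non-empty queue. A minimal self-contained model of that rule (illustrative names only, not code from this commit) reproduces the asserted order:

#include <cstddef>
#include <deque>
#include <iostream>
#include <map>

int main ()
{
	struct entry
	{
		std::size_t priority;
		std::deque<int> requests;
	};
	// Map key order mirrors the enum values above: live (1) < bootstrap (2) < unchecked (4)
	std::map<int, entry> queues{
		{ 1, { 1, { 7, 8, 9 } } },
		{ 2, { 2, { 10, 11, 12 } } },
		{ 4, { 3, { 13, 14, 15 } } },
	};
	auto it = queues.end ();
	std::size_t counter = 0;
	auto all_empty = [&] {
		for (auto const & q : queues)
		{
			if (!q.second.requests.empty ())
			{
				return false;
			}
		}
		return true;
	};
	while (!all_empty ())
	{
		// should_seek: first call, current queue drained, or priority quota used up
		if (it == queues.end () || it->second.requests.empty () || counter >= it->second.priority)
		{
			// seek_next: reset the counter and advance (wrapping) to the next non-empty queue
			counter = 0;
			do
			{
				if (it != queues.end ())
				{
					++it;
				}
				if (it == queues.end ())
				{
					it = queues.begin ();
				}
			} while (it->second.requests.empty ());
		}
		++counter;
		std::cout << it->second.requests.front () << ' '; // Prints: 7 10 11 13 14 15 8 12 9
		it->second.requests.pop_front ();
	}
}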
TEST (fair_queue, source_channel)
{
nano::test::system system{ 1 };
nano::fair_queue<int, source_enum> queue;
queue.priority_query = [] (auto const &) { return 1; };
queue.max_size_query = [] (auto const &) { return 999; };
auto channel1 = nano::test::fake_channel (system.node (0));
auto channel2 = nano::test::fake_channel (system.node (0));
auto channel3 = nano::test::fake_channel (system.node (0));
queue.push (6, { source_enum::live, channel1 });
queue.push (7, { source_enum::live, channel2 });
queue.push (8, { source_enum::live, channel3 });
queue.push (9, { source_enum::live, channel1 }); // Channel 1 has multiple entries
ASSERT_EQ (queue.total_size (), 4);
ASSERT_EQ (queue.queues_size (), 3); // Each <source, channel> pair is a separate queue
ASSERT_EQ (queue.size ({ source_enum::live, channel1 }), 2);
ASSERT_EQ (queue.size ({ source_enum::live, channel2 }), 1);
ASSERT_EQ (queue.size ({ source_enum::live, channel3 }), 1);
auto all = queue.next_batch (999);
ASSERT_EQ (all.size (), 4);
auto filtered = [&] (auto const & channel) {
auto r = all | std::views::filter ([&] (auto const & entry) {
return entry.second.channel == channel;
});
std::vector vec (r.begin (), r.end ());
return vec;
};
auto channel1_results = filtered (channel1);
ASSERT_EQ (channel1_results.size (), 2);
{
auto [result, origin] = channel1_results[0];
ASSERT_EQ (result, 6);
ASSERT_EQ (origin.source, source_enum::live);
ASSERT_EQ (origin.channel, channel1);
}
{
auto [result, origin] = channel1_results[1];
ASSERT_EQ (result, 9);
ASSERT_EQ (origin.source, source_enum::live);
ASSERT_EQ (origin.channel, channel1);
}
ASSERT_TRUE (queue.empty ());
}
TEST (fair_queue, cleanup)
{
nano::test::system system{ 1 };
nano::fair_queue<int, source_enum> queue;
queue.priority_query = [] (auto const &) { return 1; };
queue.max_size_query = [] (auto const &) { return 999; };
auto channel1 = nano::test::fake_channel (system.node (0));
auto channel2 = nano::test::fake_channel (system.node (0));
auto channel3 = nano::test::fake_channel (system.node (0));
queue.push (7, { source_enum::live, channel1 });
queue.push (8, { source_enum::live, channel2 });
queue.push (9, { source_enum::live, channel3 });
ASSERT_EQ (queue.total_size (), 3);
ASSERT_EQ (queue.queues_size (), 3);
ASSERT_EQ (queue.size ({ source_enum::live, channel1 }), 1);
ASSERT_EQ (queue.size ({ source_enum::live, channel2 }), 1);
ASSERT_EQ (queue.size ({ source_enum::live, channel3 }), 1);
// Either closing or resetting the channel should remove it from the queue
channel1->close ();
channel2.reset ();
ASSERT_TRUE (queue.periodic_update ());
// Only channel 3 should remain
ASSERT_EQ (queue.total_size (), 1);
ASSERT_EQ (queue.queues_size (), 1);
ASSERT_EQ (queue.size ({ source_enum::live, channel1 }), 0);
ASSERT_EQ (queue.size ({ source_enum::live, channel2 }), 0);
ASSERT_EQ (queue.size ({ source_enum::live, channel3 }), 1);
}


@@ -768,10 +768,9 @@ TEST (network, duplicate_detection)
TEST (network, duplicate_revert_publish)
{
nano::test::system system;
nano::node_flags node_flags;
node_flags.block_processor_full_size = 0;
auto & node (*system.add_node (node_flags));
ASSERT_TRUE (node.block_processor.full ());
nano::node_config node_config = system.default_config ();
node_config.block_processor.max_peer_queue = 0;
auto & node (*system.add_node (node_config));
nano::publish publish{ nano::dev::network_params.network, nano::dev::genesis };
std::vector<uint8_t> bytes;
{


@@ -525,6 +525,7 @@ TEST (node, expire)
ASSERT_TRUE (node0.expired ());
}
// This test is racy: there is no guarantee that the election won't be confirmed before all forks are fully processed
TEST (node, fork_publish)
{
nano::test::system system (1);
@@ -671,6 +672,7 @@ TEST (node, fork_keep)
ASSERT_TRUE (node2.ledger.block_exists (transaction1, send1->hash ()));
}
// This test is racy: there is no guarantee that the election won't be confirmed before all forks are fully processed
TEST (node, fork_flip)
{
nano::test::system system (2);
@@ -696,8 +698,7 @@ TEST (node, fork_flip)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build ();
nano::publish publish2{ nano::dev::network_params.network, send2 };
auto ignored_channel{ std::make_shared<nano::transport::channel_tcp> (node1, std::weak_ptr<nano::transport::socket> ()) };
auto ignored_channel = nano::test::fake_channel (node1);
node1.network.inbound (publish1, ignored_channel);
node2.network.inbound (publish2, ignored_channel);
ASSERT_TIMELY_EQ (5s, 1, node1.active.size ());


@@ -117,6 +117,7 @@ TEST (toml, daemon_config_deserialize_defaults)
std::stringstream ss;
ss << R"toml(
[node]
[node.block_processor]
[node.diagnostics.txn_tracking]
[node.httpcallback]
[node.ipc.local]
@@ -254,6 +255,12 @@ TEST (toml, daemon_config_deserialize_defaults)
ASSERT_EQ (conf.node.vote_cache.max_size, defaults.node.vote_cache.max_size);
ASSERT_EQ (conf.node.vote_cache.max_voters, defaults.node.vote_cache.max_voters);
ASSERT_EQ (conf.node.block_processor.max_peer_queue, defaults.node.block_processor.max_peer_queue);
ASSERT_EQ (conf.node.block_processor.max_system_queue, defaults.node.block_processor.max_system_queue);
ASSERT_EQ (conf.node.block_processor.priority_live, defaults.node.block_processor.priority_live);
ASSERT_EQ (conf.node.block_processor.priority_bootstrap, defaults.node.block_processor.priority_bootstrap);
ASSERT_EQ (conf.node.block_processor.priority_local, defaults.node.block_processor.priority_local);
}
TEST (toml, optional_child)
@@ -432,6 +439,13 @@ TEST (toml, daemon_config_deserialize_no_defaults)
backlog_scan_batch_size = 999
backlog_scan_frequency = 999
[node.block_processor]
max_peer_queue = 999
max_system_queue = 999
priority_live = 999
priority_bootstrap = 999
priority_local = 999
[node.diagnostics.txn_tracking]
enable = true
ignore_writes_below_block_processor_max_time = false
@@ -680,6 +694,12 @@ TEST (toml, daemon_config_deserialize_no_defaults)
ASSERT_NE (conf.node.vote_cache.max_size, defaults.node.vote_cache.max_size);
ASSERT_NE (conf.node.vote_cache.max_voters, defaults.node.vote_cache.max_voters);
ASSERT_NE (conf.node.block_processor.max_peer_queue, defaults.node.block_processor.max_peer_queue);
ASSERT_NE (conf.node.block_processor.max_system_queue, defaults.node.block_processor.max_system_queue);
ASSERT_NE (conf.node.block_processor.priority_live, defaults.node.block_processor.priority_live);
ASSERT_NE (conf.node.block_processor.priority_bootstrap, defaults.node.block_processor.priority_bootstrap);
ASSERT_NE (conf.node.block_processor.priority_local, defaults.node.block_processor.priority_local);
}
/** There should be no required values **/


@@ -40,6 +40,7 @@ enum class type : uint8_t
blockprocessor,
blockprocessor_source,
blockprocessor_result,
blockprocessor_overfill,
bootstrap_server,
active,
active_started,
@@ -85,6 +86,7 @@ enum class detail : uint8_t
success,
unknown,
cache,
queue_overflow,
// processing queue
queue,


@@ -76,6 +76,7 @@ add_library(
election_status.hpp
epoch_upgrader.hpp
epoch_upgrader.cpp
fair_queue.hpp
ipc/action_handler.hpp
ipc/action_handler.cpp
ipc/flatbuffers_handler.hpp


@@ -8,7 +8,7 @@
#include <nano/secure/ledger.hpp>
#include <nano/store/component.hpp>
#include <boost/format.hpp>
#include <utility>
#include <magic_enum.hpp>
@@ -17,7 +17,7 @@
*/
nano::block_processor::context::context (std::shared_ptr<nano::block> block, nano::block_source source_a) :
block{ block },
block{ std::move (block) },
source{ source_a }
{
debug_assert (source != nano::block_source::unknown);
@@ -38,6 +38,7 @@ void nano::block_processor::context::set_result (result_t const & result)
*/
nano::block_processor::block_processor (nano::node & node_a, nano::write_database_queue & write_database_queue_a) :
config{ node_a.config.block_processor },
node (node_a),
write_database_queue (write_database_queue_a),
next_log (std::chrono::steady_clock::now ())
@@ -49,6 +50,32 @@ nano::block_processor::block_processor (nano::node & node_a, nano::write_databas
block_processed.notify (result, context);
}
});
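// Queues are keyed by <source, channel>, so max_peer_queue caps each live peer's queue
// individually, while system sources (which carry no channel) share one queue per source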
queue.max_size_query = [this] (auto const & origin) {
switch (origin.source)
{
case nano::block_source::live:
return config.max_peer_queue;
default:
return config.max_system_queue;
}
};
queue.priority_query = [this] (auto const & origin) -> size_t {
switch (origin.source)
{
case nano::block_source::live:
return config.priority_live;
case nano::block_source::bootstrap:
case nano::block_source::bootstrap_legacy:
case nano::block_source::unchecked:
return config.priority_bootstrap;
case nano::block_source::local:
return config.priority_local;
default:
return 1;
}
};
}
nano::block_processor::~block_processor ()
@@ -80,39 +107,44 @@ void nano::block_processor::stop ()
}
}
std::size_t nano::block_processor::size ()
// TODO: Remove and replace all checks with calls to size (block_source)
std::size_t nano::block_processor::size () const
{
nano::unique_lock<nano::mutex> lock{ mutex };
return blocks.size () + forced.size ();
return queue.total_size ();
}
bool nano::block_processor::full ()
std::size_t nano::block_processor::size (nano::block_source source) const
{
nano::unique_lock<nano::mutex> lock{ mutex };
return queue.size ({ source });
}
bool nano::block_processor::full () const
{
return size () >= node.flags.block_processor_full_size;
}
bool nano::block_processor::half_full ()
bool nano::block_processor::half_full () const
{
return size () >= node.flags.block_processor_full_size / 2;
}
void nano::block_processor::add (std::shared_ptr<nano::block> const & block, block_source const source)
bool nano::block_processor::add (std::shared_ptr<nano::block> const & block, block_source const source, std::shared_ptr<nano::transport::channel> const & channel)
{
if (full ())
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::overfill);
return;
}
if (node.network_params.work.validate_entry (*block)) // true => error
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::insufficient_work);
return;
return false; // Not added
}
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process);
node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {})", block->hash ().to_string (), to_string (source));
node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {} {})",
block->hash ().to_string (),
to_string (source),
channel ? channel->to_string () : "<unknown>"); // TODO: Lazy eval
add_impl (context{ block, source });
return add_impl (context{ block, source }, channel);
}
std::optional<nano::block_status> nano::block_processor::add_blocking (std::shared_ptr<nano::block> const & block, block_source const source)
@@ -147,11 +179,27 @@ void nano::block_processor::force (std::shared_ptr<nano::block> const & block_a)
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::force);
node.logger.debug (nano::log::type::blockprocessor, "Forcing block: {}", block_a->hash ().to_string ());
add_impl (context{ block_a, block_source::forced });
}
bool nano::block_processor::add_impl (context ctx, std::shared_ptr<nano::transport::channel> const & channel)
{
auto const source = ctx.source;
bool added = false;
{
nano::lock_guard<nano::mutex> lock{ mutex };
forced.emplace_back (context{ block_a, block_source::forced });
nano::lock_guard<nano::mutex> guard{ mutex };
added = queue.push (std::move (ctx), { source, channel });
}
condition.notify_all ();
if (added)
{
condition.notify_all ();
}
else
{
node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::overfill);
node.stats.inc (nano::stat::type::blockprocessor_overfill, to_stat_detail (source));
}
return added;
}
void nano::block_processor::rollback_competitor (store::write_transaction const & transaction, nano::block const & block)
@@ -196,7 +244,7 @@ void nano::block_processor::run ()
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
if (have_blocks_ready ())
if (!queue.empty ())
{
lock.unlock ();
@@ -233,47 +281,16 @@ bool nano::block_processor::should_log ()
return result;
}
bool nano::block_processor::have_blocks_ready ()
{
debug_assert (!mutex.try_lock ());
return !blocks.empty () || !forced.empty ();
}
bool nano::block_processor::have_blocks ()
{
debug_assert (!mutex.try_lock ());
return have_blocks_ready ();
}
void nano::block_processor::add_impl (context ctx)
{
release_assert (ctx.source != nano::block_source::forced);
{
nano::lock_guard<nano::mutex> guard{ mutex };
blocks.emplace_back (std::move (ctx));
}
condition.notify_all ();
}
auto nano::block_processor::next () -> context
{
debug_assert (!mutex.try_lock ());
debug_assert (!blocks.empty () || !forced.empty ()); // This should be checked before calling next
debug_assert (!queue.empty ()); // This should be checked before calling next
if (!forced.empty ())
if (!queue.empty ())
{
auto entry = std::move (forced.front ());
release_assert (entry.source == nano::block_source::forced);
forced.pop_front ();
return entry;
}
if (!blocks.empty ())
{
auto entry = std::move (blocks.front ());
release_assert (entry.source != nano::block_source::forced);
blocks.pop_front ();
return entry;
auto [request, origin] = queue.next ();
release_assert (origin.source != nano::block_source::forced || request.source == nano::block_source::forced);
return std::move (request);
}
release_assert (false, "next() called when no blocks are ready");
@@ -289,19 +306,24 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
lock_a.lock ();
queue.periodic_update ();
timer_l.start ();
// Processing blocks
unsigned number_of_blocks_processed (0), number_of_forced_processed (0);
auto deadline_reached = [&timer_l, deadline = node.config.block_processor_batch_max_time] { return timer_l.after_deadline (deadline); };
auto processor_batch_reached = [&number_of_blocks_processed, max = node.flags.block_processor_batch_size] { return number_of_blocks_processed >= max; };
auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; };
while (have_blocks_ready () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ())
while (!queue.empty () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ())
{
// TODO: Cleaner periodic logging
if ((blocks.size () + forced.size () > 64) && should_log ())
if (should_log ())
{
node.logger.debug (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", blocks.size (), forced.size ());
node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue",
queue.total_size (),
queue.size ({ nano::block_source::forced }));
}
auto ctx = next ();
@@ -342,6 +364,7 @@ nano::block_status nano::block_processor::process_one (store::write_transaction
node.stats.inc (nano::stat::type::blockprocessor_result, to_stat_detail (result));
node.stats.inc (nano::stat::type::blockprocessor_source, to_stat_detail (context.source));
node.logger.trace (nano::log::type::blockprocessor, nano::log::detail::block_processed,
nano::log::arg{ "result", result },
nano::log::arg{ "source", context.source },
@@ -437,18 +460,12 @@ void nano::block_processor::queue_unchecked (store::write_transaction const & tr
std::unique_ptr<nano::container_info_component> nano::block_processor::collect_container_info (std::string const & name)
{
std::size_t blocks_count;
std::size_t forced_count;
{
nano::lock_guard<nano::mutex> guard{ mutex };
blocks_count = blocks.size ();
forced_count = forced.size ();
}
nano::lock_guard<nano::mutex> guard{ mutex };
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", blocks_count, sizeof (decltype (blocks)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "forced", forced_count, sizeof (decltype (forced)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", queue.total_size (), 0 }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "forced", queue.size ({ nano::block_source::forced }), 0 }));
composite->add_component (queue.collect_container_info ("queue"));
return composite;
}
@@ -463,3 +480,33 @@ nano::stat::detail nano::to_stat_detail (nano::block_source type)
debug_assert (value);
return value.value_or (nano::stat::detail{});
}
/*
* block_processor_config
*/
nano::block_processor_config::block_processor_config (const nano::network_constants & network_constants)
{
}
nano::error nano::block_processor_config::serialize (nano::tomlconfig & toml) const
{
toml.put ("max_peer_queue", max_peer_queue, "Maximum number of blocks to queue from network peers. \ntype:uint64");
toml.put ("max_system_queue", max_system_queue, "Maximum number of blocks to queue from system components (local RPC, bootstrap). \ntype:uint64");
toml.put ("priority_live", priority_live, "Priority for live network blocks. Higher priority gets processed more frequently. \ntype:uint64");
toml.put ("priority_bootstrap", priority_bootstrap, "Priority for bootstrap blocks. Higher priority gets processed more frequently. \ntype:uint64");
toml.put ("priority_local", priority_local, "Priority for local RPC blocks. Higher priority gets processed more frequently. \ntype:uint64");
return toml.get_error ();
}
nano::error nano::block_processor_config::deserialize (nano::tomlconfig & toml)
{
toml.get ("max_peer_queue", max_peer_queue);
toml.get ("max_system_queue", max_system_queue);
toml.get ("priority_live", priority_live);
toml.get ("priority_bootstrap", priority_bootstrap);
toml.get ("priority_local", priority_local);
return toml.get_error ();
}
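These options land in the [node.block_processor] TOML section exercised by the toml tests earlier in this diff. They can also be set programmatically, as the updated network.duplicate_revert_publish test does; a short sketch using the same test helpers (the values here are arbitrary):

nano::test::system system;
nano::node_config node_config = system.default_config ();
node_config.block_processor.max_peer_queue = 64; // per-peer cap for live blocks
node_config.block_processor.priority_bootstrap = 16; // drain bootstrap sources faster
auto & node = *system.add_node (node_config);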


@@ -1,6 +1,7 @@
#pragma once
#include <nano/lib/logging.hpp>
#include <nano/node/fair_queue.hpp>
#include <nano/secure/common.hpp>
#include <chrono>
@@ -23,7 +24,6 @@ class write_transaction;
namespace nano
{
enum class block_source
{
unknown = 0,
@@ -38,6 +38,26 @@ enum class block_source
std::string_view to_string (block_source);
nano::stat::detail to_stat_detail (block_source);
class block_processor_config final
{
public:
explicit block_processor_config (nano::network_constants const &);
nano::error deserialize (nano::tomlconfig & toml);
nano::error serialize (nano::tomlconfig & toml) const;
public:
// Maximum number of blocks to queue from network peers
size_t max_peer_queue{ 128 };
// Maximum number of blocks to queue from system components (local RPC, bootstrap)
size_t max_system_queue{ 16 * 1024 };
// Higher priority gets processed more frequently
size_t priority_live{ 1 };
size_t priority_bootstrap{ 8 };
size_t priority_local{ 16 };
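// With these defaults, one full round-robin rotation of the fair queue takes up to 1 block
// from each live (per-peer) queue, 8 from each bootstrap queue and 16 from the local queue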
};
/**
* Processing blocks is a potentially long IO operation.
* This class isolates block insertion from other operations like servicing network operations
@@ -72,15 +92,14 @@ public:
void start ();
void stop ();
std::size_t size ();
bool full ();
bool half_full ();
void add (std::shared_ptr<nano::block> const &, block_source = block_source::live);
std::size_t size () const;
std::size_t size (block_source) const;
bool full () const;
bool half_full () const;
bool add (std::shared_ptr<nano::block> const &, block_source = block_source::live, std::shared_ptr<nano::transport::channel> const & channel = nullptr);
std::optional<nano::block_status> add_blocking (std::shared_ptr<nano::block> const & block, block_source);
void force (std::shared_ptr<nano::block> const &);
bool should_log ();
bool have_blocks_ready ();
bool have_blocks ();
std::unique_ptr<container_info_component> collect_container_info (std::string const & name);
@@ -103,21 +122,21 @@ private:
void queue_unchecked (store::write_transaction const &, nano::hash_or_account const &);
processed_batch_t process_batch (nano::unique_lock<nano::mutex> &);
context next ();
void add_impl (context);
bool add_impl (context, std::shared_ptr<nano::transport::channel> const & channel = nullptr);
private: // Dependencies
block_processor_config const & config;
nano::node & node;
nano::write_database_queue & write_database_queue;
private:
std::deque<context> blocks;
std::deque<context> forced;
nano::fair_queue<context, block_source> queue;
std::chrono::steady_clock::time_point next_log;
bool stopped{ false };
nano::condition_variable condition;
nano::mutex mutex{ mutex_identifier (mutexes::block_processor) };
mutable nano::mutex mutex{ mutex_identifier (mutexes::block_processor) };
std::thread thread;
};
}


@@ -225,9 +225,9 @@ void nano::bootstrap_attempt_legacy::run ()
// TODO: This check / wait is a heuristic and should be improved.
auto wait_start = std::chrono::steady_clock::now ();
while (!stopped && node->block_processor.size () != 0 && ((std::chrono::steady_clock::now () - wait_start) < std::chrono::seconds{ 10 }))
while (!stopped && node->block_processor.size (nano::block_source::bootstrap_legacy) != 0 && ((std::chrono::steady_clock::now () - wait_start) < std::chrono::seconds{ 10 }))
{
condition.wait_for (lock, std::chrono::milliseconds{ 100 }, [this, node] { return stopped || node->block_processor.size () == 0; });
condition.wait_for (lock, std::chrono::milliseconds{ 100 }, [this, node] { return stopped || node->block_processor.size (nano::block_source::bootstrap_legacy) == 0; });
}
if (start_account.number () != std::numeric_limits<nano::uint256_t>::max ())


@@ -177,7 +177,7 @@ void nano::bootstrap_ascending::service::inspect (store::transaction const & tx,
void nano::bootstrap_ascending::service::wait_blockprocessor ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped && block_processor.size () > config.bootstrap_ascending.block_wait_count)
while (!stopped && block_processor.size (nano::block_source::bootstrap) > config.bootstrap_ascending.block_wait_count)
{
condition.wait_for (lock, std::chrono::milliseconds{ config.bootstrap_ascending.throttle_wait }, [this] () { return stopped; }); // Blockprocessor is relatively slow; sleeping here instead of using conditions
}

nano/node/fair_queue.hpp (new file, 357 lines)

@@ -0,0 +1,357 @@
#pragma once
#include <nano/lib/utility.hpp>
#include <nano/node/transport/channel.hpp>
#include <algorithm>
#include <chrono>
#include <deque>
#include <functional>
#include <memory>
#include <numeric>
#include <tuple>
#include <utility>
namespace nano
{
template <typename Request, typename Source>
class fair_queue final
{
public:
struct origin
{
Source source;
std::shared_ptr<nano::transport::channel> channel;
origin (Source source, std::shared_ptr<nano::transport::channel> channel = nullptr) :
source{ source },
channel{ channel }
{
}
};
private:
/**
* Holds user-supplied source type(s) and an optional channel. This is used to uniquely identify and categorize the source of a request.
*/
struct origin_entry
{
Source source;
// Optional is needed to distinguish between a source with no associated channel and a source with an expired channel
// TODO: Store channel as shared_ptr after networking fixes are done
std::optional<std::weak_ptr<nano::transport::channel>> maybe_channel;
origin_entry (Source source, std::shared_ptr<nano::transport::channel> channel = nullptr) :
source{ source }
{
if (channel)
{
maybe_channel = std::weak_ptr{ channel };
}
}
origin_entry (origin const & origin) :
origin_entry (origin.source, origin.channel)
{
}
bool alive () const
{
if (maybe_channel)
{
if (auto channel_l = maybe_channel->lock ())
{
return channel_l->alive ();
}
else
{
return false;
}
}
else
{
// Some sources (e.g. local RPC) don't have an associated channel; never remove their queue
return true;
}
}
// TODO: Store channel as shared_ptr to avoid this mess
auto operator<=> (origin_entry const & other) const
{
// First compare source
if (auto cmp = source <=> other.source; cmp != 0)
return cmp;
if (maybe_channel && other.maybe_channel)
{
// Then compare channels by ownership, not by the channel's value or state
std::owner_less<std::weak_ptr<nano::transport::channel>> less;
if (less (*maybe_channel, *other.maybe_channel))
return std::strong_ordering::less;
if (less (*other.maybe_channel, *maybe_channel))
return std::strong_ordering::greater;
return std::strong_ordering::equivalent;
}
else
{
if (maybe_channel && !other.maybe_channel)
{
return std::strong_ordering::greater;
}
if (!maybe_channel && other.maybe_channel)
{
return std::strong_ordering::less;
}
return std::strong_ordering::equivalent;
}
}
operator origin () const
{
return { source, maybe_channel ? maybe_channel->lock () : nullptr };
}
};
struct entry
{
using queue_t = std::deque<Request>;
queue_t requests;
size_t priority;
size_t max_size;
entry (size_t max_size, size_t priority) :
priority{ priority },
max_size{ max_size }
{
}
Request pop ()
{
release_assert (!requests.empty ());
auto request = std::move (requests.front ());
requests.pop_front ();
return request;
}
bool push (Request request)
{
if (requests.size () < max_size)
{
requests.push_back (std::move (request));
return true; // Added
}
return false; // Dropped
}
bool empty () const
{
return requests.empty ();
}
size_t size () const
{
return requests.size ();
}
};
public:
using origin_type = origin;
using value_type = std::pair<Request, origin_type>;
public:
size_t size (origin_type source) const
{
auto it = queues.find (source);
return it == queues.end () ? 0 : it->second.size ();
}
size_t max_size (origin_type source) const
{
auto it = queues.find (source);
return it == queues.end () ? 0 : it->second.max_size;
}
size_t priority (origin_type source) const
{
auto it = queues.find (source);
return it == queues.end () ? 0 : it->second.priority;
}
size_t total_size () const
{
return std::accumulate (queues.begin (), queues.end (), std::size_t{ 0 }, [] (size_t total, auto const & queue) {
return total + queue.second.size ();
});
}
bool empty () const
{
return std::all_of (queues.begin (), queues.end (), [] (auto const & queue) {
return queue.second.empty ();
});
}
size_t queues_size () const
{
return queues.size ();
}
void clear ()
{
queues.clear ();
}
/**
* Should be called periodically to clean up stale channels and update queue priorities and max sizes
*/
bool periodic_update (std::chrono::milliseconds interval = std::chrono::milliseconds{ 1000 * 30 })
{
if (elapsed (last_update, interval))
{
last_update = std::chrono::steady_clock::now ();
cleanup ();
update ();
return true; // Updated
}
return false; // Not updated
}
/**
* Push a request to the appropriate queue based on the source
* Request will be dropped if the queue is full
* @return true if added, false if dropped
*/
bool push (Request request, origin_type source)
{
auto it = queues.find (source);
// Create a new queue if it doesn't exist
if (it == queues.end ())
{
auto max_size = max_size_query (source);
auto priority = priority_query (source);
// It's safe to keep using the current iterator, since std::map guarantees that iterators are not invalidated by insert operations
it = queues.emplace (source, entry{ max_size, priority }).first;
}
release_assert (it != queues.end ());
auto & queue = it->second;
return queue.push (std::move (request)); // True if added, false if dropped
}
public:
using max_size_query_t = std::function<size_t (origin_type const &)>;
using priority_query_t = std::function<size_t (origin_type const &)>;
max_size_query_t max_size_query{ [] (auto const & origin) { debug_assert (false, "max_size_query callback empty"); return 0; } };
priority_query_t priority_query{ [] (auto const & origin) { debug_assert (false, "priority_query callback empty"); return 0; } };
public:
value_type next ()
{
debug_assert (!empty ()); // Should be checked before calling next
auto should_seek = [&, this] () {
if (iterator == queues.end ())
{
return true;
}
auto & queue = iterator->second;
if (queue.empty ())
{
return true;
}
// Allow up to `queue.priority` requests to be processed before moving to the next queue
if (counter >= queue.priority)
{
return true;
}
return false;
};
if (should_seek ())
{
seek_next ();
}
release_assert (iterator != queues.end ());
auto & source = iterator->first;
auto & queue = iterator->second;
++counter;
return { queue.pop (), source };
}
std::deque<value_type> next_batch (size_t max_count)
{
// TODO: Naive implementation, could be optimized
std::deque<value_type> result;
while (!empty () && result.size () < max_count)
{
result.emplace_back (next ());
}
return result;
}
private:
void seek_next ()
{
counter = 0;
do
{
if (iterator != queues.end ())
{
++iterator;
}
if (iterator == queues.end ())
{
iterator = queues.begin ();
}
release_assert (iterator != queues.end ());
} while (iterator->second.empty ());
}
void cleanup ()
{
// Invalidate the current iterator
iterator = queues.end ();
erase_if (queues, [] (auto const & entry) {
return !entry.first.alive ();
});
}
void update ()
{
for (auto & [source, queue] : queues)
{
queue.max_size = max_size_query (source);
queue.priority = priority_query (source);
}
}
private:
std::map<origin_entry, entry> queues;
std::map<origin_entry, entry>::iterator iterator{ queues.end () };
size_t counter{ 0 };
std::chrono::steady_clock::time_point last_update{};
public:
std::unique_ptr<container_info_component> collect_container_info (std::string const & name)
{
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "queues", queues.size (), sizeof (typename decltype (queues)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "total_size", total_size (), sizeof (typename decltype (queues)::value_type) }));
return composite;
}
};
}
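Putting the pieces together, typical usage looks as follows. This is a sketch assembled from the unit tests above (source_enum and fake_channel as defined there), not additional committed code:

nano::test::system system{ 1 };
auto channel = nano::test::fake_channel (system.node (0));

nano::fair_queue<int, source_enum> queue;
// Both callbacks are evaluated when a queue is first created, so assign them before the first push
queue.priority_query = [] (auto const & origin) -> size_t { return origin.source == source_enum::live ? 1 : 4; };
queue.max_size_query = [] (auto const &) -> size_t { return 1024; };

queue.push (7, { source_enum::live }); // creates the <live, no channel> queue on first use
queue.push (8, { source_enum::live, channel }); // each <source, channel> pair gets its own queue

queue.periodic_update (); // drops queues whose channel has died, refreshes priorities and max sizes

while (!queue.empty ())
{
	auto [request, origin] = queue.next (); // weighted round-robin across the per-origin queues
}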


@@ -336,15 +336,12 @@ public:
}
}
void publish (nano::publish const & message_a) override
void publish (nano::publish const & message) override
{
if (!node.block_processor.full ())
bool added = node.block_processor.add (message.block, nano::block_source::live, channel);
if (!added)
{
node.process_active (message_a.block);
}
else
{
node.network.publish_filter.clear (message_a.digest);
node.network.publish_filter.clear (message.digest);
node.stats.inc (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::in);
}
}


@@ -33,7 +33,8 @@ nano::node_config::node_config (const std::optional<uint16_t> & peering_port_a,
websocket_config{ network_params.network },
ipc_config{ network_params.network },
external_address{ boost::asio::ip::address_v6{}.to_string () },
rep_crawler{ network_params.network }
rep_crawler{ network_params.network },
block_processor{ network_params.network }
{
if (peering_port == 0)
{
@@ -212,6 +213,10 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const
rep_crawler.serialize (rep_crawler_l);
toml.put_child ("rep_crawler", rep_crawler_l);
nano::tomlconfig block_processor_l;
block_processor.serialize (block_processor_l);
toml.put_child ("block_processor", block_processor_l);
return toml.get_error ();
}
@@ -287,6 +292,12 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml)
rep_crawler.deserialize (config_l);
}
if (toml.has_key ("block_processor"))
{
auto config_l = toml.get_required_child ("block_processor");
block_processor.deserialize (config_l);
}
if (toml.has_key ("work_peers"))
{
work_peers.clear ();


@@ -8,6 +8,7 @@
#include <nano/lib/numbers.hpp>
#include <nano/lib/rocksdbconfig.hpp>
#include <nano/lib/stats.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/bootstrap/bootstrap_config.hpp>
#include <nano/node/ipc/ipc_config.hpp>
#include <nano/node/repcrawler.hpp>
@@ -136,6 +137,7 @@ public:
unsigned backlog_scan_frequency{ 10 };
nano::vote_cache_config vote_cache;
nano::rep_crawler_config rep_crawler;
nano::block_processor_config block_processor;
public:
std::string serialize_frontiers_confirmation (nano::frontiers_confirmation_mode) const;


@@ -214,7 +214,7 @@ std::vector<nano::block_hash> nano::test::blocks_to_hashes (std::vector<std::sha
return hashes;
}
std::shared_ptr<nano::transport::channel> nano::test::fake_channel (nano::node & node, nano::account node_id)
std::shared_ptr<nano::transport::fake::channel> nano::test::fake_channel (nano::node & node, nano::account node_id)
{
auto channel = std::make_shared<nano::transport::fake::channel> (node);
if (!node_id.is_zero ())


@@ -3,6 +3,7 @@
#include <nano/lib/locks.hpp>
#include <nano/lib/timer.hpp>
#include <nano/node/transport/channel.hpp>
#include <nano/node/transport/fake.hpp>
#include <nano/secure/account_info.hpp>
#include <gtest/gtest.h>
@@ -382,7 +383,7 @@ namespace test
/*
* Creates a new fake channel associated with `node`
*/
std::shared_ptr<nano::transport::channel> fake_channel (nano::node & node, nano::account node_id = { 0 });
std::shared_ptr<nano::transport::fake::channel> fake_channel (nano::node & node, nano::account node_id = { 0 });
/*
* Start an election on system system_a, node node_a and hash hash_a by reading the block
* out of the ledger and adding it to the manual election scheduler queue.