Periodically log queue sizes for node components (#4890)

This commit is contained in:
Piotr Wójcik 2025-04-20 22:49:51 +02:00 committed by GitHub
commit 8c7cb21b91
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 106 additions and 19 deletions

View file

@ -74,8 +74,10 @@ enum class type
txn_tracker,
gap_cache,
vote_processor,
vote_cache_processor,
election_scheduler,
vote_generator,
vote_generator_final,
rep_tiers,
syn_cookies,
thread_runner,
@ -88,6 +90,7 @@ enum class type
cementing_set,
bounded_backlog,
request_aggregator,
vote_rebroadcaster,
// bootstrap
bulk_pull_client,

View file

@ -219,5 +219,5 @@ void nano::daemon::run (std::filesystem::path const & data_path, nano::node_flag
logger.critical (nano::log::type::daemon, "Error while running node: {}", e.what ());
}
logger.info (nano::log::type::daemon, "Daemon stopped");
logger.info (nano::log::type::daemon, "Stopped");
}

View file

@ -78,7 +78,7 @@ void run (std::filesystem::path const & data_path, std::vector<std::string> cons
logger.critical (nano::log::type::daemon_rpc, "Error deserializing config: {}", error.get_message ());
}
logger.info (nano::log::type::daemon_rpc, "Daemon stopped (RPC)");
logger.info (nano::log::type::daemon_rpc, "Stopped");
}
}

View file

@ -232,7 +232,7 @@ void nano::block_processor::run ()
{
if (log_interval.elapse (15s))
{
logger.info (nano::log::type::block_processor, "{} blocks (+ {} forced) in processing queue",
logger.info (nano::log::type::block_processor, "{} blocks ({} forced) in processing queue",
queue.size (),
queue.size ({ nano::block_source::forced }));
}

View file

@ -129,6 +129,13 @@ void nano::cementing_set::run ()
if (!set.empty ())
{
if (log_interval.elapse (15s))
{
logger.info (nano::log::type::cementing_set, "{} blocks in cementing set, {} deferred",
set.size (),
deferred.size ());
}
run_batch (lock);
debug_assert (!lock.owns_lock ());
lock.lock ();

View file

@ -1,5 +1,6 @@
#pragma once
#include <nano/lib/interval.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/lib/numbers_templ.hpp>
#include <nano/lib/observer_set.hpp>
@ -127,5 +128,7 @@ private:
std::thread thread;
nano::thread_pool workers;
nano::interval log_interval;
};
}

View file

@ -134,6 +134,11 @@ void nano::local_block_broadcaster::run ()
debug_assert (lock.owns_lock ());
}
if (log_interval.elapse (15s))
{
logger.info (nano::log::type::local_block_broadcaster, "{} blocks in broadcasting set", local_blocks.size ());
}
run_broadcasts (lock);
debug_assert (!lock.owns_lock ());
lock.lock ();

View file

@ -122,5 +122,7 @@ private:
nano::condition_variable condition;
mutable nano::mutex mutex;
std::thread thread;
nano::interval log_interval;
};
}

View file

@ -115,6 +115,11 @@ void nano::message_processor::run ()
if (!queue.empty ())
{
if (log_interval.elapse (15s))
{
logger.info (nano::log::type::message_processor, "{} messages in processing queue", queue.size ());
}
run_batch (lock);
debug_assert (!lock.owns_lock ());
lock.lock ();

View file

@ -1,5 +1,6 @@
#pragma once
#include <nano/lib/interval.hpp>
#include <nano/lib/locks.hpp>
#include <nano/lib/threading.hpp>
#include <nano/node/fair_queue.hpp>
@ -56,5 +57,7 @@ private:
mutable nano::mutex mutex;
nano::condition_variable condition;
std::vector<std::thread> threads;
nano::interval log_interval;
};
}

View file

@ -609,7 +609,7 @@ void nano::node::stop ()
return;
}
logger.info (nano::log::type::node, "Node stopping...");
logger.info (nano::log::type::node, "Stopping...");
tcp_listener.stop ();
online_reps.stop ();

View file

@ -123,6 +123,11 @@ void nano::request_aggregator::run ()
if (!queue.empty ())
{
if (log_interval.elapse (15s))
{
logger.info (nano::log::type::request_aggregator, "{} requests in processing queue", queue.size ());
}
run_batch (lock);
debug_assert (!lock.owns_lock ());
lock.lock ();

View file

@ -1,5 +1,6 @@
#pragma once
#include <nano/lib/interval.hpp>
#include <nano/lib/locks.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/lib/threading.hpp>
@ -101,5 +102,7 @@ private:
nano::condition_variable condition;
mutable nano::mutex mutex{ mutex_identifier (mutexes::request_aggregator) };
std::vector<std::thread> threads;
nano::interval log_interval;
};
}

View file

@ -65,7 +65,7 @@ bool nano::vote_generator::should_vote (transaction_variant_t const & transactio
should_vote = block != nullptr && ledger.dependents_confirmed (transaction, *block);
}
logger.trace (nano::log::type::vote_generator, nano::log::detail::should_vote,
logger.trace (log_type (), nano::log::detail::should_vote,
nano::log::arg{ "should_vote", should_vote },
nano::log::arg{ "block", block },
nano::log::arg{ "is_final", is_final });
@ -292,6 +292,7 @@ void nano::vote_generator::run ()
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
// Wait for at most vote_generator_delay in case no further notification is received
condition.wait_for (lock, config.vote_generator_delay, [this] () {
return stopped || broadcast_predicate () || !requests.empty ();
});
@ -301,17 +302,27 @@ void nano::vote_generator::run ()
return;
}
if (broadcast_predicate ())
if (broadcast_predicate () || !requests.empty ())
{
broadcast (lock);
next_broadcast = std::chrono::steady_clock::now () + config.vote_generator_delay;
}
stats.inc (stat_type (), nano::stat::detail::loop);
if (!requests.empty ())
{
auto request (requests.front ());
requests.pop_front ();
reply (lock, std::move (request));
if (log_interval.elapse (15s))
{
logger.info (log_type (), "{} candidates, {} requests in processing queue", candidates.size (), requests.size ());
}
if (broadcast_predicate ())
{
broadcast (lock);
next_broadcast = std::chrono::steady_clock::now () + config.vote_generator_delay;
}
if (!requests.empty ())
{
auto request (requests.front ());
requests.pop_front ();
reply (lock, std::move (request));
}
}
}
}
@ -345,4 +356,9 @@ nano::container_info nano::vote_generator::container_info () const
nano::stat::type nano::vote_generator::stat_type () const
{
return is_final ? nano::stat::type::vote_generator_final : nano::stat::type::vote_generator;
}
nano::log::type nano::vote_generator::log_type () const
{
return is_final ? nano::log::type::vote_generator_final : nano::log::type::vote_generator;
}

View file

@ -1,5 +1,6 @@
#pragma once
#include <nano/lib/interval.hpp>
#include <nano/lib/locks.hpp>
#include <nano/lib/logging.hpp>
#include <nano/lib/numbers.hpp>
@ -59,6 +60,7 @@ private:
bool broadcast_predicate () const;
nano::stat::type stat_type () const;
nano::log::type log_type () const;
private: // Dependencies
nano::node_config const & config;
@ -84,5 +86,7 @@ private:
std::thread thread;
std::shared_ptr<nano::transport::channel> inproc_channel;
nano::processing_queue<queue_entry_t> vote_generation_queue;
nano::interval log_interval;
};
}

View file

@ -136,6 +136,15 @@ void nano::vote_processor::run ()
if (!queue.empty ())
{
if (log_interval.elapse (15s))
{
logger.info (nano::log::type::vote_processor, "{} votes (tier 3: {}, tier 2: {}, tier 1: {}) in processing queue",
queue.size (),
queue.size ({ nano::rep_tier::tier_3 }),
queue.size ({ nano::rep_tier::tier_2 }),
queue.size ({ nano::rep_tier::tier_1 }));
}
run_batch (lock);
debug_assert (!lock.owns_lock ());
lock.lock ();
@ -314,6 +323,11 @@ void nano::vote_cache_processor::run ()
if (!triggered.empty ())
{
if (log_interval.elapse (15s))
{
logger.info (nano::log::type::vote_cache_processor, "{} hashes in processing queue", triggered.size ());
}
run_batch (lock);
debug_assert (!lock.owns_lock ());
lock.lock ();

View file

@ -1,5 +1,6 @@
#pragma once
#include <nano/lib/interval.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/lib/threading.hpp>
#include <nano/lib/utility.hpp>
@ -81,6 +82,8 @@ private:
nano::condition_variable condition;
mutable nano::mutex mutex{ mutex_identifier (mutexes::vote_processor) };
std::vector<std::thread> threads;
nano::interval log_interval;
};
class vote_cache_processor final
@ -118,5 +121,7 @@ private:
nano::condition_variable condition;
mutable nano::mutex mutex;
std::thread thread;
nano::interval log_interval;
};
}

View file

@ -206,6 +206,15 @@ void nano::vote_rebroadcaster::run ()
if (!queue.empty ())
{
if (log_interval.elapse (15s))
{
logger.info (nano::log::type::vote_rebroadcaster, "{} votes (tier 3: {}, tier 2: {}, tier 1: {}) in rebroadcast queue",
queue.size (),
queue.size ({ nano::rep_tier::tier_3 }),
queue.size ({ nano::rep_tier::tier_2 }),
queue.size ({ nano::rep_tier::tier_1 }));
}
auto [vote, tier] = next ();
lock.unlock ();

View file

@ -1,5 +1,6 @@
#pragma once
#include <nano/lib/interval.hpp>
#include <nano/lib/locks.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/node/fair_queue.hpp>
@ -170,5 +171,7 @@ private:
std::condition_variable condition;
mutable std::mutex mutex;
std::thread thread;
nano::interval log_interval;
};
}

View file

@ -799,11 +799,11 @@ void nano::ledger::initialize (nano::generate_cache_flags const & generate_cache
cache.pruned_count = store.pruned.count (transaction);
}
logger.info (nano::log::type::ledger, "Block count: {:>10}", cache.block_count.load ());
logger.info (nano::log::type::ledger, "Cemented count: {:>10}", cache.cemented_count.load ());
logger.info (nano::log::type::ledger, "Account count: {:>10}", cache.account_count.load ());
logger.info (nano::log::type::ledger, "Pruned count: {:>10}", cache.pruned_count.load ());
logger.info (nano::log::type::ledger, "Representative count: {}", cache.rep_weights.size ());
logger.info (nano::log::type::ledger, "Block count: {:>11}", cache.block_count.load ());
logger.info (nano::log::type::ledger, "Cemented count: {:>11}", cache.cemented_count.load ());
logger.info (nano::log::type::ledger, "Account count: {:>11}", cache.account_count.load ());
logger.info (nano::log::type::ledger, "Pruned count: {:>11}", cache.pruned_count.load ());
logger.info (nano::log::type::ledger, "Representative count: {:>5}", cache.rep_weights.size ());
}
bool nano::ledger::unconfirmed_exists (secure::transaction const & transaction, nano::block_hash const & hash)