Stats overhaul (#4583)

* Introduce `nano::elapse ()` helper

* Move stats test to a dedicated file

* Missing override & smaller improvements

* nano::stats::dump

* Move implementation to .cpp file

* Index stats by a dedicated struct

* Remove stat observers

* Overhaul

* Config

* Use dedicated thread

* Separate stat sinks

* Samples writer

* Fix for max size

* Simple sampler key

* Expected min max

* Fix tests

* Cleanup

* Test for samples rpc

* Implement sampling for node components

* TODO

* Remove special semantics of `stat::detail::all`

* Guard against invalid values

* Thread loop interval

* More tests

* Flag to aggregate `stat::detail::all`

---------

Co-authored-by: Colin LeMahieu <clemahieu@gmail.com>
This commit is contained in:
Piotr Wójcik 2024-04-30 19:39:28 +02:00 committed by GitHub
commit 04de36cb0c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
28 changed files with 858 additions and 910 deletions

View file

@ -46,6 +46,7 @@ add_executable(
peer_container.cpp
rep_weight_store.cpp
scheduler_buckets.cpp
stats.cpp
request_aggregator.cpp
signal_manager.cpp
socket.cpp

View file

@ -103,7 +103,7 @@ TEST (confirmation_callback, observer_callbacks)
node->confirming_set.add (send1->hash ());
// Callback is performed for all blocks that are confirmed
ASSERT_TIMELY_EQ (5s, 2, node->ledger.stats.count (nano::stat::type::confirmation_observer, nano::stat::detail::all, nano::stat::dir::out));
ASSERT_TIMELY_EQ (5s, 2, node->ledger.stats.count (nano::stat::type::confirmation_observer, nano::stat::dir::out));
ASSERT_EQ (2, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (3, node->ledger.cemented_count ());

View file

@ -1694,56 +1694,6 @@ TEST (node, bootstrap_connection_scaling)
ASSERT_EQ (1, node1.bootstrap_initiator.connections->target_connections (50000, 1));
}
// Test stat counting at both type and detail levels
TEST (node, stat_counting)
{
nano::test::system system (1);
auto & node1 (*system.nodes[0]);
node1.stats.add (nano::stat::type::ledger, nano::stat::dir::in, 1);
node1.stats.add (nano::stat::type::ledger, nano::stat::dir::in, 5);
node1.stats.inc (nano::stat::type::ledger, nano::stat::dir::in);
node1.stats.inc (nano::stat::type::ledger, nano::stat::detail::send, nano::stat::dir::in);
node1.stats.inc (nano::stat::type::ledger, nano::stat::detail::send, nano::stat::dir::in);
node1.stats.inc (nano::stat::type::ledger, nano::stat::detail::receive, nano::stat::dir::in);
ASSERT_EQ (10, node1.stats.count (nano::stat::type::ledger, nano::stat::dir::in));
ASSERT_EQ (2, node1.stats.count (nano::stat::type::ledger, nano::stat::detail::send, nano::stat::dir::in));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::ledger, nano::stat::detail::receive, nano::stat::dir::in));
node1.stats.add (nano::stat::type::ledger, nano::stat::dir::in, 0);
ASSERT_EQ (10, node1.stats.count (nano::stat::type::ledger, nano::stat::dir::in));
}
TEST (node, stat_histogram)
{
nano::test::system system (1);
auto & node1 (*system.nodes[0]);
// Specific bins
node1.stats.define_histogram (nano::stat::type::vote, nano::stat::detail::confirm_req, nano::stat::dir::in, { 1, 6, 10, 16 });
node1.stats.update_histogram (nano::stat::type::vote, nano::stat::detail::confirm_req, nano::stat::dir::in, 1, 50);
auto histogram_req (node1.stats.get_histogram (nano::stat::type::vote, nano::stat::detail::confirm_req, nano::stat::dir::in));
ASSERT_EQ (histogram_req->get_bins ()[0].value, 50);
// Uniform distribution (12 bins, width 1); also test clamping 100 to the last bin
node1.stats.define_histogram (nano::stat::type::vote, nano::stat::detail::confirm_ack, nano::stat::dir::in, { 1, 13 }, 12);
node1.stats.update_histogram (nano::stat::type::vote, nano::stat::detail::confirm_ack, nano::stat::dir::in, 1);
node1.stats.update_histogram (nano::stat::type::vote, nano::stat::detail::confirm_ack, nano::stat::dir::in, 8, 10);
node1.stats.update_histogram (nano::stat::type::vote, nano::stat::detail::confirm_ack, nano::stat::dir::in, 100);
auto histogram_ack (node1.stats.get_histogram (nano::stat::type::vote, nano::stat::detail::confirm_ack, nano::stat::dir::in));
ASSERT_EQ (histogram_ack->get_bins ()[0].value, 1);
ASSERT_EQ (histogram_ack->get_bins ()[7].value, 10);
ASSERT_EQ (histogram_ack->get_bins ()[11].value, 1);
// Uniform distribution (2 bins, width 5); add 1 to each bin
node1.stats.define_histogram (nano::stat::type::vote, nano::stat::detail::confirm_ack, nano::stat::dir::out, { 1, 11 }, 2);
node1.stats.update_histogram (nano::stat::type::vote, nano::stat::detail::confirm_ack, nano::stat::dir::out, 1, 1);
node1.stats.update_histogram (nano::stat::type::vote, nano::stat::detail::confirm_ack, nano::stat::dir::out, 6, 1);
auto histogram_ack_out (node1.stats.get_histogram (nano::stat::type::vote, nano::stat::detail::confirm_ack, nano::stat::dir::out));
ASSERT_EQ (histogram_ack_out->get_bins ()[0].value, 1);
ASSERT_EQ (histogram_ack_out->get_bins ()[1].value, 1);
}
TEST (node, online_reps)
{
nano::test::system system (1);
@ -2469,7 +2419,7 @@ TEST (node, DISABLED_fork_invalid_block_signature)
// Send the vote with the corrupt block signature
node2.network.flood_vote (vote_corrupt, 1.0f);
// Wait for the rollback
ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::rollback, nano::stat::detail::all));
ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::rollback));
// Send the vote with the correct block
node2.network.flood_vote (vote, 1.0f);
ASSERT_TIMELY (10s, !node1.block (send1->hash ()));

View file

@ -10,14 +10,14 @@ using namespace std::chrono_literals;
TEST (processing_queue, construction)
{
nano::test::system system{};
nano::processing_queue<int> queue{ system.stats, {}, {}, 4, 8 * 1024, 1024 };
nano::processing_queue<int> queue{ system.stats, nano::stat::type::test, {}, 4, 8 * 1024, 1024 };
ASSERT_EQ (queue.size (), 0);
}
TEST (processing_queue, process_one)
{
nano::test::system system{};
nano::processing_queue<int> queue{ system.stats, {}, {}, 4, 8 * 1024, 1024 };
nano::processing_queue<int> queue{ system.stats, nano::stat::type::test, {}, 4, 8 * 1024, 1024 };
std::atomic<std::size_t> processed{ 0 };
queue.process_batch = [&] (auto & batch) {
@ -35,7 +35,7 @@ TEST (processing_queue, process_one)
TEST (processing_queue, process_many)
{
nano::test::system system{};
nano::processing_queue<int> queue{ system.stats, {}, {}, 4, 8 * 1024, 1024 };
nano::processing_queue<int> queue{ system.stats, nano::stat::type::test, {}, 4, 8 * 1024, 1024 };
std::atomic<std::size_t> processed{ 0 };
queue.process_batch = [&] (auto & batch) {
@ -57,7 +57,7 @@ TEST (processing_queue, process_many)
TEST (processing_queue, max_queue_size)
{
nano::test::system system{};
nano::processing_queue<int> queue{ system.stats, {}, {}, 4, 1024, 128 };
nano::processing_queue<int> queue{ system.stats, nano::stat::type::test, {}, 4, 1024, 128 };
const int count = 2 * 1024; // Double the max queue size
for (int n = 0; n < count; ++n)
@ -71,7 +71,7 @@ TEST (processing_queue, max_queue_size)
TEST (processing_queue, max_batch_size)
{
nano::test::system system{};
nano::processing_queue<int> queue{ system.stats, {}, {}, 4, 1024, 128 };
nano::processing_queue<int> queue{ system.stats, nano::stat::type::test, {}, 4, 1024, 128 };
// Fill queue before starting processing threads
const int count = 1024;
@ -97,7 +97,7 @@ TEST (processing_queue, max_batch_size)
TEST (processing_queue, parallel)
{
nano::test::system system{};
nano::processing_queue<int> queue{ system.stats, {}, {}, 16, 1024, 1 };
nano::processing_queue<int> queue{ system.stats, nano::stat::type::test, {}, 16, 1024, 1 };
std::atomic<std::size_t> processed{ 0 };
queue.process_batch = [&] (auto & batch) {

79
nano/core_test/stats.cpp Normal file
View file

@ -0,0 +1,79 @@
#include <nano/test_common/system.hpp>
#include <nano/test_common/testutil.hpp>
#include <gtest/gtest.h>
#include <ostream>
// Test stat counting at both type and detail levels
TEST (stats, counters)
{
nano::test::system system;
auto & node = *system.add_node ();
node.stats.add (nano::stat::type::ledger, nano::stat::detail::test, nano::stat::dir::in, 1);
node.stats.add (nano::stat::type::ledger, nano::stat::detail::test, nano::stat::dir::in, 5);
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::test, nano::stat::dir::in);
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::send, nano::stat::dir::in);
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::send, nano::stat::dir::in);
node.stats.inc (nano::stat::type::ledger, nano::stat::detail::receive, nano::stat::dir::in);
ASSERT_EQ (10, node.stats.count (nano::stat::type::ledger, nano::stat::dir::in));
ASSERT_EQ (2, node.stats.count (nano::stat::type::ledger, nano::stat::detail::send, nano::stat::dir::in));
ASSERT_EQ (1, node.stats.count (nano::stat::type::ledger, nano::stat::detail::receive, nano::stat::dir::in));
node.stats.add (nano::stat::type::ledger, nano::stat::detail::test, nano::stat::dir::in, 0);
ASSERT_EQ (10, node.stats.count (nano::stat::type::ledger, nano::stat::dir::in));
}
TEST (stats, counters_aggregate_all)
{
nano::test::system system;
auto & node = *system.add_node ();
node.stats.add (nano::stat::type::ledger, nano::stat::detail::test, nano::stat::dir::in, 1, true);
ASSERT_EQ (1, node.stats.count (nano::stat::type::ledger, nano::stat::dir::in));
ASSERT_EQ (1, node.stats.count (nano::stat::type::ledger, nano::stat::detail::all, nano::stat::dir::in));
ASSERT_EQ (1, node.stats.count (nano::stat::type::ledger, nano::stat::detail::test, nano::stat::dir::in));
node.stats.add (nano::stat::type::ledger, nano::stat::detail::activate, nano::stat::dir::in, 5, true);
ASSERT_EQ (6, node.stats.count (nano::stat::type::ledger, nano::stat::dir::in));
ASSERT_EQ (6, node.stats.count (nano::stat::type::ledger, nano::stat::detail::all, nano::stat::dir::in));
ASSERT_EQ (1, node.stats.count (nano::stat::type::ledger, nano::stat::detail::test, nano::stat::dir::in));
}
TEST (stats, samples)
{
nano::test::system system;
auto & node = *system.add_node ();
node.stats.sample (nano::stat::sample::active_election_duration, { 1, 10 }, 5);
node.stats.sample (nano::stat::sample::active_election_duration, { 1, 10 }, 5);
node.stats.sample (nano::stat::sample::active_election_duration, { 1, 10 }, 11);
node.stats.sample (nano::stat::sample::active_election_duration, { 1, 10 }, 37);
node.stats.sample (nano::stat::sample::bootstrap_tag_duration, { 1, 10 }, 2137);
auto samples1 = node.stats.samples (nano::stat::sample::active_election_duration);
ASSERT_EQ (4, samples1.size ());
ASSERT_EQ (5, samples1[0]);
ASSERT_EQ (5, samples1[1]);
ASSERT_EQ (11, samples1[2]);
ASSERT_EQ (37, samples1[3]);
auto samples2 = node.stats.samples (nano::stat::sample::active_election_duration);
ASSERT_EQ (0, samples2.size ());
node.stats.sample (nano::stat::sample::active_election_duration, { 1, 10 }, 3);
auto samples3 = node.stats.samples (nano::stat::sample::active_election_duration);
ASSERT_EQ (1, samples3.size ());
ASSERT_EQ (3, samples3[0]);
auto samples4 = node.stats.samples (nano::stat::sample::bootstrap_tag_duration);
ASSERT_EQ (1, samples4.size ());
ASSERT_EQ (2137, samples4[0]);
}

View file

@ -227,12 +227,10 @@ TEST (toml, daemon_config_deserialize_defaults)
ASSERT_EQ (conf.node.diagnostics_config.txn_tracking.min_read_txn_time, defaults.node.diagnostics_config.txn_tracking.min_read_txn_time);
ASSERT_EQ (conf.node.diagnostics_config.txn_tracking.min_write_txn_time, defaults.node.diagnostics_config.txn_tracking.min_write_txn_time);
ASSERT_EQ (conf.node.stats_config.sampling_enabled, defaults.node.stats_config.sampling_enabled);
ASSERT_EQ (conf.node.stats_config.interval, defaults.node.stats_config.interval);
ASSERT_EQ (conf.node.stats_config.capacity, defaults.node.stats_config.capacity);
ASSERT_EQ (conf.node.stats_config.max_samples, defaults.node.stats_config.max_samples);
ASSERT_EQ (conf.node.stats_config.log_rotation_count, defaults.node.stats_config.log_rotation_count);
ASSERT_EQ (conf.node.stats_config.log_interval_samples, defaults.node.stats_config.log_interval_samples);
ASSERT_EQ (conf.node.stats_config.log_interval_counters, defaults.node.stats_config.log_interval_counters);
ASSERT_EQ (conf.node.stats_config.log_samples_interval, defaults.node.stats_config.log_samples_interval);
ASSERT_EQ (conf.node.stats_config.log_counters_interval, defaults.node.stats_config.log_counters_interval);
ASSERT_EQ (conf.node.stats_config.log_headers, defaults.node.stats_config.log_headers);
ASSERT_EQ (conf.node.stats_config.log_counters_filename, defaults.node.stats_config.log_counters_filename);
ASSERT_EQ (conf.node.stats_config.log_samples_filename, defaults.node.stats_config.log_samples_filename);
@ -514,6 +512,9 @@ TEST (toml, daemon_config_deserialize_no_defaults)
rep_crawler = true
work_generation_time = false
[node.statistics]
max_samples = 999
[node.statistics.log]
filename_counters = "devcounters.stat"
filename_samples = "devsamples.stat"
@ -522,11 +523,6 @@ TEST (toml, daemon_config_deserialize_no_defaults)
interval_samples = 999
rotation_count = 999
[node.statistics.sampling]
capacity = 999
enable = true
interval = 999
[node.websocket]
address = "0:0:0:0:0:ffff:7f01:101"
enable = true
@ -683,12 +679,10 @@ TEST (toml, daemon_config_deserialize_no_defaults)
ASSERT_NE (conf.node.diagnostics_config.txn_tracking.min_read_txn_time, defaults.node.diagnostics_config.txn_tracking.min_read_txn_time);
ASSERT_NE (conf.node.diagnostics_config.txn_tracking.min_write_txn_time, defaults.node.diagnostics_config.txn_tracking.min_write_txn_time);
ASSERT_NE (conf.node.stats_config.sampling_enabled, defaults.node.stats_config.sampling_enabled);
ASSERT_NE (conf.node.stats_config.interval, defaults.node.stats_config.interval);
ASSERT_NE (conf.node.stats_config.capacity, defaults.node.stats_config.capacity);
ASSERT_NE (conf.node.stats_config.max_samples, defaults.node.stats_config.max_samples);
ASSERT_NE (conf.node.stats_config.log_rotation_count, defaults.node.stats_config.log_rotation_count);
ASSERT_NE (conf.node.stats_config.log_interval_samples, defaults.node.stats_config.log_interval_samples);
ASSERT_NE (conf.node.stats_config.log_interval_counters, defaults.node.stats_config.log_interval_counters);
ASSERT_NE (conf.node.stats_config.log_samples_interval, defaults.node.stats_config.log_samples_interval);
ASSERT_NE (conf.node.stats_config.log_counters_interval, defaults.node.stats_config.log_counters_interval);
ASSERT_NE (conf.node.stats_config.log_headers, defaults.node.stats_config.log_headers);
ASSERT_NE (conf.node.stats_config.log_counters_filename, defaults.node.stats_config.log_counters_filename);
ASSERT_NE (conf.node.stats_config.log_samples_filename, defaults.node.stats_config.log_samples_filename);

View file

@ -85,6 +85,7 @@ add_library(
stats.cpp
stats_enums.hpp
stats_enums.cpp
stats_sinks.hpp
stream.hpp
thread_pool.hpp
thread_pool.cpp

View file

@ -1,6 +1,8 @@
#include <nano/lib/jsonconfig.hpp>
#include <nano/lib/locks.hpp>
#include <nano/lib/stats.hpp>
#include <nano/lib/stats_sinks.hpp>
#include <nano/lib/thread_roles.hpp>
#include <nano/lib/tomlconfig.hpp>
#include <boost/format.hpp>
@ -10,296 +12,198 @@
#include <fstream>
#include <sstream>
nano::error nano::stats_config::deserialize_toml (nano::tomlconfig & toml)
{
auto sampling_l (toml.get_optional_child ("sampling"));
if (sampling_l)
{
sampling_l->get<bool> ("enable", sampling_enabled);
sampling_l->get<size_t> ("capacity", capacity);
sampling_l->get<size_t> ("interval", interval);
}
using namespace std::chrono_literals;
auto log_l (toml.get_optional_child ("log"));
if (log_l)
{
log_l->get<bool> ("headers", log_headers);
log_l->get<size_t> ("interval_counters", log_interval_counters);
log_l->get<size_t> ("interval_samples", log_interval_samples);
log_l->get<size_t> ("rotation_count", log_rotation_count);
log_l->get<std::string> ("filename_counters", log_counters_filename);
log_l->get<std::string> ("filename_samples", log_samples_filename);
// Don't allow specifying the same file name for counter and samples logs
if (log_counters_filename == log_samples_filename)
{
toml.get_error ().set ("The statistics counter and samples config values must be different");
}
}
return toml.get_error ();
}
nano::error nano::stats_config::serialize_toml (nano::tomlconfig & toml) const
{
nano::tomlconfig sampling_l;
sampling_l.put ("enable", sampling_enabled, "Enable or disable sampling.\ntype:bool");
sampling_l.put ("capacity", capacity, "How many sample intervals to keep in the ring buffer.\ntype:uint64");
sampling_l.put ("interval", interval, "Sample interval.\ntype:milliseconds");
toml.put_child ("sampling", sampling_l);
nano::tomlconfig log_l;
log_l.put ("headers", log_headers, "If true, write headers on each counter or samples writeout.\nThe header contains log type and the current wall time.\ntype:bool");
log_l.put ("interval_counters", log_interval_counters, "How often to log counters. 0 disables logging.\ntype:milliseconds");
log_l.put ("interval_samples", log_interval_samples, "How often to log samples. 0 disables logging.\ntype:milliseconds");
log_l.put ("rotation_count", log_rotation_count, "Maximum number of log outputs before rotating the file.\ntype:uint64");
log_l.put ("filename_counters", log_counters_filename, "Log file name for counters.\ntype:string");
log_l.put ("filename_samples", log_samples_filename, "Log file name for samples.\ntype:string");
toml.put_child ("log", log_l);
return toml.get_error ();
}
/*
* stat_log_sink
*/
std::string nano::stat_log_sink::tm_to_string (tm & tm)
{
return (boost::format ("%04d.%02d.%02d %02d:%02d:%02d") % (1900 + tm.tm_year) % (tm.tm_mon + 1) % tm.tm_mday % tm.tm_hour % tm.tm_min % tm.tm_sec).str ();
}
/** JSON sink. The resulting JSON object is provided as both a property_tree::ptree (to_object) and a string (to_string) */
class json_writer : public nano::stat_log_sink
{
boost::property_tree::ptree tree;
boost::property_tree::ptree entries;
public:
std::ostream & out () override
{
return sstr;
}
void begin () override
{
tree.clear ();
}
void write_header (std::string const & header, std::chrono::system_clock::time_point & walltime) override
{
std::time_t now = std::chrono::system_clock::to_time_t (walltime);
tm tm = *localtime (&now);
tree.put ("type", header);
tree.put ("created", tm_to_string (tm));
}
void write_entry (tm & tm, std::string const & type, std::string const & detail, std::string const & dir, uint64_t value, nano::stat_histogram * histogram) override
{
boost::property_tree::ptree entry;
entry.put ("time", boost::format ("%02d:%02d:%02d") % tm.tm_hour % tm.tm_min % tm.tm_sec);
entry.put ("type", type);
entry.put ("detail", detail);
entry.put ("dir", dir);
entry.put ("value", value);
if (histogram != nullptr)
{
boost::property_tree::ptree histogram_node;
for (auto const & bin : histogram->get_bins ())
{
boost::property_tree::ptree bin_node;
bin_node.put ("start_inclusive", bin.start_inclusive);
bin_node.put ("end_exclusive", bin.end_exclusive);
bin_node.put ("value", bin.value);
std::time_t time = std::chrono::system_clock::to_time_t (bin.timestamp);
struct tm local_tm = *localtime (&time);
bin_node.put ("time", boost::format ("%02d:%02d:%02d") % local_tm.tm_hour % local_tm.tm_min % local_tm.tm_sec);
histogram_node.push_back (std::make_pair ("", bin_node));
}
entry.put_child ("histogram", histogram_node);
}
entries.push_back (std::make_pair ("", entry));
}
void finalize () override
{
tree.add_child ("entries", entries);
}
void * to_object () override
{
return &tree;
}
std::string to_string () override
{
boost::property_tree::write_json (sstr, tree);
return sstr.str ();
}
private:
std::ostringstream sstr;
};
/** File sink with rotation support. This writes one counter per line and does not include histogram values. */
class file_writer : public nano::stat_log_sink
{
public:
std::ofstream log;
std::string filename;
explicit file_writer (std::string const & filename) :
filename (filename)
{
log.open (filename.c_str (), std::ofstream::out);
}
virtual ~file_writer ()
{
log.close ();
}
std::ostream & out () override
{
return log;
}
void write_header (std::string const & header, std::chrono::system_clock::time_point & walltime) override
{
std::time_t now = std::chrono::system_clock::to_time_t (walltime);
tm tm = *localtime (&now);
log << header << "," << boost::format ("%04d.%02d.%02d %02d:%02d:%02d") % (1900 + tm.tm_year) % (tm.tm_mon + 1) % tm.tm_mday % tm.tm_hour % tm.tm_min % tm.tm_sec << std::endl;
}
void write_entry (tm & tm, std::string const & type, std::string const & detail, std::string const & dir, uint64_t value, nano::stat_histogram *) override
{
log << boost::format ("%02d:%02d:%02d") % tm.tm_hour % tm.tm_min % tm.tm_sec << "," << type << "," << detail << "," << dir << "," << value << std::endl;
}
void rotate () override
{
log.close ();
log.open (filename.c_str (), std::ofstream::out);
log_entries = 0;
}
};
nano::stat_histogram::stat_histogram (std::initializer_list<uint64_t> intervals_a, size_t bin_count_a)
{
if (bin_count_a == 0)
{
debug_assert (intervals_a.size () > 1);
uint64_t start_inclusive_l = *intervals_a.begin ();
for (auto it = std::next (intervals_a.begin ()); it != intervals_a.end (); ++it)
{
uint64_t end_exclusive_l = *it;
bins.emplace_back (start_inclusive_l, end_exclusive_l);
start_inclusive_l = end_exclusive_l;
}
}
else
{
debug_assert (intervals_a.size () == 2);
uint64_t min_inclusive_l = *intervals_a.begin ();
uint64_t max_exclusive_l = *std::next (intervals_a.begin ());
auto domain_l = (max_exclusive_l - min_inclusive_l);
auto bin_size_l = (domain_l + bin_count_a - 1) / bin_count_a;
auto last_bin_size_l = (domain_l % bin_size_l);
auto next_start_l = min_inclusive_l;
for (size_t i = 0; i < bin_count_a; i++, next_start_l += bin_size_l)
{
bins.emplace_back (next_start_l, next_start_l + bin_size_l);
}
if (last_bin_size_l > 0)
{
bins.emplace_back (next_start_l, next_start_l + last_bin_size_l);
}
}
}
void nano::stat_histogram::add (uint64_t index_a, uint64_t addend_a)
{
nano::lock_guard<nano::mutex> lk{ histogram_mutex };
debug_assert (!bins.empty ());
// The search for a bin is linear, but we're searching just a few
// contiguous items which are likely to be in cache.
bool found_l = false;
for (auto & bin : bins)
{
if (index_a >= bin.start_inclusive && index_a < bin.end_exclusive)
{
bin.value += addend_a;
bin.timestamp = std::chrono::system_clock::now ();
found_l = true;
break;
}
}
// Clamp into first or last bin if no suitable bin was found
if (!found_l)
{
if (index_a < bins.front ().start_inclusive)
{
bins.front ().value += addend_a;
}
else
{
bins.back ().value += addend_a;
}
}
}
std::vector<nano::stat_histogram::bin> nano::stat_histogram::get_bins () const
{
nano::lock_guard<nano::mutex> lk{ histogram_mutex };
return bins;
}
/*
* stats
*/
nano::stats::stats (nano::stats_config config) :
config (config)
config{ std::move (config) }
{
}
std::shared_ptr<nano::stat_entry> nano::stats::get_entry (uint32_t key)
nano::stats::~stats ()
{
return get_entry (key, config.interval, config.capacity);
// Thread must be stopped before destruction
debug_assert (!thread.joinable ());
}
std::shared_ptr<nano::stat_entry> nano::stats::get_entry (uint32_t key, size_t interval, size_t capacity)
void nano::stats::start ()
{
nano::unique_lock<nano::mutex> lock{ stat_mutex };
return get_entry_impl (key, interval, capacity);
}
std::shared_ptr<nano::stat_entry> nano::stats::get_entry_impl (uint32_t key, size_t interval, size_t capacity)
{
std::shared_ptr<nano::stat_entry> res;
auto entry = entries.find (key);
if (entry == entries.end ())
if (!should_run ())
{
res = entries.emplace (key, std::make_shared<nano::stat_entry> (capacity, interval)).first->second;
}
else
{
res = entry->second;
return;
}
return res;
thread = std::thread ([this] {
nano::thread_role::set (nano::thread_role::name::stats);
run ();
});
}
std::unique_ptr<nano::stat_log_sink> nano::stats::log_sink_json () const
void nano::stats::stop ()
{
return std::make_unique<json_writer> ();
{
std::lock_guard guard{ mutex };
stopped = true;
}
condition.notify_all ();
if (thread.joinable ())
{
thread.join ();
}
}
void nano::stats::clear ()
{
std::lock_guard guard{ mutex };
counters.clear ();
samplers.clear ();
timestamp = std::chrono::steady_clock::now ();
}
void nano::stats::add (stat::type type, stat::detail detail, stat::dir dir, counter_value_t value, bool aggregate_all)
{
debug_assert (type != stat::type::_invalid);
debug_assert (detail != stat::detail::_invalid);
if (value == 0)
{
return;
}
// Updates need to happen while holding the mutex
auto update_counter = [this, aggregate_all] (nano::stats::counter_key key, auto && updater) {
counter_key all_key{ key.type, stat::detail::all, key.dir };
// This is a two-step process to avoid exclusively locking the mutex in the common case
{
std::shared_lock lock{ mutex };
if (auto it = counters.find (key); it != counters.end ())
{
updater (*it->second);
if (aggregate_all && key != all_key)
{
auto it_all = counters.find (all_key);
release_assert (it_all != counters.end ()); // The `all` counter should always be created together
updater (*it_all->second); // Also update the `all` counter
}
return;
}
}
// Not found, create a new entry
{
std::unique_lock lock{ mutex };
// Insertions will be ignored if the key already exists
auto [it, inserted] = counters.emplace (key, std::make_unique<counter_entry> ());
updater (*it->second);
if (aggregate_all && key != all_key)
{
auto [it_all, inserted_all] = counters.emplace (all_key, std::make_unique<counter_entry> ());
updater (*it_all->second); // Also update the `all` counter
}
}
};
update_counter (counter_key{ type, detail, dir }, [value] (counter_entry & counter) {
counter.value += value;
});
}
nano::stats::counter_value_t nano::stats::count (stat::type type, stat::detail detail, stat::dir dir) const
{
std::shared_lock lock{ mutex };
if (auto it = counters.find (counter_key{ type, detail, dir }); it != counters.end ())
{
return it->second->value;
}
return 0;
}
nano::stats::counter_value_t nano::stats::count (stat::type type, stat::dir dir) const
{
std::shared_lock lock{ mutex };
counter_value_t result = 0;
auto it = counters.lower_bound (counter_key{ type, stat::detail::all, dir });
while (it != counters.end () && it->first.type == type)
{
if (it->first.dir == dir && it->first.detail != stat::detail::all)
{
result += it->second->value;
}
++it;
}
return result;
}
void nano::stats::sample (stat::sample sample, std::pair<sampler_value_t, sampler_value_t> expected_min_max, nano::stats::sampler_value_t value)
{
debug_assert (sample != stat::sample::_invalid);
// Updates need to happen while holding the mutex
auto update_sampler = [this, expected_min_max] (nano::stats::sampler_key key, auto && updater) {
// This is a two-step process to avoid exclusively locking the mutex in the common case
{
std::shared_lock lock{ mutex };
if (auto it = samplers.find (key); it != samplers.end ())
{
updater (*it->second);
return;
}
}
// Not found, create a new entry
{
std::unique_lock lock{ mutex };
// Insertions will be ignored if the key already exists
auto [it, inserted] = samplers.emplace (key, std::make_unique<sampler_entry> (config.max_samples, expected_min_max));
updater (*it->second);
}
};
update_sampler (sampler_key{ sample }, [value] (sampler_entry & sampler) {
sampler.add (value);
});
}
auto nano::stats::samples (stat::sample sample) -> std::vector<sampler_value_t>
{
std::shared_lock lock{ mutex };
if (auto it = samplers.find (sampler_key{ sample }); it != samplers.end ())
{
return it->second->collect ();
}
return {};
}
void nano::stats::log_counters (stat_log_sink & sink)
{
nano::unique_lock<nano::mutex> lock{ stat_mutex };
log_counters_impl (sink);
// TODO: Replace with a proper std::chrono time
std::time_t time = std::chrono::system_clock::to_time_t (std::chrono::system_clock::now ());
tm local_tm = *localtime (&time);
std::lock_guard guard{ mutex };
log_counters_impl (sink, local_tm);
}
void nano::stats::log_counters_impl (stat_log_sink & sink)
void nano::stats::log_counters_impl (stat_log_sink & sink, tm & tm)
{
sink.begin ();
if (sink.entries () >= config.log_rotation_count)
@ -313,16 +217,13 @@ void nano::stats::log_counters_impl (stat_log_sink & sink)
sink.write_header ("counters", walltime);
}
for (auto & it : entries)
for (auto const & [key, entry] : counters)
{
std::time_t time = std::chrono::system_clock::to_time_t (it.second->counter.get_timestamp ());
tm local_tm = *localtime (&time);
std::string type{ to_string (key.type) };
std::string detail{ to_string (key.detail) };
std::string dir{ to_string (key.dir) };
auto key = it.first;
std::string type = type_to_string (key);
std::string detail = detail_to_string (key);
std::string dir = dir_to_string (key);
sink.write_entry (local_tm, type, detail, dir, it.second->counter.get_value (), it.second->histogram.get ());
sink.write_counter_entry (tm, type, detail, dir, entry->value);
}
sink.entries ()++;
sink.finalize ();
@ -330,11 +231,15 @@ void nano::stats::log_counters_impl (stat_log_sink & sink)
void nano::stats::log_samples (stat_log_sink & sink)
{
nano::unique_lock<nano::mutex> lock{ stat_mutex };
log_samples_impl (sink);
// TODO: Replace with a proper std::chrono time
std::time_t time = std::chrono::system_clock::to_time_t (std::chrono::system_clock::now ());
tm local_tm = *localtime (&time);
std::lock_guard guard{ mutex };
log_samples_impl (sink, local_tm);
}
void nano::stats::log_samples_impl (stat_log_sink & sink)
void nano::stats::log_samples_impl (stat_log_sink & sink, tm & tm)
{
sink.begin ();
if (sink.entries () >= config.log_rotation_count)
@ -348,214 +253,165 @@ void nano::stats::log_samples_impl (stat_log_sink & sink)
sink.write_header ("samples", walltime);
}
for (auto & it : entries)
for (auto const & [key, entry] : samplers)
{
auto key = it.first;
std::string type = type_to_string (key);
std::string detail = detail_to_string (key);
std::string dir = dir_to_string (key);
std::string sample{ to_string (key.sample) };
for (auto & datapoint : it.second->samples)
{
std::time_t time = std::chrono::system_clock::to_time_t (datapoint.get_timestamp ());
tm local_tm = *localtime (&time);
sink.write_entry (local_tm, type, detail, dir, datapoint.get_value (), nullptr);
}
sink.write_sampler_entry (tm, sample, entry->collect (), entry->expected_min_max);
}
sink.entries ()++;
sink.entries ()++; // TODO: This `++` looks like a hack, needs a redesign
sink.finalize ();
}
void nano::stats::define_histogram (stat::type type, stat::detail detail, stat::dir dir, std::initializer_list<uint64_t> intervals_a, size_t bin_count_a /*=0*/)
bool nano::stats::should_run () const
{
auto entry (get_entry (key_of (type, detail, dir)));
entry->histogram = std::make_unique<nano::stat_histogram> (intervals_a, bin_count_a);
}
void nano::stats::update_histogram (stat::type type, stat::detail detail, stat::dir dir, uint64_t index_a, uint64_t addend_a)
{
auto entry (get_entry (key_of (type, detail, dir)));
debug_assert (entry->histogram != nullptr);
entry->histogram->add (index_a, addend_a);
}
nano::stat_histogram * nano::stats::get_histogram (stat::type type, stat::detail detail, stat::dir dir)
{
auto entry (get_entry (key_of (type, detail, dir)));
debug_assert (entry->histogram != nullptr);
return entry->histogram.get ();
}
void nano::stats::update (uint32_t key_a, uint64_t value)
{
static file_writer log_count (config.log_counters_filename);
static file_writer log_sample (config.log_samples_filename);
nano::unique_lock<nano::mutex> lock{ stat_mutex };
if (!stopped)
if (config.log_counters_interval.count () > 0)
{
auto entry (get_entry_impl (key_a, config.interval, config.capacity));
auto has_interval_counter = [&] () {
return config.log_interval_counters > 0;
};
auto has_sampling = [&] () {
return config.sampling_enabled && entry->sample_interval > 0;
};
return true;
}
if (config.log_samples_interval.count () > 0)
{
return true;
}
return false;
}
// Counters
auto old (entry->counter.get_value ());
entry->counter.add (value, has_sampling ()); // Only update timestamp when sampling is enabled as this has a performance impact
entry->count_observers.notify (old, entry->counter.get_value ());
if (has_interval_counter () || has_sampling ())
void nano::stats::run ()
{
std::unique_lock lock{ mutex };
while (!stopped)
{
condition.wait_for (lock, 1s);
if (!stopped)
{
auto now = std::chrono::steady_clock::now (); // Only sample clock if necessary as this impacts node performance due to frequent usage
if (has_interval_counter ())
{
std::chrono::duration<double, std::milli> duration = now - log_last_count_writeout;
if (duration.count () > config.log_interval_counters)
{
log_counters_impl (log_count);
log_last_count_writeout = now;
}
}
run_one (lock);
debug_assert (lock.owns_lock ());
}
}
}
// Samples
if (has_sampling ())
{
entry->sample_current.add (value, false);
void nano::stats::run_one (std::unique_lock<std::shared_mutex> & lock)
{
static stat_file_writer log_count{ config.log_counters_filename };
static stat_file_writer log_sample{ config.log_samples_filename };
std::chrono::duration<double, std::milli> duration = now - entry->sample_start_time;
if (duration.count () > entry->sample_interval)
{
entry->sample_start_time = now;
debug_assert (!mutex.try_lock ());
debug_assert (lock.owns_lock ());
// Make a snapshot of samples for thread safety and to get a stable container
entry->sample_current.set_timestamp (std::chrono::system_clock::now ());
entry->samples.push_back (entry->sample_current);
entry->sample_current.set_value (0);
// TODO: Replace with a proper std::chrono time
std::time_t time = std::chrono::system_clock::to_time_t (std::chrono::system_clock::now ());
tm local_tm = *localtime (&time);
if (!entry->sample_observers.empty ())
{
auto snapshot (entry->samples);
entry->sample_observers.notify (snapshot);
}
// Counters
if (config.log_counters_interval.count () > 0)
{
if (nano::elapse (log_last_count_writeout, config.log_counters_interval))
{
log_counters_impl (log_count, local_tm);
}
}
// Log sink
duration = now - log_last_sample_writeout;
if (config.log_interval_samples > 0 && duration.count () > config.log_interval_samples)
{
log_samples_impl (log_sample);
log_last_sample_writeout = now;
}
}
}
// Samples
if (config.log_samples_interval.count () > 0)
{
if (nano::elapse (log_last_sample_writeout, config.log_samples_interval))
{
log_samples_impl (log_sample, local_tm);
}
}
}
std::chrono::seconds nano::stats::last_reset ()
{
nano::unique_lock<nano::mutex> lock{ stat_mutex };
std::lock_guard guard{ mutex };
auto now (std::chrono::steady_clock::now ());
return std::chrono::duration_cast<std::chrono::seconds> (now - timestamp);
}
void nano::stats::stop ()
std::string nano::stats::dump (category category)
{
nano::lock_guard<nano::mutex> guard{ stat_mutex };
stopped = true;
}
std::string nano::stats::to_string (std::string type)
{
auto sink = log_sink_json ();
if (type == "counters")
stat_json_writer sink;
switch (category)
{
log_counters (*sink);
return sink->to_string ();
case category::counters:
log_counters (sink);
break;
case category::samples:
log_samples (sink);
break;
default:
debug_assert (false, "missing stat_category case");
}
else if (type == "samples")
return sink.to_string ();
}
/*
* stats::sampler_entry
*/
void nano::stats::sampler_entry::add (nano::stats::sampler_value_t value)
{
nano::lock_guard<nano::mutex> guard{ mutex };
samples.push_back (value);
}
auto nano::stats::sampler_entry::collect () -> std::vector<sampler_value_t>
{
nano::lock_guard<nano::mutex> guard{ mutex };
std::vector<sampler_value_t> result{ samples.begin (), samples.end () };
samples.clear ();
return result;
}
/*
* stats_config
*/
nano::error nano::stats_config::serialize_toml (nano::tomlconfig & toml) const
{
toml.put ("max_samples", max_samples, "Maximum number of samples to keep in the ring buffer.\ntype:uint64");
nano::tomlconfig log_l;
log_l.put ("headers", log_headers, "If true, write headers on each counter or samples writeout.\nThe header contains log type and the current wall time.\ntype:bool");
log_l.put ("interval_counters", log_counters_interval.count (), "How often to log counters. 0 disables logging.\ntype:milliseconds");
log_l.put ("interval_samples", log_samples_interval.count (), "How often to log samples. 0 disables logging.\ntype:milliseconds");
log_l.put ("rotation_count", log_rotation_count, "Maximum number of log outputs before rotating the file.\ntype:uint64");
log_l.put ("filename_counters", log_counters_filename, "Log file name for counters.\ntype:string");
log_l.put ("filename_samples", log_samples_filename, "Log file name for samples.\ntype:string");
toml.put_child ("log", log_l);
return toml.get_error ();
}
nano::error nano::stats_config::deserialize_toml (nano::tomlconfig & toml)
{
toml.get ("max_samples", max_samples);
if (auto maybe_log_l = toml.get_optional_child ("log"))
{
log_samples (*sink);
return sink->to_string ();
auto log_l = *maybe_log_l;
log_l.get ("headers", log_headers);
auto counters_interval_l = log_counters_interval.count ();
log_l.get ("interval_counters", counters_interval_l);
log_counters_interval = std::chrono::milliseconds{ counters_interval_l };
auto samples_interval_l = log_samples_interval.count ();
log_l.get ("interval_samples", samples_interval_l);
log_samples_interval = std::chrono::milliseconds{ samples_interval_l };
log_l.get ("rotation_count", log_rotation_count);
log_l.get ("filename_counters", log_counters_filename);
log_l.get ("filename_samples", log_samples_filename);
// Don't allow specifying the same file name for counter and samples logs
if (log_counters_filename == log_samples_filename)
{
toml.get_error ().set ("The statistics counter and samples config values must be different");
}
}
else
{
return "type not supported: " + type;
}
}
void nano::stats::clear ()
{
nano::unique_lock<nano::mutex> lock{ stat_mutex };
entries.clear ();
timestamp = std::chrono::steady_clock::now ();
}
std::string nano::stats::type_to_string (uint32_t key)
{
auto type = static_cast<stat::type> (key >> 16 & 0x000000ff);
return std::string{ nano::to_string (type) };
}
std::string nano::stats::detail_to_string (uint32_t key)
{
auto detail = static_cast<stat::detail> (key >> 8 & 0x000000ff);
return std::string{ nano::to_string (detail) };
}
std::string nano::stats::dir_to_string (uint32_t key)
{
auto dir = static_cast<stat::dir> (key & 0x000000ff);
return std::string{ nano::to_string (dir) };
}
nano::stat_datapoint::stat_datapoint (stat_datapoint const & other_a)
{
nano::lock_guard<nano::mutex> lock{ other_a.datapoint_mutex };
value = other_a.value;
timestamp = other_a.timestamp;
}
nano::stat_datapoint & nano::stat_datapoint::operator= (stat_datapoint const & other_a)
{
nano::lock_guard<nano::mutex> lock{ other_a.datapoint_mutex };
value = other_a.value;
timestamp = other_a.timestamp;
return *this;
}
uint64_t nano::stat_datapoint::get_value () const
{
nano::lock_guard<nano::mutex> lock{ datapoint_mutex };
return value;
}
void nano::stat_datapoint::set_value (uint64_t value_a)
{
nano::lock_guard<nano::mutex> lock{ datapoint_mutex };
value = value_a;
}
std::chrono::system_clock::time_point nano::stat_datapoint::get_timestamp () const
{
nano::lock_guard<nano::mutex> lock{ datapoint_mutex };
return timestamp;
}
void nano::stat_datapoint::set_timestamp (std::chrono::system_clock::time_point timestamp_a)
{
nano::lock_guard<nano::mutex> lock{ datapoint_mutex };
timestamp = timestamp_a;
}
/** Add \addend to the current value and optionally update the timestamp */
void nano::stat_datapoint::add (uint64_t addend, bool update_timestamp)
{
nano::lock_guard<nano::mutex> lock{ datapoint_mutex };
value += addend;
if (update_timestamp)
{
timestamp = std::chrono::system_clock::now ();
}
}
return toml.get_error ();
}

View file

@ -9,10 +9,12 @@
#include <chrono>
#include <initializer_list>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <thread>
namespace nano
{
@ -32,20 +34,15 @@ public:
nano::error deserialize_toml (nano::tomlconfig & toml);
nano::error serialize_toml (nano::tomlconfig & toml) const;
/** If true, sampling of counters is enabled */
bool sampling_enabled{ false };
/** How many sample intervals to keep in the ring buffer */
size_t capacity{ 0 };
/** Sample interval in milliseconds */
size_t interval{ 0 };
public:
/** Maximum number samples to keep in the ring buffer */
size_t max_samples{ 1024 * 16 };
/** How often to log sample array, in milliseconds. Default is 0 (no logging) */
size_t log_interval_samples{ 0 };
std::chrono::milliseconds log_samples_interval{ 0 };
/** How often to log counters, in milliseconds. Default is 0 (no logging) */
size_t log_interval_counters{ 0 };
std::chrono::milliseconds log_counters_interval{ 0 };
/** Maximum number of log outputs before rotating the file */
size_t log_rotation_count{ 100 };
@ -60,95 +57,160 @@ public:
std::string log_samples_filename{ "samples.stat" };
};
/** Value and wall time of measurement */
class stat_datapoint final
{
public:
stat_datapoint () = default;
stat_datapoint (stat_datapoint const & other_a);
stat_datapoint & operator= (stat_datapoint const & other_a);
uint64_t get_value () const;
void set_value (uint64_t value_a);
std::chrono::system_clock::time_point get_timestamp () const;
void set_timestamp (std::chrono::system_clock::time_point timestamp_a);
void add (uint64_t addend, bool update_timestamp = true);
private:
mutable nano::mutex datapoint_mutex;
/** Value of the sample interval */
uint64_t value{ 0 };
/** When the sample was added. This is wall time (system_clock), suitable for display purposes. */
std::chrono::system_clock::time_point timestamp{ std::chrono::system_clock::now () };
};
/** Histogram values */
class stat_histogram final
{
public:
/**
* Create histogram given a set of intervals and an optional bin count
* @param intervals_a Inclusive-exclusive intervals, e.g. {1,5,8,15} produces bins [1,4] [5,7] [8, 14]
* @param bin_count_a If zero (default), \p intervals_a defines all the bins. If non-zero, \p intervals_a contains the total range, which is uniformly distributed into \p bin_count_a bins.
*/
stat_histogram (std::initializer_list<uint64_t> intervals_a, size_t bin_count_a = 0);
/** Add \p addend_a to the histogram bin into which \p index_a falls */
void add (uint64_t index_a, uint64_t addend_a);
/** Histogram bin with interval, current value and timestamp of last update */
class bin final
{
public:
bin (uint64_t start_inclusive_a, uint64_t end_exclusive_a) :
start_inclusive (start_inclusive_a), end_exclusive (end_exclusive_a)
{
}
uint64_t start_inclusive;
uint64_t end_exclusive;
uint64_t value{ 0 };
std::chrono::system_clock::time_point timestamp{ std::chrono::system_clock::now () };
};
std::vector<bin> get_bins () const;
private:
mutable nano::mutex histogram_mutex;
std::vector<bin> bins;
};
class stat_log_sink;
/**
* Bookkeeping of statistics for a specific type/detail/direction combination
* Collects counts and samples for inbound and outbound traffic, blocks, errors, and so on.
* Stats can be queried and observed on a type level (such as message and ledger) as well as a more
* specific detail level (such as send blocks)
*/
class stat_entry final
class stats final
{
public:
stat_entry (size_t capacity, size_t interval) :
samples (capacity), sample_interval (interval)
using counter_value_t = uint64_t;
using sampler_value_t = int64_t;
public:
explicit stats (nano::stats_config = {});
~stats ();
void start ();
void stop ();
/** Clear all stats */
void clear ();
/** Increments the given counter */
void inc (stat::type type, stat::detail detail, bool aggregate_all = false)
{
inc (type, detail, stat::dir::in, aggregate_all);
}
/** Optional samples. Note that this doesn't allocate any memory unless sampling is configured, which sets the capacity. */
boost::circular_buffer<stat_datapoint> samples;
void inc (stat::type type, stat::detail detail, stat::dir dir, bool aggregate_all = false)
{
add (type, detail, dir, 1, aggregate_all);
}
/** Start time of current sample interval. This is a steady clock for measuring interval; the datapoint contains the wall time. */
std::chrono::steady_clock::time_point sample_start_time{ std::chrono::steady_clock::now () };
/** Adds \p value to the given counter */
void add (stat::type type, stat::detail detail, counter_value_t value, bool aggregate_all = false)
{
add (type, detail, stat::dir::in, value, aggregate_all);
}
/** Sample interval in milliseconds. If 0, sampling is disabled. */
size_t sample_interval;
void add (stat::type type, stat::detail detail, stat::dir dir, counter_value_t value, bool aggregate_all = false);
/** Value within the current sample interval */
stat_datapoint sample_current;
/** Returns current value for the given counter at the detail level */
counter_value_t count (stat::type type, stat::detail detail, stat::dir dir = stat::dir::in) const;
/** Counting value for this entry, including the time of last update. This is never reset and only increases. */
stat_datapoint counter;
/** Returns current value for the given counter at the type level (sum of all details) */
counter_value_t count (stat::type type, stat::dir dir = stat::dir::in) const;
/** Optional histogram for this entry */
std::unique_ptr<stat_histogram> histogram;
/** Adds a sample to the given sampler */
void sample (stat::sample sample, std::pair<sampler_value_t, sampler_value_t> expected_min_max, sampler_value_t value);
/** Zero or more observers for samples. Called at the end of the sample interval. */
nano::observer_set<boost::circular_buffer<stat_datapoint> &> sample_observers;
/** Returns a potentially empty list of the last N samples, where N is determined by the 'max_samples' configuration. Samples are reset after each lookup. */
std::vector<sampler_value_t> samples (stat::sample sample);
/** Observers for count. Called on each update. */
nano::observer_set<uint64_t, uint64_t> count_observers;
/** Returns the number of seconds since clear() was last called, or node startup if it's never called. */
std::chrono::seconds last_reset ();
/** Log counters to the given log link */
void log_counters (stat_log_sink & sink);
/** Log samples to the given log sink */
void log_samples (stat_log_sink & sink);
public:
enum class category
{
counters,
samples
};
/** Return string showing stats counters (convenience function for debugging) */
std::string dump (category category = category::counters);
private:
struct counter_key
{
stat::type type;
stat::detail detail;
stat::dir dir;
auto operator<=> (const counter_key &) const = default;
};
struct sampler_key
{
stat::sample sample;
auto operator<=> (const sampler_key &) const = default;
};
private:
class counter_entry
{
public:
// Prevent copying
counter_entry () = default;
counter_entry (counter_entry const &) = delete;
counter_entry & operator= (counter_entry const &) = delete;
public:
std::atomic<counter_value_t> value{ 0 };
};
class sampler_entry
{
public:
std::pair<sampler_value_t, sampler_value_t> const expected_min_max;
sampler_entry (size_t max_samples, std::pair<sampler_value_t, sampler_value_t> expected_min_max) :
samples{ max_samples },
expected_min_max{ expected_min_max } {};
// Prevent copying
sampler_entry (sampler_entry const &) = delete;
sampler_entry & operator= (sampler_entry const &) = delete;
public:
void add (sampler_value_t value);
std::vector<sampler_value_t> collect ();
private:
boost::circular_buffer<sampler_value_t> samples;
mutable nano::mutex mutex;
};
// Wrap in unique_ptrs because mutex/atomic members are not movable
// TODO: Compare performance of map vs unordered_map
std::map<counter_key, std::unique_ptr<counter_entry>> counters;
std::map<sampler_key, std::unique_ptr<sampler_entry>> samplers;
private:
void run ();
void run_one (std::unique_lock<std::shared_mutex> & lock);
bool should_run () const;
/** Unlocked implementation of log_counters() to avoid using recursive locking */
void log_counters_impl (stat_log_sink & sink, tm & tm);
/** Unlocked implementation of log_samples() to avoid using recursive locking */
void log_samples_impl (stat_log_sink & sink, tm & tm);
private:
nano::stats_config const config;
/** Time of last clear() call */
std::chrono::steady_clock::time_point timestamp{ std::chrono::steady_clock::now () };
std::chrono::steady_clock::time_point log_last_count_writeout{ std::chrono::steady_clock::now () };
std::chrono::steady_clock::time_point log_last_sample_writeout{ std::chrono::steady_clock::now () };
bool stopped{ false };
mutable std::shared_mutex mutex;
nano::condition_variable condition;
std::thread thread;
};
/** Log sink interface */
@ -175,10 +237,9 @@ public:
{
}
/** Write a counter or sampling entry to the log. Some log sinks may support writing histograms as well. */
virtual void write_entry (tm & tm, std::string const & type, std::string const & detail, std::string const & dir, uint64_t value, nano::stat_histogram * histogram)
{
}
/** Write a counter or sampling entry to the log. */
virtual void write_counter_entry (tm & tm, std::string const & type, std::string const & detail, std::string const & dir, stats::counter_value_t value) = 0;
virtual void write_sampler_entry (tm & tm, std::string const & sample, std::vector<stats::sampler_value_t> const & values, std::pair<stats::sampler_value_t, stats::sampler_value_t> expected_min_max) = 0;
/** Rotates the log (e.g. empty file). This is a no-op for sinks where rotation is not supported. */
virtual void rotate ()
@ -186,7 +247,7 @@ public:
}
/** Returns a reference to the log entry counter */
size_t & entries ()
std::size_t & entries ()
{
return log_entries;
}
@ -197,257 +258,8 @@ public:
return "";
}
/**
* Returns the object representation of the log result. The type depends on the sink used.
* @returns Object, or nullptr if no object result is available.
*/
virtual void * to_object ()
{
return nullptr;
}
protected:
std::string tm_to_string (tm & tm);
size_t log_entries{ 0 };
};
/**
* Collects counts and samples for inbound and outbound traffic, blocks, errors, and so on.
* Stats can be queried and observed on a type level (such as message and ledger) as well as a more
* specific detail level (such as send blocks)
*/
class stats final
{
public:
/** Constructor using the default config values */
stats () = default;
/**
* Initialize stats with a config.
* @param config Configuration object; deserialized from config.json
*/
stats (nano::stats_config config);
/**
* Call this to override the default sample interval and capacity, for a specific stat entry.
* This must be called before any stat entries are added, as part of the node initialiation.
*/
void configure (stat::type type, stat::detail detail, stat::dir dir, size_t interval, size_t capacity)
{
get_entry (key_of (type, detail, dir), interval, capacity);
}
/**
* Disables sampling for a given type/detail/dir combination
*/
void disable_sampling (stat::type type, stat::detail detail, stat::dir dir)
{
auto entry = get_entry (key_of (type, detail, dir));
entry->sample_interval = 0;
}
/** Increments the given counter */
void inc (stat::type type, stat::dir dir = stat::dir::in)
{
add (type, dir, 1);
}
/** Increments the counter for \detail, but doesn't update at the type level */
void inc_detail_only (stat::type type, stat::detail detail, stat::dir dir = stat::dir::in)
{
add (type, detail, dir, 1, true);
}
/** Increments the given counter */
void inc (stat::type type, stat::detail detail, stat::dir dir = stat::dir::in)
{
add (type, detail, dir, 1);
}
/** Adds \p value to the given counter */
void add (stat::type type, stat::dir dir, uint64_t value)
{
add (type, stat::detail::all, dir, value);
}
/**
* Define histogram bins. Values are clamped into the first and last bins, but a catch-all bin on one or both
* ends can be defined.
*
* Examples:
*
* // Uniform histogram, total range 12, and 12 bins (each bin has width 1)
* define_histogram (type::vote, detail::confirm_ack, dir::in, {1,13}, 12);
*
* // Specific bins matching closed intervals [1,4] [5,19] [20,99]
* define_histogram (type::vote, detail::something, dir::out, {1,5,20,100});
*
* // Logarithmic bins matching half-open intervals [1..10) [10..100) [100 1000)
* define_histogram(type::vote, detail::log, dir::out, {1,10,100,1000});
*/
void define_histogram (stat::type type, stat::detail detail, stat::dir dir, std::initializer_list<uint64_t> intervals_a, size_t bin_count_a = 0);
/**
* Update histogram
*
* Examples:
*
* // Add 1 to the bin representing a 4-item vbh
* stats.update_histogram(type::vote, detail::confirm_ack, dir::in, 4, 1)
*
* // Add 5 to the second bin where 17 falls
* stats.update_histogram(type::vote, detail::something, dir::in, 17, 5)
*
* // Add 3 to the last bin as the histogram clamps. You can also add a final bin with maximum end value to effectively prevent this.
* stats.update_histogram(type::vote, detail::log, dir::out, 1001, 3)
*/
void update_histogram (stat::type type, stat::detail detail, stat::dir dir, uint64_t index, uint64_t addend = 1);
/** Returns a non-owning histogram pointer, or nullptr if a histogram is not defined */
nano::stat_histogram * get_histogram (stat::type type, stat::detail detail, stat::dir dir);
/**
* Add \p value to stat. If sampling is configured, this will update the current sample and
* call any sample observers if the interval is over.
*
* @param type Main statistics type
* @param detail Detail type, or detail::none to register on type-level only
* @param dir Direction
* @param value The amount to add
* @param detail_only If true, only update the detail-level counter
*/
void add (stat::type type, stat::detail detail, stat::dir dir, uint64_t value, bool detail_only = false)
{
if (value == 0)
{
return;
}
constexpr uint32_t no_detail_mask = 0xffff00ff;
uint32_t key = key_of (type, detail, dir);
update (key, value);
// Optionally update at type-level as well
if (!detail_only && (key & no_detail_mask) != key)
{
update (key & no_detail_mask, value);
}
}
/**
* Add a sampling observer for a given counter.
* The observer receives a snapshot of the current sampling. Accessing the sample buffer is thus thread safe.
* To avoid recursion, the observer callback must only use the received data point snapshop, not query the stat object.
* @param observer The observer receives a snapshot of the current samples.
*/
void observe_sample (stat::type type, stat::detail detail, stat::dir dir, std::function<void (boost::circular_buffer<stat_datapoint> &)> observer)
{
get_entry (key_of (type, detail, dir))->sample_observers.add (observer);
}
void observe_sample (stat::type type, stat::dir dir, std::function<void (boost::circular_buffer<stat_datapoint> &)> observer)
{
observe_sample (type, stat::detail::all, dir, observer);
}
/**
* Add count observer for a given type, detail and direction combination. The observer receives old and new value.
* To avoid recursion, the observer callback must only use the received counts, not query the stat object.
* @param observer The observer receives the old and the new count.
*/
void observe_count (stat::type type, stat::detail detail, stat::dir dir, std::function<void (uint64_t, uint64_t)> observer)
{
get_entry (key_of (type, detail, dir))->count_observers.add (observer);
}
/** Returns a potentially empty list of the last N samples, where N is determined by the 'capacity' configuration */
boost::circular_buffer<stat_datapoint> * samples (stat::type type, stat::detail detail, stat::dir dir)
{
return &get_entry (key_of (type, detail, dir))->samples;
}
/** Returns current value for the given counter at the type level */
uint64_t count (stat::type type, stat::dir dir = stat::dir::in)
{
return count (type, stat::detail::all, dir);
}
/** Returns current value for the given counter at the detail level */
uint64_t count (stat::type type, stat::detail detail, stat::dir dir = stat::dir::in)
{
return get_entry (key_of (type, detail, dir))->counter.get_value ();
}
/** Returns the number of seconds since clear() was last called, or node startup if it's never called. */
std::chrono::seconds last_reset ();
/** Clear all stats */
void clear ();
/** Log counters to the given log link */
void log_counters (stat_log_sink & sink);
/** Log samples to the given log sink */
void log_samples (stat_log_sink & sink);
/** Returns a new JSON log sink */
std::unique_ptr<stat_log_sink> log_sink_json () const;
/** Stop stats being output */
void stop ();
/** Return string showing stats counters (convenience function for debugging) */
std::string to_string (std::string type = "counters");
private:
static std::string type_to_string (uint32_t key);
static std::string dir_to_string (uint32_t key);
static std::string detail_to_string (uint32_t key);
/** Constructs a key given type, detail and direction. This is used as input to update(...) and get_entry(...) */
uint32_t key_of (stat::type type, stat::detail detail, stat::dir dir) const
{
return static_cast<uint8_t> (type) << 16 | static_cast<uint8_t> (detail) << 8 | static_cast<uint8_t> (dir);
}
/** Get entry for key, creating a new entry if necessary, using interval and sample count from config */
std::shared_ptr<nano::stat_entry> get_entry (uint32_t key);
/** Get entry for key, creating a new entry if necessary */
std::shared_ptr<nano::stat_entry> get_entry (uint32_t key, size_t sample_interval, size_t max_samples);
/** Unlocked implementation of get_entry() */
std::shared_ptr<nano::stat_entry> get_entry_impl (uint32_t key, size_t sample_interval, size_t max_samples);
/**
* Update count and sample and call any observers on the key
* @param key a key constructor from stat::type, stat::detail and stat::direction
* @value Amount to add to the counter
*/
void update (uint32_t key, uint64_t value);
/** Unlocked implementation of log_counters() to avoid using recursive locking */
void log_counters_impl (stat_log_sink & sink);
/** Unlocked implementation of log_samples() to avoid using recursive locking */
void log_samples_impl (stat_log_sink & sink);
/** Time of last clear() call */
std::chrono::steady_clock::time_point timestamp{ std::chrono::steady_clock::now () };
/** Configuration deserialized from config.json */
nano::stats_config config;
/** Stat entries are sorted by key to simplify processing of log output */
std::unordered_map<uint32_t, std::shared_ptr<nano::stat_entry>> entries;
std::chrono::steady_clock::time_point log_last_count_writeout{ std::chrono::steady_clock::now () };
std::chrono::steady_clock::time_point log_last_sample_writeout{ std::chrono::steady_clock::now () };
/** Whether stats should be output */
bool stopped{ false };
/** All access to stat is thread safe, including calls from observers on the same thread */
nano::mutex stat_mutex;
};
}

View file

@ -15,4 +15,9 @@ std::string_view nano::to_string (nano::stat::detail detail)
std::string_view nano::to_string (nano::stat::dir dir)
{
return magic_enum::enum_name (dir);
}
std::string_view nano::to_string (nano::stat::sample sample)
{
return magic_enum::enum_name (sample);
}

View file

@ -8,8 +8,11 @@
namespace nano::stat
{
/** Primary statistics type */
enum class type : uint8_t
enum class type
{
_invalid = 0, // Default value, should not be used
test,
traffic_tcp,
error,
message,
@ -50,6 +53,7 @@ enum class type : uint8_t
bootstrap_server_overfill,
bootstrap_server_response,
active,
active_transactions,
active_started,
active_confirmed,
active_dropped,
@ -73,15 +77,16 @@ enum class type : uint8_t
};
/** Optional detail type */
enum class detail : uint8_t
enum class detail
{
all = 0,
_invalid = 0, // Default value, should not be used
// common
all,
ok,
test,
total,
loop,
loop_cleanup,
total,
process,
processed,
ignored,
@ -404,13 +409,23 @@ enum class detail : uint8_t
};
/** Direction of the stat. If the direction is irrelevant, use in */
enum class dir : uint8_t
enum class dir
{
in,
out,
_last // Must be the last enum
};
enum class sample
{
_invalid = 0, // Default value, should not be used
active_election_duration,
bootstrap_tag_duration,
_last // Must be the last enum
};
}
namespace nano
@ -418,6 +433,7 @@ namespace nano
std::string_view to_string (stat::type);
std::string_view to_string (stat::detail);
std::string_view to_string (stat::dir);
std::string_view to_string (stat::sample);
}
// Ensure that the enum_range is large enough to hold all values (including future ones)

138
nano/lib/stats_sinks.hpp Normal file
View file

@ -0,0 +1,138 @@
#pragma once
#include <nano/lib/stats.hpp>
#include <boost/format.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
namespace nano
{
/** JSON sink. The resulting JSON object is provided as both a property_tree::ptree (to_object) and a string (to_string) */
class stat_json_writer : public nano::stat_log_sink
{
boost::property_tree::ptree tree;
boost::property_tree::ptree entries;
public:
std::ostream & out () override
{
return sstr;
}
void begin () override
{
tree.clear ();
}
void write_header (std::string const & header, std::chrono::system_clock::time_point & walltime) override
{
std::time_t now = std::chrono::system_clock::to_time_t (walltime);
tm tm = *localtime (&now);
tree.put ("type", header);
tree.put ("created", tm_to_string (tm));
}
void write_counter_entry (tm & tm, std::string const & type, std::string const & detail, std::string const & dir, uint64_t value) override
{
boost::property_tree::ptree entry;
entry.put ("time", boost::format ("%02d:%02d:%02d") % tm.tm_hour % tm.tm_min % tm.tm_sec);
entry.put ("type", type);
entry.put ("detail", detail);
entry.put ("dir", dir);
entry.put ("value", value);
entries.push_back (std::make_pair ("", entry));
}
void write_sampler_entry (tm & tm, const std::string & sample, const std::vector<stats::sampler_value_t> & values, std::pair<stats::sampler_value_t, stats::sampler_value_t> expected_min_max) override
{
boost::property_tree::ptree entry;
entry.put ("time", boost::format ("%02d:%02d:%02d") % tm.tm_hour % tm.tm_min % tm.tm_sec);
entry.put ("sample", sample);
entry.put ("min", expected_min_max.first);
entry.put ("max", expected_min_max.second);
boost::property_tree::ptree values_tree;
for (const auto & value : values)
{
boost::property_tree::ptree value_tree;
value_tree.put ("", value);
values_tree.push_back (std::make_pair ("", value_tree));
}
entry.add_child ("values", values_tree);
entries.push_back (std::make_pair ("", entry));
}
void finalize () override
{
tree.add_child ("entries", entries);
}
std::string to_string () override
{
boost::property_tree::write_json (sstr, tree);
return sstr.str ();
}
// WARNING: This method moves the ptree out of the object, leaving it in an undefined state
boost::property_tree::ptree && to_ptree ()
{
return std::move (tree);
}
private:
std::ostringstream sstr;
};
/** File sink with rotation support. This writes one counter per line and does not include histogram values. */
class stat_file_writer : public nano::stat_log_sink
{
public:
std::ofstream log;
std::string filename;
explicit stat_file_writer (std::string const & filename) :
filename (filename)
{
log.open (filename.c_str (), std::ofstream::out);
}
~stat_file_writer () override
{
log.close ();
}
std::ostream & out () override
{
return log;
}
void write_header (std::string const & header, std::chrono::system_clock::time_point & walltime) override
{
std::time_t now = std::chrono::system_clock::to_time_t (walltime);
tm tm = *localtime (&now);
log << header << "," << boost::format ("%04d.%02d.%02d %02d:%02d:%02d") % (1900 + tm.tm_year) % (tm.tm_mon + 1) % tm.tm_mday % tm.tm_hour % tm.tm_min % tm.tm_sec << std::endl;
}
void write_counter_entry (tm & tm, std::string const & type, std::string const & detail, std::string const & dir, uint64_t value) override
{
log << boost::format ("%02d:%02d:%02d") % tm.tm_hour % tm.tm_min % tm.tm_sec << "," << type << "," << detail << "," << dir << "," << value << std::endl;
}
void write_sampler_entry (tm & tm, const std::string & sample, const std::vector<stats::sampler_value_t> & values, std::pair<stats::sampler_value_t, stats::sampler_value_t> expected_min_max) override
{
log << boost::format ("%02d:%02d:%02d") % tm.tm_hour % tm.tm_min % tm.tm_sec << "," << sample;
for (const auto & value : values)
{
log << "," << value;
}
log << std::endl;
}
void rotate () override
{
log.close ();
log.open (filename.c_str (), std::ofstream::out);
log_entries = 0;
}
};
}

View file

@ -107,6 +107,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::scheduler_priority:
thread_role_name_string = "Sched Priority";
break;
case nano::thread_role::name::stats:
thread_role_name_string = "Stats";
break;
case nano::thread_role::name::rep_crawler:
thread_role_name_string = "Rep Crawler";
break;

View file

@ -52,6 +52,7 @@ enum class name
tcp_listener,
peer_history,
port_mapping,
stats,
};
std::string_view to_string (name);

View file

@ -29,6 +29,7 @@ class tomlconfig : public nano::configbase
public:
tomlconfig ();
tomlconfig (std::shared_ptr<cpptoml::table> const & tree_a, std::shared_ptr<nano::error> const & error_a = nullptr);
void doc (std::string const & key, std::string const & doc);
nano::error & read (std::filesystem::path const & path_a);
nano::error & read (std::istream & stream_overrides, std::filesystem::path const & path_a);

View file

@ -192,26 +192,53 @@ constexpr TARGET_TYPE narrow_cast (SOURCE_TYPE const & val)
// Issue #3748
void sort_options_description (const boost::program_options::options_description & source, boost::program_options::options_description & target);
}
/*
* Clock utilities
*/
namespace nano
{
/**
* Steady clock should always be used for measuring time intervals
*/
using clock = std::chrono::steady_clock;
/**
* Check whether time elapsed between `last` and `now` is greater than `duration`
* Force usage of steady clock
*/
template <typename Duration>
bool elapsed (nano::clock::time_point const & last, Duration duration, nano::clock::time_point const & now)
bool elapsed (nano::clock::time_point const & last, Duration const & duration, nano::clock::time_point const & now)
{
return last + duration < now;
}
/**
* Check whether time elapsed since `last` is greater than `duration`
* Force usage of steady clock
*/
template <typename Duration>
bool elapsed (nano::clock::time_point const & last, Duration duration)
bool elapsed (nano::clock::time_point const & last, Duration const & duration)
{
return elapsed (last, duration, nano::clock::now ());
}
/**
* Check whether time elapsed since `last` is greater than `duration` and update `last` if true
* Force usage of steady clock
*/
template <typename Duration>
bool elapse (nano::clock::time_point & last, Duration const & duration)
{
auto now = nano::clock::now ();
if (last + duration < now)
{
last = now;
return true;
}
return false;
}
}
namespace nano::util

View file

@ -287,6 +287,8 @@ void nano::active_transactions::cleanup_election (nano::unique_lock<nano::mutex>
lock_a.unlock ();
node.stats.sample (nano::stat::sample::active_election_duration, { 0, 1000 * 60 * 10 /* 0-10 minutes range */ }, election->duration ().count ());
vacancy_update ();
for (auto const & [hash, block] : blocks_l)

View file

@ -345,6 +345,10 @@ void nano::bootstrap_ascending::service::process (nano::asc_pull_ack const & mes
auto iterator = tags_by_id.find (message.id);
auto tag = *iterator;
tags_by_id.erase (iterator);
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::reply);
stats.sample (nano::stat::sample::bootstrap_tag_duration, { 0, config.bootstrap_ascending.timeout }, nano::milliseconds_since_epoch () - tag.time);
scoring.received_message (channel);
lock.unlock ();
@ -362,7 +366,7 @@ void nano::bootstrap_ascending::service::process (nano::asc_pull_ack const & mes
void nano::bootstrap_ascending::service::process (const nano::asc_pull_ack::blocks_payload & response, const nano::bootstrap_ascending::service::async_tag & tag)
{
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::reply);
stats.inc (nano::stat::type::bootstrap_ascending, nano::stat::detail::process);
auto result = verify (response, tag);
switch (result)

View file

@ -560,6 +560,11 @@ std::shared_ptr<nano::block> nano::election::winner () const
return status.winner;
}
std::chrono::milliseconds nano::election::duration () const
{
return std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::steady_clock::now () - election_start);
}
void nano::election::broadcast_vote_locked (nano::unique_lock<nano::mutex> & lock)
{
debug_assert (lock.owns_lock ());

View file

@ -88,6 +88,7 @@ public: // Status
bool failed () const;
nano::election_extended_status current_status () const;
std::shared_ptr<nano::block> winner () const;
std::chrono::milliseconds duration () const;
std::atomic<unsigned> confirmation_request_count{ 0 };
nano::tally_t tally () const;
@ -170,7 +171,7 @@ private:
mutable std::unordered_map<nano::block_hash, nano::uint128_t> last_tally;
nano::election_behavior const behavior_m{ nano::election_behavior::normal };
std::chrono::steady_clock::time_point const election_start = { std::chrono::steady_clock::now () };
std::chrono::steady_clock::time_point const election_start{ std::chrono::steady_clock::now () };
mutable nano::mutex mutex;

View file

@ -1,6 +1,7 @@
#include <nano/lib/blocks.hpp>
#include <nano/lib/config.hpp>
#include <nano/lib/json_error_response.hpp>
#include <nano/lib/stats_sinks.hpp>
#include <nano/lib/timer.hpp>
#include <nano/node/active_transactions.hpp>
#include <nano/node/bootstrap/bootstrap_lazy.hpp>
@ -3963,23 +3964,30 @@ void nano::json_handler::sign ()
void nano::json_handler::stats ()
{
auto sink = node.stats.log_sink_json ();
std::string type (request.get<std::string> ("type", ""));
bool use_sink = false;
auto respond_with_sink = [this] (auto & sink) {
auto stat_ptree = sink.to_ptree ();
stat_ptree.put ("stat_duration_seconds", node.stats.last_reset ().count ());
response_l = stat_ptree;
};
if (type == "counters")
{
node.stats.log_counters (*sink);
use_sink = true;
nano::stat_json_writer sink;
node.stats.log_counters (sink);
respond_with_sink (sink);
}
else if (type == "samples")
{
nano::stat_json_writer sink;
node.stats.log_samples (sink);
respond_with_sink (sink);
}
else if (type == "objects")
{
construct_json (collect_container_info (node, "node").get (), response_l);
}
else if (type == "samples")
{
node.stats.log_samples (*sink);
use_sink = true;
}
else if (type == "database")
{
node.store.serialize_memory_stats (response_l);
@ -3988,18 +3996,8 @@ void nano::json_handler::stats ()
{
ec = nano::error_rpc::invalid_missing_type;
}
if (!ec && use_sink)
{
auto stat_tree_l (*static_cast<boost::property_tree::ptree *> (sink->to_object ()));
stat_tree_l.put ("stat_duration_seconds", node.stats.last_reset ().count ());
std::stringstream ostream;
boost::property_tree::write_json (ostream, stat_tree_l);
response (ostream.str ());
}
else
{
response_errors ();
}
response_errors ();
}
void nano::json_handler::stats_clear ()

View file

@ -706,6 +706,7 @@ void nano::node::start ()
}
websocket.start ();
telemetry.start ();
stats.start ();
local_block_broadcaster.start ();
peer_history.start ();

View file

@ -22,7 +22,7 @@ void nano::transport::channel::send (nano::message & message_a, std::function<vo
bool should_pass = node.outbound_limiter.should_pass (buffer.size (), to_bandwidth_limit_type (traffic_type));
bool pass = !is_droppable_by_limiter || should_pass;
node.stats.inc (pass ? nano::stat::type::message : nano::stat::type::drop, to_stat_detail (message_a.type ()), nano::stat::dir::out);
node.stats.inc (pass ? nano::stat::type::message : nano::stat::type::drop, to_stat_detail (message_a.type ()), nano::stat::dir::out, /* aggregate all */ true);
node.logger.trace (nano::log::type::channel_sent, to_log_detail (message_a.type ()),
nano::log::arg{ "message", message_a },
nano::log::arg{ "channel", *this },

View file

@ -121,7 +121,7 @@ void nano::transport::socket::async_read (std::shared_ptr<std::vector<uint8_t>>
}
else
{
node_l->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::in, size_a);
node_l->stats.add (nano::stat::type::traffic_tcp, nano::stat::detail::all, nano::stat::dir::in, size_a);
this_l->set_last_completion ();
this_l->set_last_receive_time ();
}
@ -214,7 +214,7 @@ void nano::transport::socket::write_queued_messages ()
}
else
{
node_l->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size);
node_l->stats.add (nano::stat::type::traffic_tcp, nano::stat::detail::all, nano::stat::dir::out, size, /* aggregate all */ true);
this_l->set_last_completion ();
}

View file

@ -1,5 +1,6 @@
#include <nano/lib/blocks.hpp>
#include <nano/lib/config.hpp>
#include <nano/lib/stats_sinks.hpp>
#include <nano/node/election_status.hpp>
#include <nano/node/vote_with_weight_info.hpp>
#include <nano/qt/qt.hpp>
@ -857,13 +858,13 @@ void nano_qt::stats_viewer::refresh_stats ()
{
model->removeRows (0, model->rowCount ());
auto sink = wallet.node.stats.log_sink_json ();
wallet.node.stats.log_counters (*sink);
auto json = static_cast<boost::property_tree::ptree *> (sink->to_object ());
if (json)
nano::stat_json_writer sink;
wallet.node.stats.log_counters (sink);
auto json = sink.to_ptree ();
if (!json.empty ())
{
// Format the stat data to make totals and values easier to read
for (boost::property_tree::ptree::value_type const & child : json->get_child ("entries"))
for (boost::property_tree::ptree::value_type const & child : json.get_child ("entries"))
{
auto time = child.second.get<std::string> ("time");
auto type = child.second.get<std::string> ("type");

View file

@ -32,6 +32,7 @@
#include <algorithm>
#include <map>
#include <ranges>
#include <tuple>
#include <utility>
@ -5351,14 +5352,14 @@ TEST (rpc, stats_clear)
auto node = add_ipc_enabled_node (system);
auto const rpc_ctx = add_rpc (system, node);
nano::keypair key;
node->stats.inc (nano::stat::type::ledger, nano::stat::dir::in);
ASSERT_EQ (1, node->stats.count (nano::stat::type::ledger, nano::stat::dir::in));
node->stats.inc (nano::stat::type::ledger, nano::stat::detail::test, nano::stat::dir::in);
ASSERT_EQ (1, node->stats.count (nano::stat::type::ledger, nano::stat::detail::test, nano::stat::dir::in));
boost::property_tree::ptree request;
request.put ("action", "stats_clear");
auto response (wait_response (system, rpc_ctx, request));
std::string success (response.get<std::string> ("success"));
ASSERT_TRUE (success.empty ());
ASSERT_EQ (0, node->stats.count (nano::stat::type::ledger, nano::stat::dir::in));
ASSERT_EQ (0, node->stats.count (nano::stat::type::ledger, nano::stat::detail::test, nano::stat::dir::in));
ASSERT_LE (node->stats.last_reset ().count (), 5);
}
@ -5752,6 +5753,56 @@ TEST (rpc, memory_stats)
}
}
TEST (rpc, stats_samples)
{
nano::test::system system;
auto node = add_ipc_enabled_node (system);
auto const rpc_ctx = add_rpc (system, node);
node->stats.sample (nano::stat::sample::active_election_duration, { 0, 10 }, 1);
node->stats.sample (nano::stat::sample::active_election_duration, { 0, 10 }, 2);
node->stats.sample (nano::stat::sample::active_election_duration, { 0, 10 }, 3);
node->stats.sample (nano::stat::sample::active_election_duration, { 0, 10 }, 4);
node->stats.sample (nano::stat::sample::bootstrap_tag_duration, { 0, 999 }, 5);
node->stats.sample (nano::stat::sample::bootstrap_tag_duration, { 0, 999 }, 5);
boost::property_tree::ptree request;
request.put ("action", "stats");
request.put ("type", "samples");
auto response (wait_response (system, rpc_ctx, request));
std::vector<boost::property_tree::ptree> entries;
for (auto & entry : response.get_child ("entries"))
{
entries.push_back (entry.second);
}
ASSERT_EQ (entries.size (), 2);
{
auto entry = entries[0];
ASSERT_EQ ("active_election_duration", entry.get<std::string> ("sample"));
ASSERT_EQ ("0", entry.get<std::string> ("min"));
ASSERT_EQ ("10", entry.get<std::string> ("max"));
std::vector<std::string> expected_values = { "1", "2", "3", "4" };
auto values = entry.get_child ("values") | std::views::transform ([] (auto const & v) { return v.second.template get_value<std::string> (); });
ASSERT_TRUE (std::ranges::equal (expected_values, values));
}
{
auto entry = entries[1];
ASSERT_EQ ("bootstrap_tag_duration", entry.get<std::string> ("sample"));
ASSERT_EQ ("0", entry.get<std::string> ("min"));
ASSERT_EQ ("999", entry.get<std::string> ("max"));
std::vector<std::string> expected_values = { "5", "5" };
auto values = entry.get_child ("values") | std::views::transform ([] (auto const & v) { return v.second.template get_value<std::string> (); });
ASSERT_TRUE (std::ranges::equal (expected_values, values));
}
}
TEST (rpc, block_confirmed)
{
nano::test::system system;

View file

@ -715,7 +715,7 @@ TEST (confirmation_height, many_accounts_single_confirmation)
ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed_bounded, nano::stat::dir::in), num_accounts * 2 - 2);
ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed_unbounded, nano::stat::dir::in), 0);
ASSERT_TIMELY_EQ (40s, (node->ledger.cemented_count () - 1), node->stats.count (nano::stat::type::confirmation_observer, nano::stat::detail::all, nano::stat::dir::out));
ASSERT_TIMELY_EQ (40s, (node->ledger.cemented_count () - 1), node->stats.count (nano::stat::type::confirmation_observer, nano::stat::dir::out));
ASSERT_TIMELY_EQ (10s, node->active.election_winner_details_size (), 0);
}
@ -778,7 +778,7 @@ TEST (confirmation_height, many_accounts_many_confirmations)
ASSERT_GE (num_confirmed_bounded, nano::confirmation_height::unbounded_cutoff);
ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed_unbounded, nano::stat::dir::in), num_blocks_to_confirm - num_confirmed_bounded);
ASSERT_TIMELY_EQ (60s, (node->ledger.cemented_count () - 1), node->stats.count (nano::stat::type::confirmation_observer, nano::stat::detail::all, nano::stat::dir::out));
ASSERT_TIMELY_EQ (60s, (node->ledger.cemented_count () - 1), node->stats.count (nano::stat::type::confirmation_observer, nano::stat::dir::out));
auto transaction = node->store.tx_begin_read ();
size_t cemented_count = 0;
@ -790,7 +790,7 @@ TEST (confirmation_height, many_accounts_many_confirmations)
ASSERT_EQ (num_blocks_to_confirm + 1, cemented_count);
ASSERT_EQ (cemented_count, node->ledger.cemented_count ());
ASSERT_TIMELY_EQ (20s, (node->ledger.cemented_count () - 1), node->stats.count (nano::stat::type::confirmation_observer, nano::stat::detail::all, nano::stat::dir::out));
ASSERT_TIMELY_EQ (20s, (node->ledger.cemented_count () - 1), node->stats.count (nano::stat::type::confirmation_observer, nano::stat::dir::out));
ASSERT_TIMELY_EQ (10s, node->active.election_winner_details_size (), 0);
}
@ -937,7 +937,7 @@ TEST (confirmation_height, long_chains)
ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed_bounded, nano::stat::dir::in), num_blocks * 2 + 2);
ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed_unbounded, nano::stat::dir::in), 0);
ASSERT_TIMELY_EQ (40s, (node->ledger.cemented_count () - 1), node->stats.count (nano::stat::type::confirmation_observer, nano::stat::detail::all, nano::stat::dir::out));
ASSERT_TIMELY_EQ (40s, (node->ledger.cemented_count () - 1), node->stats.count (nano::stat::type::confirmation_observer, nano::stat::dir::out));
ASSERT_TIMELY_EQ (10s, node->active.election_winner_details_size (), 0);
}
@ -1095,7 +1095,7 @@ TEST (confirmation_height, many_accounts_send_receive_self)
}
system.deadline_set (200s);
while ((node->ledger.cemented_count () - 1) != node->stats.count (nano::stat::type::confirmation_observer, nano::stat::detail::all, nano::stat::dir::out))
while ((node->ledger.cemented_count () - 1) != node->stats.count (nano::stat::type::confirmation_observer, nano::stat::dir::out))
{
ASSERT_NO_ERROR (system.poll ());
}
@ -1111,7 +1111,7 @@ TEST (confirmation_height, many_accounts_send_receive_self)
ASSERT_EQ (cemented_count, node->ledger.cemented_count ());
system.deadline_set (60s);
while ((node->ledger.cemented_count () - 1) != node->stats.count (nano::stat::type::confirmation_observer, nano::stat::detail::all, nano::stat::dir::out))
while ((node->ledger.cemented_count () - 1) != node->stats.count (nano::stat::type::confirmation_observer, nano::stat::dir::out))
{
ASSERT_NO_ERROR (system.poll ());
}

View file

@ -87,6 +87,7 @@ void nano::test::system::stop ()
stop_node (*node);
}
stats.stop ();
io_guard.reset ();
work.stop ();
}