Encapsulate and lock stat_datapoint, fixes core_test tsan (#1665)

This commit is contained in:
cryptocode 2019-01-30 17:50:39 +01:00 committed by GitHub
commit 70532b5268
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 55 additions and 14 deletions

View file

@ -1490,7 +1490,8 @@ void nano::signature_checker::set_thread_names (unsigned num_threads)
for (auto i = 0u; i < num_threads; ++i) for (auto i = 0u; i < num_threads; ++i)
{ {
boost::asio::post (thread_pool, [&cv, &ready, &pending, &mutex_l, &promise = promises[i] ]() { // clang-format off
boost::asio::post (thread_pool, [&cv, &ready, &pending, &mutex_l, &promise = promises[i]]() {
std::unique_lock<std::mutex> lk (mutex_l); std::unique_lock<std::mutex> lk (mutex_l);
nano::thread_role::set (nano::thread_role::name::signature_checking); nano::thread_role::set (nano::thread_role::name::signature_checking);
if (--pending == 0) if (--pending == 0)
@ -1507,6 +1508,7 @@ void nano::signature_checker::set_thread_names (unsigned num_threads)
} }
promise.set_value (); promise.set_value ();
}); });
// clang-format on
} }
// Wait until all threads have finished // Wait until all threads have finished

View file

@ -205,14 +205,14 @@ void nano::stat::log_counters_impl (stat_log_sink & sink)
for (auto & it : entries) for (auto & it : entries)
{ {
std::time_t time = std::chrono::system_clock::to_time_t (it.second->counter.timestamp); std::time_t time = std::chrono::system_clock::to_time_t (it.second->counter.get_timestamp ());
tm local_tm = *localtime (&time); tm local_tm = *localtime (&time);
auto key = it.first; auto key = it.first;
std::string type = type_to_string (key); std::string type = type_to_string (key);
std::string detail = detail_to_string (key); std::string detail = detail_to_string (key);
std::string dir = dir_to_string (key); std::string dir = dir_to_string (key);
sink.write_entry (local_tm, type, detail, dir, it.second->counter.value); sink.write_entry (local_tm, type, detail, dir, it.second->counter.get_value ());
} }
sink.entries ()++; sink.entries ()++;
sink.finalize (); sink.finalize ();
@ -247,9 +247,9 @@ void nano::stat::log_samples_impl (stat_log_sink & sink)
for (auto & datapoint : it.second->samples) for (auto & datapoint : it.second->samples)
{ {
std::time_t time = std::chrono::system_clock::to_time_t (datapoint.timestamp); std::time_t time = std::chrono::system_clock::to_time_t (datapoint.get_timestamp ());
tm local_tm = *localtime (&time); tm local_tm = *localtime (&time);
sink.write_entry (local_tm, type, detail, dir, datapoint.value); sink.write_entry (local_tm, type, detail, dir, datapoint.get_value ());
} }
} }
sink.entries ()++; sink.entries ()++;
@ -267,9 +267,9 @@ void nano::stat::update (uint32_t key_a, uint64_t value)
auto entry (get_entry_impl (key_a, config.interval, config.capacity)); auto entry (get_entry_impl (key_a, config.interval, config.capacity));
// Counters // Counters
auto old (entry->counter.value); auto old (entry->counter.get_value ());
entry->counter.add (value); entry->counter.add (value);
entry->count_observers.notify (old, entry->counter.value); entry->count_observers.notify (old, entry->counter.get_value ());
std::chrono::duration<double, std::milli> duration = now - log_last_count_writeout; std::chrono::duration<double, std::milli> duration = now - log_last_count_writeout;
if (config.log_interval_counters > 0 && duration.count () > config.log_interval_counters) if (config.log_interval_counters > 0 && duration.count () > config.log_interval_counters)
@ -289,9 +289,9 @@ void nano::stat::update (uint32_t key_a, uint64_t value)
entry->sample_start_time = now; entry->sample_start_time = now;
// Make a snapshot of samples for thread safety and to get a stable container // Make a snapshot of samples for thread safety and to get a stable container
entry->sample_current.timestamp = std::chrono::system_clock::now (); entry->sample_current.set_timestamp (std::chrono::system_clock::now ());
entry->samples.push_back (entry->sample_current); entry->samples.push_back (entry->sample_current);
entry->sample_current.value = 0; entry->sample_current.set_value (0);
if (entry->sample_observers.observers.size () > 0) if (entry->sample_observers.observers.size () > 0)
{ {

View file

@ -6,6 +6,7 @@
#include <chrono> #include <chrono>
#include <map> #include <map>
#include <memory> #include <memory>
#include <mutex>
#include <nano/lib/errors.hpp> #include <nano/lib/errors.hpp>
#include <nano/lib/jsonconfig.hpp> #include <nano/lib/jsonconfig.hpp>
#include <nano/lib/utility.hpp> #include <nano/lib/utility.hpp>
@ -59,20 +60,58 @@ public:
class stat_datapoint class stat_datapoint
{ {
public: public:
/** Value of the sample interval */ stat_datapoint () = default;
uint64_t value{ 0 }; stat_datapoint (stat_datapoint const & other_a)
/** When the sample was added. This is wall time (system_clock), suitable for display purposes. */ {
std::chrono::system_clock::time_point timestamp{ std::chrono::system_clock::now () }; std::lock_guard<std::mutex> lock (other_a.datapoint_mutex);
value = other_a.value;
timestamp = other_a.timestamp;
}
stat_datapoint & operator= (stat_datapoint const & other_a)
{
std::lock_guard<std::mutex> lock (other_a.datapoint_mutex);
value = other_a.value;
timestamp = other_a.timestamp;
return *this;
}
uint64_t get_value ()
{
std::lock_guard<std::mutex> lock (datapoint_mutex);
return value;
}
void set_value (uint64_t value_a)
{
std::lock_guard<std::mutex> lock (datapoint_mutex);
value = value_a;
}
std::chrono::system_clock::time_point get_timestamp ()
{
std::lock_guard<std::mutex> lock (datapoint_mutex);
return timestamp;
}
void set_timestamp (std::chrono::system_clock::time_point timestamp_a)
{
std::lock_guard<std::mutex> lock (datapoint_mutex);
timestamp = timestamp_a;
}
/** Add \addend to the current value and optionally update the timestamp */ /** Add \addend to the current value and optionally update the timestamp */
void add (uint64_t addend, bool update_timestamp = true) void add (uint64_t addend, bool update_timestamp = true)
{ {
std::lock_guard<std::mutex> lock (datapoint_mutex);
value += addend; value += addend;
if (update_timestamp) if (update_timestamp)
{ {
timestamp = std::chrono::system_clock::now (); timestamp = std::chrono::system_clock::now ();
} }
} }
private:
mutable std::mutex datapoint_mutex;
/** Value of the sample interval */
uint64_t value{ 0 };
/** When the sample was added. This is wall time (system_clock), suitable for display purposes. */
std::chrono::system_clock::time_point timestamp{ std::chrono::system_clock::now () };
}; };
/** Bookkeeping of statistics for a specific type/detail/direction combination */ /** Bookkeeping of statistics for a specific type/detail/direction combination */
@ -383,7 +422,7 @@ public:
/** Returns current value for the given counter at the detail level */ /** Returns current value for the given counter at the detail level */
uint64_t count (stat::type type, stat::detail detail, stat::dir dir = stat::dir::in) uint64_t count (stat::type type, stat::detail detail, stat::dir dir = stat::dir::in)
{ {
return get_entry (key_of (type, detail, dir))->counter.value; return get_entry (key_of (type, detail, dir))->counter.get_value ();
} }
/** Returns the number of seconds since clear() was last called, or node startup if it's never called. */ /** Returns the number of seconds since clear() was last called, or node startup if it's never called. */