No write transactions on I/O threads (#1264)

* Remove write transactions from io threads

* TSAN error, make sure thread is initialized last
This commit is contained in:
Roy Keene 2019-08-23 11:32:58 -05:00 committed by Wesley Shillingford
commit 0dc9d4fa53
10 changed files with 975 additions and 557 deletions

View file

@ -28,6 +28,7 @@ add_executable (core_test
socket.cpp
timer.cpp
uint256_union.cpp
utility.cpp
versioning.cpp
wallet.cpp
wallets.cpp

View file

@ -0,0 +1,34 @@
#include <nano/lib/timer.hpp>
#include <nano/lib/utility.hpp>
#include <nano/secure/utility.hpp>
#include <gtest/gtest.h>
namespace
{
std::atomic<bool> passed_sleep{ false };
void func ()
{
std::this_thread::sleep_for (std::chrono::seconds (1));
passed_sleep = true;
}
}
TEST (thread, worker)
{
nano::worker worker;
worker.push_task (func);
ASSERT_FALSE (passed_sleep);
nano::timer<std::chrono::milliseconds> timer_l;
timer_l.start ();
while (!passed_sleep)
{
if (timer_l.since_start () > std::chrono::seconds (10))
{
break;
}
}
ASSERT_TRUE (passed_sleep);
}

View file

@ -956,7 +956,7 @@ TEST (wallet, password_race)
nano::system system (24000, 1);
nano::thread_runner runner (system.io_ctx, system.nodes[0]->config.io_threads);
auto wallet = system.wallet (0);
system.nodes[0]->background ([&wallet]() {
std::thread thread ([&wallet]() {
for (int i = 0; i < 100; i++)
{
auto transaction (wallet->wallets.tx_begin_write ());
@ -974,6 +974,7 @@ TEST (wallet, password_race)
break;
}
}
thread.join ();
system.stop ();
runner.join ();
}
@ -990,23 +991,24 @@ TEST (wallet, password_race_corrupt_seed)
wallet->store.seed (seed, transaction);
ASSERT_FALSE (wallet->store.attempt_password (transaction, "4567"));
}
std::vector<std::thread> threads;
for (int i = 0; i < 100; i++)
{
system.nodes[0]->background ([&wallet]() {
threads.emplace_back ([&wallet]() {
for (int i = 0; i < 10; i++)
{
auto transaction (wallet->wallets.tx_begin_write ());
wallet->store.rekey (transaction, "0000");
}
});
system.nodes[0]->background ([&wallet]() {
threads.emplace_back ([&wallet]() {
for (int i = 0; i < 10; i++)
{
auto transaction (wallet->wallets.tx_begin_write ());
wallet->store.rekey (transaction, "1234");
}
});
system.nodes[0]->background ([&wallet]() {
threads.emplace_back ([&wallet]() {
for (int i = 0; i < 10; i++)
{
auto transaction (wallet->wallets.tx_begin_read ());
@ -1014,6 +1016,10 @@ TEST (wallet, password_race_corrupt_seed)
}
});
}
for (auto & thread : threads)
{
thread.join ();
}
system.stop ();
runner.join ();
{

View file

@ -134,6 +134,9 @@ namespace thread_role
case nano::thread_role::name::confirmation_height_processing:
thread_role_name_string = "Conf height";
break;
case nano::thread_role::name::worker:
thread_role_name_string = "Worker";
break;
}
/*
@ -225,6 +228,76 @@ void nano::thread_runner::stop_event_processing ()
io_guard.get_executor ().context ().stop ();
}
nano::worker::worker () :
thread ([this]() {
nano::thread_role::set (nano::thread_role::name::worker);
this->run ();
})
{
}
void nano::worker::run ()
{
while (!stopped)
{
std::unique_lock<std::mutex> lk (mutex);
if (!queue.empty ())
{
auto func = queue.front ();
queue.pop_front ();
lk.unlock ();
func ();
// So that we reduce locking for anything being pushed as that will
// most likely be on an io-thread
std::this_thread::yield ();
lk.lock ();
}
else
{
cv.wait (lk);
}
}
}
nano::worker::~worker ()
{
stop ();
}
void nano::worker::push_task (std::function<void()> func_a)
{
{
std::lock_guard<std::mutex> guard (mutex);
queue.emplace_back (func_a);
}
cv.notify_one ();
}
void nano::worker::stop ()
{
stopped = true;
cv.notify_one ();
if (thread.joinable ())
{
thread.join ();
}
}
std::unique_ptr<nano::seq_con_info_component> nano::collect_seq_con_info (nano::worker & worker, const std::string & name)
{
auto composite = std::make_unique<seq_con_info_composite> (name);
size_t count = 0;
{
std::lock_guard<std::mutex> guard (worker.mutex);
count = worker.queue.size ();
}
auto sizeof_element = sizeof (decltype (worker.queue)::value_type);
composite->add_component (std::make_unique<nano::seq_con_info_leaf> (nano::seq_con_info{ "queue", count, sizeof_element }));
return composite;
}
/*
* Backing code for "release_assert", which is itself a macro
*/

View file

@ -110,7 +110,8 @@ namespace thread_role
rpc_request_processor,
rpc_process_container,
work_watcher,
confirmation_height_processing
confirmation_height_processing,
worker
};
/*
* Get/Set the identifier for the current thread
@ -152,6 +153,27 @@ public:
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> io_guard;
};
class worker final
{
public:
worker ();
~worker ();
void run ();
void push_task (std::function<void()> func);
void stop ();
private:
std::condition_variable cv;
std::deque<std::function<void()>> queue;
std::mutex mutex;
std::atomic<bool> stopped{ false };
std::thread thread;
friend std::unique_ptr<seq_con_info_component> collect_seq_con_info (worker &, const std::string &);
};
std::unique_ptr<seq_con_info_component> collect_seq_con_info (worker & worker, const std::string & name);
/**
* Returns seconds passed since unix epoch (posix time)
*/

File diff suppressed because it is too large Load diff

View file

@ -584,6 +584,7 @@ std::unique_ptr<seq_con_info_component> collect_seq_con_info (node & node, const
composite->add_component (collect_seq_con_info (node.vote_uniquer, "vote_uniquer"));
composite->add_component (collect_seq_con_info (node.confirmation_height_processor, "confirmation_height_processor"));
composite->add_component (collect_seq_con_info (node.pending_confirmation_height, "pending_confirmation_height"));
composite->add_component (collect_seq_con_info (node.worker, "worker"));
return composite;
}
}
@ -679,6 +680,7 @@ void nano::node::stop ()
wallets.stop ();
stats.stop ();
write_database_queue.stop ();
worker.stop ();
}
}
@ -790,7 +792,9 @@ void nano::node::ongoing_store_flush ()
alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [node_w]() {
if (auto node_l = node_w.lock ())
{
node_l->ongoing_store_flush ();
node_l->worker.push_task ([node_l]() {
node_l->ongoing_store_flush ();
});
}
});
}
@ -803,7 +807,9 @@ void nano::node::ongoing_peer_store ()
alarm.add (std::chrono::steady_clock::now () + network_params.node.peer_interval, [node_w]() {
if (auto node_l = node_w.lock ())
{
node_l->ongoing_peer_store ();
node_l->worker.push_task ([node_l]() {
node_l->ongoing_peer_store ();
});
}
});
}
@ -834,7 +840,9 @@ void nano::node::search_pending ()
wallets.search_pending_all ();
auto this_l (shared ());
alarm.add (std::chrono::steady_clock::now () + network_params.node.search_pending_interval, [this_l]() {
this_l->search_pending ();
this_l->worker.push_task ([this_l]() {
this_l->search_pending ();
});
});
}
@ -898,7 +906,9 @@ void nano::node::ongoing_unchecked_cleanup ()
}
auto this_l (shared ());
alarm.add (std::chrono::steady_clock::now () + network_params.node.unchecked_cleaning_interval, [this_l]() {
this_l->ongoing_unchecked_cleanup ();
this_l->worker.push_task ([this_l]() {
this_l->ongoing_unchecked_cleanup ();
});
});
}
@ -1292,7 +1302,9 @@ void nano::node::ongoing_online_weight_calculation_queue ()
alarm.add (std::chrono::steady_clock::now () + (std::chrono::seconds (network_params.node.weight_period)), [node_w]() {
if (auto node_l = node_w.lock ())
{
node_l->ongoing_online_weight_calculation ();
node_l->worker.push_task ([node_l]() {
node_l->ongoing_online_weight_calculation ();
});
}
});
}

View file

@ -141,6 +141,7 @@ public:
void ongoing_online_weight_calculation ();
void ongoing_online_weight_calculation_queue ();
bool online () const;
nano::worker worker;
nano::write_database_queue write_database_queue;
boost::asio::io_context & io_ctx;
boost::latch node_initialized_latch;

File diff suppressed because it is too large Load diff

View file

@ -394,6 +394,10 @@ void nano::read_transaction::refresh () const
nano::write_transaction::write_transaction (std::unique_ptr<nano::write_transaction_impl> write_transaction_impl) :
impl (std::move (write_transaction_impl))
{
/*
* For IO threads, we do not want them to block on creating write transactions.
*/
assert (nano::thread_role::get () != nano::thread_role::name::io);
}
void * nano::write_transaction::get_handle () const