Merge pull request #4762 from pwojcikdev/thread-pool-work

Rework `nano::thread_pool`
This commit is contained in:
Piotr Wójcik 2024-10-27 12:24:49 +01:00 committed by GitHub
commit bdf4668039
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
26 changed files with 327 additions and 300 deletions

View file

@ -53,6 +53,7 @@ add_executable(
socket.cpp
system.cpp
telemetry.cpp
thread_pool.cpp
throttle.cpp
toml.cpp
timer.cpp

View file

@ -0,0 +1,96 @@
#include <nano/lib/thread_pool.hpp>
#include <nano/lib/timer.hpp>
#include <nano/test_common/testutil.hpp>
#include <gtest/gtest.h>
#include <future>
TEST (thread_pool, thread_pool)
{
std::atomic<bool> passed_sleep{ false };
auto func = [&passed_sleep] () {
std::this_thread::sleep_for (std::chrono::seconds (1));
passed_sleep = true;
};
nano::thread_pool workers (1u, nano::thread_role::name::unknown);
nano::test::start_stop_guard stop_guard{ workers };
workers.post (func);
ASSERT_FALSE (passed_sleep);
nano::timer<std::chrono::milliseconds> timer_l;
timer_l.start ();
while (!passed_sleep)
{
if (timer_l.since_start () > std::chrono::seconds (10))
{
break;
}
}
ASSERT_TRUE (passed_sleep);
}
TEST (thread_pool, one)
{
std::atomic<bool> done (false);
nano::mutex mutex;
nano::condition_variable condition;
nano::thread_pool workers (1u, nano::thread_role::name::unknown);
nano::test::start_stop_guard stop_guard{ workers };
workers.post ([&] () {
{
nano::lock_guard<nano::mutex> lock{ mutex };
done = true;
}
condition.notify_one ();
});
nano::unique_lock<nano::mutex> unique{ mutex };
condition.wait (unique, [&] () { return !!done; });
}
TEST (thread_pool, many)
{
std::atomic<int> count (0);
nano::mutex mutex;
nano::condition_variable condition;
nano::thread_pool workers (50u, nano::thread_role::name::unknown);
nano::test::start_stop_guard stop_guard{ workers };
for (auto i (0); i < 50; ++i)
{
workers.post ([&] () {
{
nano::lock_guard<nano::mutex> lock{ mutex };
count += 1;
}
condition.notify_one ();
});
}
nano::unique_lock<nano::mutex> unique{ mutex };
condition.wait (unique, [&] () { return count == 50; });
}
TEST (thread_pool, top_execution)
{
int value1 (0);
int value2 (0);
nano::mutex mutex;
std::promise<bool> promise;
nano::thread_pool workers (1u, nano::thread_role::name::unknown);
nano::test::start_stop_guard stop_guard{ workers };
workers.post ([&] () {
nano::lock_guard<nano::mutex> lock{ mutex };
value1 = 1;
value2 = 1;
});
workers.post_delayed (std::chrono::milliseconds (1), [&] () {
nano::lock_guard<nano::mutex> lock{ mutex };
value2 = 2;
promise.set_value (false);
});
promise.get_future ().get ();
nano::lock_guard<nano::mutex> lock{ mutex };
ASSERT_EQ (1, value1);
ASSERT_EQ (2, value2);
}

View file

@ -1,6 +1,6 @@
#include <nano/lib/optional_ptr.hpp>
#include <nano/lib/rate_limiting.hpp>
#include <nano/lib/thread_pool.hpp>
#include <nano/lib/relaxed_atomic.hpp>
#include <nano/lib/timer.hpp>
#include <nano/lib/utility.hpp>
#include <nano/secure/pending_info.hpp>
@ -146,91 +146,6 @@ TEST (optional_ptr, basic)
ASSERT_EQ (opt->z, 3);
}
TEST (thread, thread_pool)
{
std::atomic<bool> passed_sleep{ false };
auto func = [&passed_sleep] () {
std::this_thread::sleep_for (std::chrono::seconds (1));
passed_sleep = true;
};
nano::thread_pool workers (1u, nano::thread_role::name::unknown);
workers.push_task (func);
ASSERT_FALSE (passed_sleep);
nano::timer<std::chrono::milliseconds> timer_l;
timer_l.start ();
while (!passed_sleep)
{
if (timer_l.since_start () > std::chrono::seconds (10))
{
break;
}
}
ASSERT_TRUE (passed_sleep);
}
TEST (thread_pool_alarm, one)
{
std::atomic<bool> done (false);
nano::mutex mutex;
nano::condition_variable condition;
nano::thread_pool workers (1u, nano::thread_role::name::unknown);
workers.add_timed_task (std::chrono::steady_clock::now (), [&] () {
{
nano::lock_guard<nano::mutex> lock{ mutex };
done = true;
}
condition.notify_one ();
});
nano::unique_lock<nano::mutex> unique{ mutex };
condition.wait (unique, [&] () { return !!done; });
}
TEST (thread_pool_alarm, many)
{
std::atomic<int> count (0);
nano::mutex mutex;
nano::condition_variable condition;
nano::thread_pool workers (50u, nano::thread_role::name::unknown);
for (auto i (0); i < 50; ++i)
{
workers.add_timed_task (std::chrono::steady_clock::now (), [&] () {
{
nano::lock_guard<nano::mutex> lock{ mutex };
count += 1;
}
condition.notify_one ();
});
}
nano::unique_lock<nano::mutex> unique{ mutex };
condition.wait (unique, [&] () { return count == 50; });
}
TEST (thread_pool_alarm, top_execution)
{
int value1 (0);
int value2 (0);
nano::mutex mutex;
std::promise<bool> promise;
nano::thread_pool workers (1u, nano::thread_role::name::unknown);
workers.add_timed_task (std::chrono::steady_clock::now (), [&] () {
nano::lock_guard<nano::mutex> lock{ mutex };
value1 = 1;
value2 = 1;
});
workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (1), [&] () {
nano::lock_guard<nano::mutex> lock{ mutex };
value2 = 2;
promise.set_value (false);
});
promise.get_future ().get ();
nano::lock_guard<nano::mutex> lock{ mutex };
ASSERT_EQ (1, value1);
ASSERT_EQ (2, value2);
}
TEST (filesystem, remove_all_files)
{
auto path = nano::unique_path ();

View file

@ -94,7 +94,6 @@ add_library(
stats_sinks.hpp
stream.hpp
thread_pool.hpp
thread_pool.cpp
thread_roles.hpp
thread_roles.cpp
thread_runner.hpp

View file

@ -1,97 +0,0 @@
#include <nano/lib/thread_pool.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/thread_pool.hpp>
/*
* thread_pool
*/
nano::thread_pool::thread_pool (unsigned num_threads, nano::thread_role::name thread_name) :
num_threads (num_threads),
thread_pool_m (std::make_unique<boost::asio::thread_pool> (num_threads)),
thread_names_latch{ num_threads }
{
set_thread_names (thread_name);
}
nano::thread_pool::~thread_pool ()
{
stop ();
}
void nano::thread_pool::stop ()
{
nano::unique_lock<nano::mutex> lk (mutex);
if (!stopped)
{
stopped = true;
#if defined(BOOST_ASIO_HAS_IOCP)
// A hack needed for Windows to prevent deadlock during destruction, described here: https://github.com/chriskohlhoff/asio/issues/431
boost::asio::use_service<boost::asio::detail::win_iocp_io_context> (*thread_pool_m).stop ();
#endif
lk.unlock ();
thread_pool_m->stop ();
thread_pool_m->join ();
lk.lock ();
thread_pool_m = nullptr;
}
}
void nano::thread_pool::push_task (std::function<void ()> task)
{
++num_tasks;
nano::lock_guard<nano::mutex> guard (mutex);
if (!stopped)
{
boost::asio::post (*thread_pool_m, [this, task] () {
task ();
--num_tasks;
});
}
}
void nano::thread_pool::add_timed_task (std::chrono::steady_clock::time_point const & expiry_time, std::function<void ()> task)
{
nano::lock_guard<nano::mutex> guard (mutex);
if (!stopped && thread_pool_m)
{
auto timer = std::make_shared<boost::asio::steady_timer> (thread_pool_m->get_executor (), expiry_time);
timer->async_wait ([this, task, timer] (boost::system::error_code const & ec) {
if (!ec)
{
push_task (task);
}
});
}
}
unsigned nano::thread_pool::get_num_threads () const
{
return num_threads;
}
uint64_t nano::thread_pool::num_queued_tasks () const
{
return num_tasks;
}
void nano::thread_pool::set_thread_names (nano::thread_role::name thread_name)
{
for (auto i = 0u; i < num_threads; ++i)
{
boost::asio::post (*thread_pool_m, [this, thread_name] () {
nano::thread_role::set (thread_name);
thread_names_latch.arrive_and_wait ();
});
}
thread_names_latch.wait ();
}
nano::container_info nano::thread_pool::container_info () const
{
nano::container_info info;
info.put ("count", num_queued_tasks ());
return info;
}

View file

@ -4,50 +4,151 @@
#include <nano/lib/thread_roles.hpp>
#include <nano/lib/threading.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/thread_pool.hpp>
#include <atomic>
#include <chrono>
#include <functional>
#include <latch>
namespace boost::asio
{
class thread_pool;
}
#include <memory>
#include <type_traits>
namespace nano
{
class thread_pool final
{
public:
explicit thread_pool (unsigned num_threads, nano::thread_role::name);
~thread_pool ();
// TODO: Auto start should be removed once the node is refactored to start the thread pool explicitly
thread_pool (unsigned num_threads, nano::thread_role::name thread_name, bool auto_start = false) :
num_threads{ num_threads },
thread_name{ thread_name },
thread_names_latch{ num_threads }
{
if (auto_start)
{
start ();
}
}
/** This will run when there is an available thread for execution */
void push_task (std::function<void ()>);
~thread_pool ()
{
// Must be stopped before destruction to avoid running takss when node components are being destroyed
debug_assert (!thread_pool_impl);
}
/** Run a task at a certain point in time */
void add_timed_task (std::chrono::steady_clock::time_point const & expiry_time, std::function<void ()> task);
void start ()
{
debug_assert (!stopped);
debug_assert (!thread_pool_impl);
thread_pool_impl = std::make_unique<boost::asio::thread_pool> (num_threads);
set_thread_names ();
}
/** Stops any further pushed tasks from executing */
void stop ();
void stop ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
if (!stopped && thread_pool_impl)
{
stopped = true;
/** Number of threads in the thread pool */
unsigned get_num_threads () const;
// TODO: Is this still needed?
#if defined(BOOST_ASIO_HAS_IOCP)
// A hack needed for Windows to prevent deadlock during destruction, described here: https://github.com/chriskohlhoff/asio/issues/431
boost::asio::use_service<boost::asio::detail::win_iocp_io_context> (*thread_pool_impl).stop ();
#endif
/** Returns the number of tasks which are awaiting execution by the thread pool **/
uint64_t num_queued_tasks () const;
lock.unlock ();
nano::container_info container_info () const;
thread_pool_impl->stop ();
thread_pool_impl->join ();
lock.lock ();
thread_pool_impl = nullptr;
}
}
template <typename F>
void post (F && task)
{
nano::lock_guard<nano::mutex> guard{ mutex };
if (!stopped)
{
++num_tasks;
release_assert (thread_pool_impl);
boost::asio::post (*thread_pool_impl, [this, t = std::forward<F> (task)] () mutable {
t ();
--num_tasks;
});
}
}
template <typename F>
void post_delayed (std::chrono::steady_clock::duration const & delay, F && task)
{
nano::lock_guard<nano::mutex> guard{ mutex };
if (!stopped)
{
++num_delayed;
release_assert (thread_pool_impl);
auto timer = std::make_shared<boost::asio::steady_timer> (thread_pool_impl->get_executor ());
timer->expires_after (delay);
timer->async_wait ([this, t = std::forward<F> (task), /* preserve lifetime */ timer] (boost::system::error_code const & ec) mutable {
if (!ec)
{
--num_delayed;
post (std::move (t));
}
});
}
}
bool alive () const
{
nano::lock_guard<nano::mutex> guard{ mutex };
return thread_pool_impl != nullptr;
}
uint64_t queued_tasks () const
{
return num_tasks;
}
uint64_t delayed_tasks () const
{
return num_delayed;
}
nano::container_info container_info () const
{
nano::container_info info;
info.put ("tasks", num_tasks);
info.put ("delayed", num_delayed);
return info;
}
private:
nano::mutex mutex;
std::atomic<bool> stopped{ false };
unsigned num_threads;
std::unique_ptr<boost::asio::thread_pool> thread_pool_m;
nano::relaxed_atomic_integral<uint64_t> num_tasks{ 0 };
void set_thread_names ()
{
for (auto i = 0u; i < num_threads; ++i)
{
boost::asio::post (*thread_pool_impl, [this] () {
nano::thread_role::set (thread_name);
thread_names_latch.arrive_and_wait ();
});
}
thread_names_latch.wait ();
}
private:
unsigned const num_threads;
nano::thread_role::name const thread_name;
/** Set the names of all the threads in the thread pool for easier identification */
std::latch thread_names_latch;
void set_thread_names (nano::thread_role::name thread_name);
mutable nano::mutex mutex;
std::atomic<bool> stopped{ false };
std::unique_ptr<boost::asio::thread_pool> thread_pool_impl;
std::atomic<uint64_t> num_tasks{ 0 };
std::atomic<uint64_t> num_delayed{ 0 };
};
} // namespace nano
}

View file

@ -127,7 +127,7 @@ void nano::bulk_pull_client::throttled_receive_block ()
else
{
auto this_l (shared_from_this ());
node->workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this_l] () {
node->workers.post_delayed (std::chrono::seconds (1), [this_l] () {
if (!this_l->connection->pending_stop && !this_l->attempt->stopped)
{
this_l->throttled_receive_block ();
@ -530,7 +530,7 @@ void nano::bulk_pull_server::sent_action (boost::system::error_code const & ec,
}
if (!ec)
{
node->bootstrap_workers.push_task ([this_l = shared_from_this ()] () {
node->bootstrap_workers.post ([this_l = shared_from_this ()] () {
this_l->send_next ();
});
}
@ -816,7 +816,7 @@ void nano::bulk_pull_account_server::sent_action (boost::system::error_code cons
}
if (!ec)
{
node->bootstrap_workers.push_task ([this_l = shared_from_this ()] () {
node->bootstrap_workers.post ([this_l = shared_from_this ()] () {
this_l->send_next_block ();
});
}

View file

@ -144,7 +144,7 @@ void nano::bulk_push_server::throttled_receive ()
else
{
auto this_l (shared_from_this ());
node->workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this_l] () {
node->workers.post_delayed (std::chrono::seconds (1), [this_l] () {
if (!this_l->connection->stopped)
{
this_l->throttled_receive ();

View file

@ -306,7 +306,7 @@ void nano::bootstrap_connections::populate_connections (bool repeat)
if (!stopped && repeat)
{
std::weak_ptr<nano::bootstrap_connections> this_w (shared_from_this ());
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this_w] () {
node.workers.post_delayed (std::chrono::seconds (1), [this_w] () {
if (auto this_l = this_w.lock ())
{
this_l->populate_connections ();

View file

@ -70,7 +70,7 @@ void nano::frontier_req_client::receive_frontier ()
// we simply get a size of 0.
if (size_a == nano::frontier_req_client::size_frontier)
{
node->bootstrap_workers.push_task ([this_l, ec, size_a] () {
node->bootstrap_workers.post ([this_l, ec, size_a] () {
this_l->received_frontier (ec, size_a);
});
}
@ -355,7 +355,7 @@ void nano::frontier_req_server::sent_action (boost::system::error_code const & e
{
count++;
node->bootstrap_workers.push_task ([this_l = shared_from_this ()] () {
node->bootstrap_workers.post ([this_l = shared_from_this ()] () {
this_l->send_next ();
});
}

View file

@ -55,6 +55,8 @@ void nano::confirming_set::start ()
return;
}
notification_workers.start ();
thread = std::thread{ [this] () {
nano::thread_role::set (nano::thread_role::name::confirmation_height);
run ();
@ -148,7 +150,7 @@ void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)
std::unique_lock lock{ mutex };
// It's possible that ledger cementing happens faster than the notifications can be processed by other components, cooldown here
while (notification_workers.num_queued_tasks () >= config.max_queued_notifications)
while (notification_workers.queued_tasks () >= config.max_queued_notifications)
{
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::cooldown);
condition.wait_for (lock, 100ms, [this] { return stopped.load (); });
@ -158,7 +160,7 @@ void nano::confirming_set::run_batch (std::unique_lock<std::mutex> & lock)
}
}
notification_workers.push_task ([this, batch = std::move (batch)] () {
notification_workers.post ([this, batch = std::move (batch)] () {
stats.inc (nano::stat::type::confirming_set, nano::stat::detail::notify);
batch_cemented.notify (batch);
});

View file

@ -401,10 +401,9 @@ void nano::distributed_work::handle_failure ()
status = work_generation_status::failure_peers;
auto now (std::chrono::steady_clock::now ());
std::weak_ptr<nano::node> node_weak (node.shared ());
auto next_backoff (std::min (backoff * 2, std::chrono::seconds (5 * 60)));
node.workers.add_timed_task (now + std::chrono::seconds (backoff), [node_weak, request_l = request, next_backoff] {
node.workers.post_delayed (std::chrono::seconds (backoff), [node_weak, request_l = request, next_backoff] {
bool error_l{ true };
if (auto node_l = node_weak.lock ())
{

View file

@ -62,7 +62,7 @@ void nano::election::confirm_once (nano::unique_lock<nano::mutex> & lock)
lock.unlock ();
node.election_workers.push_task ([this_l = shared_from_this (), status_l, confirmation_action_l = confirmation_action] () {
node.election_workers.post ([this_l = shared_from_this (), status_l, confirmation_action_l = confirmation_action] () {
// This is necessary if the winner of the election is one of the forks.
// In that case the winning block is not yet in the ledger and cementing needs to wait for rollbacks to complete.
this_l->node.process_confirmed (status_l.winner->hash (), this_l);

View file

@ -161,7 +161,7 @@ void nano::epoch_upgrader::upgrade_impl (nano::raw_key const & prv_a, nano::epoc
upgrader_condition.wait (lock);
}
}
node.workers.push_task ([&upgrader_process, &upgrader_mutex, &upgrader_condition, &upgraded_accounts, &workers, epoch, difficulty, signer, root, account] () {
node.workers.post ([&upgrader_process, &upgrader_mutex, &upgrader_condition, &upgraded_accounts, &workers, epoch, difficulty, signer, root, account] () {
upgrader_process (upgraded_accounts, epoch, difficulty, signer, root, account);
{
nano::lock_guard<nano::mutex> lock{ upgrader_mutex };
@ -241,7 +241,7 @@ void nano::epoch_upgrader::upgrade_impl (nano::raw_key const & prv_a, nano::epoc
upgrader_condition.wait (lock);
}
}
node.workers.push_task ([&upgrader_process, &upgrader_mutex, &upgrader_condition, &upgraded_pending, &workers, epoch, difficulty, signer, root, account] () {
node.workers.post ([&upgrader_process, &upgrader_mutex, &upgrader_condition, &upgraded_pending, &workers, epoch, difficulty, signer, root, account] () {
upgrader_process (upgraded_pending, epoch, difficulty, signer, root, account);
{
nano::lock_guard<nano::mutex> lock{ upgrader_mutex };

View file

@ -9,6 +9,7 @@ namespace nano
{
class block;
class container_info;
class thread_pool;
}
namespace nano

View file

@ -541,7 +541,7 @@ void nano::json_handler::account_block_count ()
void nano::json_handler::account_create ()
{
node.workers.push_task (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.post (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
if (!rpc_l->ec)
{
@ -731,7 +731,7 @@ void nano::json_handler::account_list ()
void nano::json_handler::account_move ()
{
node.workers.push_task (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.post (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
if (!rpc_l->ec)
{
@ -770,7 +770,7 @@ void nano::json_handler::account_move ()
void nano::json_handler::account_remove ()
{
node.workers.push_task (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.post (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
auto account (rpc_l->account_impl ());
if (!rpc_l->ec)
@ -805,7 +805,7 @@ void nano::json_handler::account_representative ()
void nano::json_handler::account_representative_set ()
{
node.workers.push_task (create_worker_task ([work_generation_enabled = node.work_generation_enabled ()] (std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.post (create_worker_task ([work_generation_enabled = node.work_generation_enabled ()] (std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
auto account (rpc_l->account_impl ());
std::string representative_text (rpc_l->request.get<std::string> ("representative"));
@ -948,7 +948,7 @@ void nano::json_handler::accounts_representatives ()
void nano::json_handler::accounts_create ()
{
node.workers.push_task (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.post (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
auto count (rpc_l->count_impl ());
if (!rpc_l->ec)
@ -2930,7 +2930,7 @@ void nano::json_handler::node_id_delete ()
void nano::json_handler::password_change ()
{
node.workers.push_task (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.post (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
if (!rpc_l->ec)
{
@ -2953,7 +2953,7 @@ void nano::json_handler::password_change ()
void nano::json_handler::password_enter ()
{
node.workers.push_task (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.post (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
if (!rpc_l->ec)
{
@ -3178,7 +3178,7 @@ void nano::json_handler::receivable_exists ()
void nano::json_handler::process ()
{
node.workers.push_task (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.post (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
bool const is_async = rpc_l->request.get<bool> ("async", false);
auto block (rpc_l->block_impl (true));
@ -4143,7 +4143,7 @@ void nano::json_handler::unchecked ()
void nano::json_handler::unchecked_clear ()
{
node.workers.push_task (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.post (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
rpc_l->node.unchecked.clear ();
rpc_l->response_l.put ("success", "");
rpc_l->response_errors ();
@ -4316,7 +4316,7 @@ void nano::json_handler::validate_account_number ()
void nano::json_handler::wallet_add ()
{
node.workers.push_task (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.post (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
if (!rpc_l->ec)
{
@ -4346,7 +4346,7 @@ void nano::json_handler::wallet_add ()
void nano::json_handler::wallet_add_watch ()
{
node.workers.push_task (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.post (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
if (!rpc_l->ec)
{
@ -4469,7 +4469,7 @@ void nano::json_handler::wallet_balances ()
void nano::json_handler::wallet_change_seed ()
{
node.workers.push_task (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.post (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
if (!rpc_l->ec)
{
@ -4517,7 +4517,7 @@ void nano::json_handler::wallet_contains ()
void nano::json_handler::wallet_create ()
{
node.workers.push_task (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.post (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
nano::raw_key seed;
auto seed_text (rpc_l->request.get_optional<std::string> ("seed"));
if (seed_text.is_initialized () && seed.decode_hex (seed_text.get ()))
@ -4553,7 +4553,7 @@ void nano::json_handler::wallet_create ()
void nano::json_handler::wallet_destroy ()
{
node.workers.push_task (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.post (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
std::string wallet_text (rpc_l->request.get<std::string> ("wallet"));
nano::wallet_id wallet;
if (!wallet.decode_hex (wallet_text))
@ -4845,7 +4845,7 @@ void nano::json_handler::wallet_representative ()
void nano::json_handler::wallet_representative_set ()
{
node.workers.push_task (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.post (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
std::string representative_text (rpc_l->request.get<std::string> ("representative"));
auto representative (rpc_l->account_impl (representative_text, nano::error_rpc::bad_representative_number));
@ -5132,7 +5132,7 @@ void nano::json_handler::work_get ()
void nano::json_handler::work_set ()
{
node.workers.push_task (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.post (create_worker_task ([] (std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
auto account (rpc_l->account_impl ());
auto work (rpc_l->work_optional_impl ());

View file

@ -309,7 +309,7 @@ void nano::network::flood_block_many (std::deque<std::shared_ptr<nano::block>> b
if (!blocks_a.empty ())
{
std::weak_ptr<nano::node> node_w (node.shared ());
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, blocks (std::move (blocks_a)), callback_a, delay_a] () {
node.workers.post_delayed (std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, blocks (std::move (blocks_a)), callback_a, delay_a] () {
if (auto node_l = node_w.lock ())
{
node_l->network.flood_block_many (std::move (blocks), callback_a, delay_a);

View file

@ -1,5 +1,6 @@
#include <nano/lib/blocks.hpp>
#include <nano/lib/stream.hpp>
#include <nano/lib/thread_pool.hpp>
#include <nano/lib/thread_runner.hpp>
#include <nano/lib/tomlconfig.hpp>
#include <nano/lib/utility.hpp>
@ -69,6 +70,7 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, uint16_t pe
nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesystem::path const & application_path_a, nano::node_config const & config_a, nano::work_pool & work_a, nano::node_flags flags_a, unsigned seq) :
node_id{ load_or_create_node_id (application_path_a) },
config{ config_a },
flags{ flags_a },
io_ctx_shared{ std::make_shared<boost::asio::io_context> () },
io_ctx{ *io_ctx_shared },
logger{ make_logger_identifier (node_id) },
@ -77,11 +79,14 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
node_initialized_latch (1),
network_params{ config.network_params },
stats{ logger, config.stats_config },
workers{ config.background_threads, nano::thread_role::name::worker },
bootstrap_workers{ config.bootstrap_serving_threads, nano::thread_role::name::bootstrap_worker },
wallet_workers{ 1, nano::thread_role::name::wallet_worker },
election_workers{ 1, nano::thread_role::name::election_worker },
flags (flags_a),
workers_impl{ std::make_unique<nano::thread_pool> (config.background_threads, nano::thread_role::name::worker, /* start immediately */ true) },
workers{ *workers_impl },
bootstrap_workers_impl{ std::make_unique<nano::thread_pool> (config.bootstrap_serving_threads, nano::thread_role::name::bootstrap_worker, /* start immediately */ true) },
bootstrap_workers{ *bootstrap_workers_impl },
wallet_workers_impl{ std::make_unique<nano::thread_pool> (1, nano::thread_role::name::wallet_worker, /* start immediately */ true) },
wallet_workers{ *wallet_workers_impl },
election_workers_impl{ std::make_unique<nano::thread_pool> (1, nano::thread_role::name::election_worker, /* start immediately */ true) },
election_workers{ *election_workers_impl },
work (work_a),
distributed_work (*this),
store_impl (nano::make_store (logger, application_path_a, network_params.ledger, flags.read_only, true, config_a.rocksdb_config, config_a.diagnostics_config.txn_tracking, config_a.block_processor_batch_max_time, config_a.lmdb_config, config_a.backup_before_upgrade)),
@ -402,7 +407,7 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
// TODO: Is it neccessary to call this for all blocks?
if (block->is_send ())
{
wallet_workers.push_task ([this, hash = block->hash (), destination = block->destination ()] () {
wallet_workers.post ([this, hash = block->hash (), destination = block->destination ()] () {
wallets.receive_confirmed (hash, destination);
});
}
@ -564,7 +569,7 @@ void nano::node::start ()
if (flags.enable_pruning)
{
auto this_l (shared ());
workers.push_task ([this_l] () {
workers.post ([this_l] () {
this_l->ongoing_ledger_pruning ();
});
}
@ -605,7 +610,7 @@ void nano::node::start ()
{
// Delay to start wallet lazy bootstrap
auto this_l (shared ());
workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::minutes (1), [this_l] () {
workers.post_delayed (std::chrono::minutes (1), [this_l] () {
this_l->bootstrap_wallet ();
});
}
@ -654,9 +659,7 @@ void nano::node::stop ()
logger.info (nano::log::type::node, "Node stopping...");
tcp_listener.stop ();
bootstrap_workers.stop ();
wallet_workers.stop ();
election_workers.stop ();
vote_router.stop ();
peer_history.stop ();
// Cancels ongoing work generation tasks, which may be blocking other threads
@ -684,12 +687,16 @@ void nano::node::stop ()
wallets.stop ();
stats.stop ();
epoch_upgrader.stop ();
workers.stop ();
local_block_broadcaster.stop ();
message_processor.stop ();
network.stop (); // Stop network last to avoid killing in-use sockets
monitor.stop ();
bootstrap_workers.stop ();
wallet_workers.stop ();
election_workers.stop ();
workers.stop ();
// work pool is not stopped on purpose due to testing setup
// Stop the IO runner last
@ -823,7 +830,7 @@ void nano::node::ongoing_bootstrap ()
// Bootstrap and schedule for next attempt
bootstrap_initiator.bootstrap (false, boost::str (boost::format ("auto_bootstrap_%1%") % previous_bootstrap_count), frontiers_age);
std::weak_ptr<nano::node> node_w (shared_from_this ());
workers.add_timed_task (std::chrono::steady_clock::now () + next_wakeup, [node_w] () {
workers.post_delayed (next_wakeup, [node_w] () {
if (auto node_l = node_w.lock ())
{
node_l->ongoing_bootstrap ();
@ -844,7 +851,7 @@ void nano::node::backup_wallet ()
i->second->store.write_backup (transaction, backup_path / (i->first.to_string () + ".json"));
}
auto this_l (shared ());
workers.add_timed_task (std::chrono::steady_clock::now () + network_params.node.backup_interval, [this_l] () {
workers.post_delayed (network_params.node.backup_interval, [this_l] () {
this_l->backup_wallet ();
});
}
@ -856,7 +863,7 @@ void nano::node::search_receivable_all ()
// Search pending
wallets.search_receivable_all ();
auto this_l (shared ());
workers.add_timed_task (std::chrono::steady_clock::now () + network_params.node.search_pending_interval, [this_l] () {
workers.post_delayed (network_params.node.search_pending_interval, [this_l] () {
this_l->search_receivable_all ();
});
}
@ -981,8 +988,8 @@ void nano::node::ongoing_ledger_pruning ()
ledger_pruning (flags.block_processor_batch_size != 0 ? flags.block_processor_batch_size : 2 * 1024, bootstrap_weight_reached);
auto const ledger_pruning_interval (bootstrap_weight_reached ? config.max_pruning_age : std::min (config.max_pruning_age, std::chrono::seconds (15 * 60)));
auto this_l (shared ());
workers.add_timed_task (std::chrono::steady_clock::now () + ledger_pruning_interval, [this_l] () {
this_l->workers.push_task ([this_l] () {
workers.post_delayed (ledger_pruning_interval, [this_l] () {
this_l->workers.post ([this_l] () {
this_l->ongoing_ledger_pruning ();
});
});
@ -1126,7 +1133,7 @@ bool nano::node::block_confirmed_or_being_confirmed (nano::block_hash const & ha
void nano::node::ongoing_online_weight_calculation_queue ()
{
std::weak_ptr<nano::node> node_w (shared_from_this ());
workers.add_timed_task (std::chrono::steady_clock::now () + (std::chrono::seconds (network_params.node.weight_period)), [node_w] () {
workers.post_delayed ((std::chrono::seconds (network_params.node.weight_period)), [node_w] () {
if (auto node_l = node_w.lock ())
{
node_l->ongoing_online_weight_calculation ();
@ -1165,7 +1172,7 @@ void nano::node::process_confirmed (nano::block_hash hash, std::shared_ptr<nano:
stats.inc (nano::stat::type::process_confirmed, nano::stat::detail::retry);
// Try again later
election_workers.add_timed_task (std::chrono::steady_clock::now () + network_params.node.process_confirmed_interval, [this, hash, election, iteration] () {
election_workers.post_delayed (network_params.node.process_confirmed_interval, [this, hash, election, iteration] () {
process_confirmed (hash, election, iteration + 1);
});
}

View file

@ -4,7 +4,6 @@
#include <nano/lib/config.hpp>
#include <nano/lib/logging.hpp>
#include <nano/lib/stats.hpp>
#include <nano/lib/thread_pool.hpp>
#include <nano/lib/work.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/bootstrap/bootstrap.hpp>
@ -148,6 +147,7 @@ public:
public:
const nano::keypair node_id;
nano::node_config config;
nano::node_flags flags;
std::shared_ptr<boost::asio::io_context> io_ctx_shared;
boost::asio::io_context & io_ctx;
nano::logger logger;
@ -156,11 +156,14 @@ public:
boost::latch node_initialized_latch;
nano::network_params & network_params;
nano::stats stats;
nano::thread_pool workers;
nano::thread_pool bootstrap_workers;
nano::thread_pool wallet_workers;
nano::thread_pool election_workers;
nano::node_flags flags;
std::unique_ptr<nano::thread_pool> workers_impl;
nano::thread_pool & workers;
std::unique_ptr<nano::thread_pool> bootstrap_workers_impl;
nano::thread_pool & bootstrap_workers;
std::unique_ptr<nano::thread_pool> wallet_workers_impl;
nano::thread_pool & wallet_workers;
std::unique_ptr<nano::thread_pool> election_workers_impl;
nano::thread_pool & election_workers;
nano::work_pool & work;
nano::distributed_work_factory distributed_work;
std::unique_ptr<nano::store::component> store_impl;

View file

@ -526,7 +526,7 @@ void nano::transport::tcp_server::bootstrap_message_visitor::bulk_pull (const na
return;
}
node->bootstrap_workers.push_task ([server = server, message = message] () {
node->bootstrap_workers.post ([server = server, message = message] () {
// TODO: Add completion callback to bulk pull server
// TODO: There should be no need to re-copy message as unique pointer, refactor those bulk/frontier pull/push servers
auto bulk_pull_server = std::make_shared<nano::bulk_pull_server> (server, std::make_unique<nano::bulk_pull> (message));
@ -548,7 +548,7 @@ void nano::transport::tcp_server::bootstrap_message_visitor::bulk_pull_account (
return;
}
node->bootstrap_workers.push_task ([server = server, message = message] () {
node->bootstrap_workers.post ([server = server, message = message] () {
// TODO: Add completion callback to bulk pull server
// TODO: There should be no need to re-copy message as unique pointer, refactor those bulk/frontier pull/push servers
auto bulk_pull_account_server = std::make_shared<nano::bulk_pull_account_server> (server, std::make_unique<nano::bulk_pull_account> (message));
@ -565,7 +565,7 @@ void nano::transport::tcp_server::bootstrap_message_visitor::bulk_push (const na
{
return;
}
node->bootstrap_workers.push_task ([server = server] () {
node->bootstrap_workers.post ([server = server] () {
// TODO: Add completion callback to bulk pull server
auto bulk_push_server = std::make_shared<nano::bulk_push_server> (server);
bulk_push_server->throttled_receive ();
@ -582,7 +582,7 @@ void nano::transport::tcp_server::bootstrap_message_visitor::frontier_req (const
return;
}
node->bootstrap_workers.push_task ([server = server, message = message] () {
node->bootstrap_workers.post ([server = server, message = message] () {
// TODO: There should be no need to re-copy message as unique pointer, refactor those bulk/frontier pull/push servers
auto response = std::make_shared<nano::frontier_req_server> (server, std::make_unique<nano::frontier_req> (message));
response->send_next ();

View file

@ -274,7 +274,7 @@ void nano::transport::tcp_socket::ongoing_checkup ()
return;
}
node_l->workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (node_l->network_params.network.is_dev_network () ? 1 : 5), [this_w = weak_from_this ()] () {
node_l->workers.post_delayed (std::chrono::seconds (node_l->network_params.network.is_dev_network () ? 1 : 5), [this_w = weak_from_this ()] () {
auto this_l = this_w.lock ();
if (!this_l)
{

View file

@ -1162,7 +1162,7 @@ void nano::wallet::work_ensure (nano::account const & account_a, nano::root cons
wallets.delayed_work->operator[] (account_a) = root_a;
wallets.node.workers.add_timed_task (std::chrono::steady_clock::now () + precache_delay, [this_l = shared_from_this (), account_a, root_a] {
wallets.node.workers.post_delayed (precache_delay, [this_l = shared_from_this (), account_a, root_a] {
auto delayed_work = this_l->wallets.delayed_work.lock ();
auto existing (delayed_work->find (account_a));
if (existing != delayed_work->end () && existing->second == root_a)
@ -1713,7 +1713,7 @@ void nano::wallets::ongoing_compute_reps ()
auto & node_l (node);
// Representation drifts quickly on the test network but very slowly on the live network
auto compute_delay = network_params.network.is_dev_network () ? std::chrono::milliseconds (10) : (network_params.network.is_test_network () ? std::chrono::milliseconds (nano::test_scan_wallet_reps_delay ()) : std::chrono::minutes (15));
node.workers.add_timed_task (std::chrono::steady_clock::now () + compute_delay, [&node_l] () {
node.workers.post_delayed (compute_delay, [&node_l] () {
node_l.wallets.ongoing_compute_reps ();
});
}

View file

@ -107,7 +107,7 @@ nano_qt::self_pane::self_pane (nano_qt::wallet & wallet_a, nano::account const &
QObject::connect (copy_button, &QPushButton::clicked, [this] () {
this->wallet.application.clipboard ()->setText (QString (this->wallet.account.to_account ().c_str ()));
copy_button->setText ("Copied!");
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (2), [this] () {
this->wallet.node.workers.post_delayed (std::chrono::seconds (2), [this] () {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () {
copy_button->setText ("Copy");
}));
@ -201,7 +201,7 @@ nano_qt::accounts::accounts (nano_qt::wallet & wallet_a) :
this->wallet.wallet_m->deterministic_insert (transaction);
show_button_success (*create_account);
create_account->setText ("New account was created");
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () {
this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () {
show_button_ok (*create_account);
create_account->setText ("Create account");
@ -212,7 +212,7 @@ nano_qt::accounts::accounts (nano_qt::wallet & wallet_a) :
{
show_button_error (*create_account);
create_account->setText ("Wallet is locked, unlock it to create account");
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () {
this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () {
show_button_ok (*create_account);
create_account->setText ("Create account");
@ -234,7 +234,7 @@ nano_qt::accounts::accounts (nano_qt::wallet & wallet_a) :
this->wallet.application.clipboard ()->setText (QString (seed.to_string ().c_str ()));
show_button_success (*backup_seed);
backup_seed->setText ("Seed was copied to clipboard");
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () {
this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () {
show_button_ok (*backup_seed);
backup_seed->setText ("Copy wallet seed to clipboard");
@ -246,7 +246,7 @@ nano_qt::accounts::accounts (nano_qt::wallet & wallet_a) :
this->wallet.application.clipboard ()->setText ("");
show_button_error (*backup_seed);
backup_seed->setText ("Wallet is locked, unlock it to enable the backup");
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () {
this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () {
show_button_ok (*backup_seed);
backup_seed->setText ("Copy wallet seed to clipboard");
@ -280,7 +280,7 @@ void nano_qt::accounts::refresh_wallet_balance ()
final_text += "\nReady to receive: " + wallet.format_balance (pending);
}
wallet_balance_label->setText (QString (final_text.c_str ()));
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (60), [this] () {
this->wallet.node.workers.post_delayed (std::chrono::seconds (60), [this] () {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () {
refresh_wallet_balance ();
}));
@ -410,7 +410,7 @@ nano_qt::import::import (nano_qt::wallet & wallet_a) :
show_line_error (*seed);
show_button_error (*import_seed);
import_seed->setText ("Wallet is locked, unlock it to enable the import");
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (10), [this] () {
this->wallet.node.workers.post_delayed (std::chrono::seconds (10), [this] () {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () {
show_line_ok (*seed);
show_button_ok (*import_seed);
@ -427,7 +427,7 @@ nano_qt::import::import (nano_qt::wallet & wallet_a) :
show_button_success (*import_seed);
import_seed->setText ("Successful import of seed");
this->wallet.refresh ();
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () {
this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () {
show_button_ok (*import_seed);
import_seed->setText ("Import seed");
@ -447,7 +447,7 @@ nano_qt::import::import (nano_qt::wallet & wallet_a) :
{
import_seed->setText ("Incorrect seed. Only HEX characters allowed");
}
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () {
this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () {
show_button_ok (*import_seed);
import_seed->setText ("Import seed");
@ -460,7 +460,7 @@ nano_qt::import::import (nano_qt::wallet & wallet_a) :
show_line_error (*clear_line);
show_button_error (*import_seed);
import_seed->setText ("Type words 'clear keys'");
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () {
this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () {
show_button_ok (*import_seed);
import_seed->setText ("Import seed");
@ -745,7 +745,7 @@ void nano_qt::block_viewer::rebroadcast_action (nano::block_hash const & hash_a)
if (successor)
{
done = false;
wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this, successor] () {
wallet.node.workers.post_delayed (std::chrono::seconds (1), [this, successor] () {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this, successor] () {
rebroadcast_action (successor.value ());
}));
@ -1147,7 +1147,7 @@ void nano_qt::wallet::ongoing_refresh ()
}
}));
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [wallet_w] () {
node.workers.post_delayed (std::chrono::seconds (5), [wallet_w] () {
if (auto wallet_l = wallet_w.lock ())
{
wallet_l->ongoing_refresh ();
@ -1231,7 +1231,7 @@ void nano_qt::wallet::start ()
{
show_button_error (*this_l->send_blocks_send);
this_l->send_blocks_send->setText ("Wallet is locked, unlock it to send");
this_l->node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this_w] () {
this_l->node.workers.post_delayed (std::chrono::seconds (5), [this_w] () {
if (auto this_l = this_w.lock ())
{
this_l->application.postEvent (&this_l->processor, new eventloop_event ([this_w] () {
@ -1250,7 +1250,7 @@ void nano_qt::wallet::start ()
show_line_error (*this_l->send_count);
show_button_error (*this_l->send_blocks_send);
this_l->send_blocks_send->setText ("Not enough balance");
this_l->node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this_w] () {
this_l->node.workers.post_delayed (std::chrono::seconds (5), [this_w] () {
if (auto this_l = this_w.lock ())
{
this_l->application.postEvent (&this_l->processor, new eventloop_event ([this_w] () {
@ -1269,7 +1269,7 @@ void nano_qt::wallet::start ()
show_line_error (*this_l->send_account);
show_button_error (*this_l->send_blocks_send);
this_l->send_blocks_send->setText ("Bad destination account");
this_l->node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this_w] () {
this_l->node.workers.post_delayed (std::chrono::seconds (5), [this_w] () {
if (auto this_l = this_w.lock ())
{
this_l->application.postEvent (&this_l->processor, new eventloop_event ([this_w] () {
@ -1288,7 +1288,7 @@ void nano_qt::wallet::start ()
show_line_error (*this_l->send_count);
show_button_error (*this_l->send_blocks_send);
this_l->send_blocks_send->setText ("Bad amount number");
this_l->node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this_w] () {
this_l->node.workers.post_delayed (std::chrono::seconds (5), [this_w] () {
if (auto this_l = this_w.lock ())
{
this_l->application.postEvent (&this_l->processor, new eventloop_event ([this_w] () {
@ -1464,7 +1464,7 @@ void nano_qt::wallet::update_connected ()
void nano_qt::wallet::empty_password ()
{
this->node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (3), [this] () {
this->node.workers.post_delayed (std::chrono::seconds (3), [this] () {
auto transaction (wallet_m->wallets.tx_begin_write ());
wallet_m->enter_password (transaction, std::string (""));
});
@ -1568,7 +1568,7 @@ nano_qt::settings::settings (nano_qt::wallet & wallet_a) :
change->setText ("Password was changed");
this->wallet.node.logger.warn (nano::log::type::qt, "Wallet password changed");
update_locked (false, false);
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () {
this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () {
show_button_ok (*change);
change->setText ("Set/Change password");
@ -1586,7 +1586,7 @@ nano_qt::settings::settings (nano_qt::wallet & wallet_a) :
{
show_button_error (*change);
change->setText ("Wallet is locked, unlock it");
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () {
this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () {
show_button_ok (*change);
change->setText ("Set/Change password");
@ -1612,7 +1612,7 @@ nano_qt::settings::settings (nano_qt::wallet & wallet_a) :
change_rep->setText ("Representative was changed");
current_representative->setText (QString (representative_l.to_account ().c_str ()));
new_representative->clear ();
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () {
this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () {
show_button_ok (*change_rep);
change_rep->setText ("Change representative");
@ -1623,7 +1623,7 @@ nano_qt::settings::settings (nano_qt::wallet & wallet_a) :
{
show_button_error (*change_rep);
change_rep->setText ("Wallet is locked, unlock it");
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () {
this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () {
show_button_ok (*change_rep);
change_rep->setText ("Change representative");
@ -1636,7 +1636,7 @@ nano_qt::settings::settings (nano_qt::wallet & wallet_a) :
show_line_error (*new_representative);
show_button_error (*change_rep);
change_rep->setText ("Invalid account");
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () {
this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () {
show_line_ok (*new_representative);
show_button_ok (*change_rep);
@ -1676,7 +1676,7 @@ nano_qt::settings::settings (nano_qt::wallet & wallet_a) :
show_line_error (*password);
show_button_error (*lock_toggle);
lock_toggle->setText ("Invalid password");
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () {
this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () {
show_line_ok (*password);
show_button_ok (*lock_toggle);

View file

@ -68,7 +68,7 @@ TEST (rpc, wrapped_task)
// Exception should get caught
throw std::runtime_error ("");
}));
system.nodes[0]->workers.push_task (task);
system.nodes[0]->workers.post (task);
ASSERT_TIMELY_EQ (5s, response, true);
}

View file

@ -104,7 +104,7 @@ TEST (system, receive_while_synchronizing)
node1->start ();
system.nodes.push_back (node1);
ASSERT_NE (nullptr, nano::test::establish_tcp (system, *node1, node->network.endpoint ()));
node1->workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (200), ([&system, &key] () {
node1->workers.post_delayed (std::chrono::milliseconds (200), ([&system, &key] () {
auto hash (system.wallet (0)->send_sync (nano::dev::genesis_key.pub, key.pub, system.nodes[0]->config.receive_minimum.number ()));
auto transaction = system.nodes[0]->ledger.tx_begin_read ();
auto block = system.nodes[0]->ledger.any.block_get (transaction, hash);

View file

@ -406,7 +406,7 @@ public:
if (count_l > 0)
{
auto this_l (shared_from_this ());
node->workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (wait), [this_l] () { this_l->run (); });
node->workers.post_delayed (std::chrono::milliseconds (wait), [this_l] () { this_l->run (); });
}
}
std::vector<nano::account> accounts;