diff --git a/nano/core_test/CMakeLists.txt b/nano/core_test/CMakeLists.txt index 1d01f6e7..2db941e6 100644 --- a/nano/core_test/CMakeLists.txt +++ b/nano/core_test/CMakeLists.txt @@ -53,6 +53,7 @@ add_executable( socket.cpp system.cpp telemetry.cpp + thread_pool.cpp throttle.cpp toml.cpp timer.cpp diff --git a/nano/core_test/thread_pool.cpp b/nano/core_test/thread_pool.cpp new file mode 100644 index 00000000..810bbd0a --- /dev/null +++ b/nano/core_test/thread_pool.cpp @@ -0,0 +1,96 @@ +#include +#include +#include + +#include + +#include + +TEST (thread_pool, thread_pool) +{ + std::atomic passed_sleep{ false }; + + auto func = [&passed_sleep] () { + std::this_thread::sleep_for (std::chrono::seconds (1)); + passed_sleep = true; + }; + + nano::thread_pool workers (1u, nano::thread_role::name::unknown); + nano::test::start_stop_guard stop_guard{ workers }; + workers.post (func); + ASSERT_FALSE (passed_sleep); + + nano::timer timer_l; + timer_l.start (); + while (!passed_sleep) + { + if (timer_l.since_start () > std::chrono::seconds (10)) + { + break; + } + } + ASSERT_TRUE (passed_sleep); +} + +TEST (thread_pool, one) +{ + std::atomic done (false); + nano::mutex mutex; + nano::condition_variable condition; + nano::thread_pool workers (1u, nano::thread_role::name::unknown); + nano::test::start_stop_guard stop_guard{ workers }; + workers.post ([&] () { + { + nano::lock_guard lock{ mutex }; + done = true; + } + condition.notify_one (); + }); + nano::unique_lock unique{ mutex }; + condition.wait (unique, [&] () { return !!done; }); +} + +TEST (thread_pool, many) +{ + std::atomic count (0); + nano::mutex mutex; + nano::condition_variable condition; + nano::thread_pool workers (50u, nano::thread_role::name::unknown); + nano::test::start_stop_guard stop_guard{ workers }; + for (auto i (0); i < 50; ++i) + { + workers.post ([&] () { + { + nano::lock_guard lock{ mutex }; + count += 1; + } + condition.notify_one (); + }); + } + nano::unique_lock unique{ mutex }; + condition.wait (unique, [&] () { return count == 50; }); +} + +TEST (thread_pool, top_execution) +{ + int value1 (0); + int value2 (0); + nano::mutex mutex; + std::promise promise; + nano::thread_pool workers (1u, nano::thread_role::name::unknown); + nano::test::start_stop_guard stop_guard{ workers }; + workers.post ([&] () { + nano::lock_guard lock{ mutex }; + value1 = 1; + value2 = 1; + }); + workers.post_delayed (std::chrono::milliseconds (1), [&] () { + nano::lock_guard lock{ mutex }; + value2 = 2; + promise.set_value (false); + }); + promise.get_future ().get (); + nano::lock_guard lock{ mutex }; + ASSERT_EQ (1, value1); + ASSERT_EQ (2, value2); +} \ No newline at end of file diff --git a/nano/core_test/utility.cpp b/nano/core_test/utility.cpp index 730714dc..30ee2954 100644 --- a/nano/core_test/utility.cpp +++ b/nano/core_test/utility.cpp @@ -1,6 +1,6 @@ #include #include -#include +#include #include #include #include @@ -146,91 +146,6 @@ TEST (optional_ptr, basic) ASSERT_EQ (opt->z, 3); } -TEST (thread, thread_pool) -{ - std::atomic passed_sleep{ false }; - - auto func = [&passed_sleep] () { - std::this_thread::sleep_for (std::chrono::seconds (1)); - passed_sleep = true; - }; - - nano::thread_pool workers (1u, nano::thread_role::name::unknown); - workers.push_task (func); - ASSERT_FALSE (passed_sleep); - - nano::timer timer_l; - timer_l.start (); - while (!passed_sleep) - { - if (timer_l.since_start () > std::chrono::seconds (10)) - { - break; - } - } - ASSERT_TRUE (passed_sleep); -} - -TEST (thread_pool_alarm, one) -{ - std::atomic done (false); - nano::mutex mutex; - nano::condition_variable condition; - nano::thread_pool workers (1u, nano::thread_role::name::unknown); - workers.add_timed_task (std::chrono::steady_clock::now (), [&] () { - { - nano::lock_guard lock{ mutex }; - done = true; - } - condition.notify_one (); - }); - nano::unique_lock unique{ mutex }; - condition.wait (unique, [&] () { return !!done; }); -} - -TEST (thread_pool_alarm, many) -{ - std::atomic count (0); - nano::mutex mutex; - nano::condition_variable condition; - nano::thread_pool workers (50u, nano::thread_role::name::unknown); - for (auto i (0); i < 50; ++i) - { - workers.add_timed_task (std::chrono::steady_clock::now (), [&] () { - { - nano::lock_guard lock{ mutex }; - count += 1; - } - condition.notify_one (); - }); - } - nano::unique_lock unique{ mutex }; - condition.wait (unique, [&] () { return count == 50; }); -} - -TEST (thread_pool_alarm, top_execution) -{ - int value1 (0); - int value2 (0); - nano::mutex mutex; - std::promise promise; - nano::thread_pool workers (1u, nano::thread_role::name::unknown); - workers.add_timed_task (std::chrono::steady_clock::now (), [&] () { - nano::lock_guard lock{ mutex }; - value1 = 1; - value2 = 1; - }); - workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (1), [&] () { - nano::lock_guard lock{ mutex }; - value2 = 2; - promise.set_value (false); - }); - promise.get_future ().get (); - nano::lock_guard lock{ mutex }; - ASSERT_EQ (1, value1); - ASSERT_EQ (2, value2); -} - TEST (filesystem, remove_all_files) { auto path = nano::unique_path (); diff --git a/nano/lib/CMakeLists.txt b/nano/lib/CMakeLists.txt index 702cb84f..76d53944 100644 --- a/nano/lib/CMakeLists.txt +++ b/nano/lib/CMakeLists.txt @@ -94,7 +94,6 @@ add_library( stats_sinks.hpp stream.hpp thread_pool.hpp - thread_pool.cpp thread_roles.hpp thread_roles.cpp thread_runner.hpp diff --git a/nano/lib/thread_pool.cpp b/nano/lib/thread_pool.cpp deleted file mode 100644 index c5f0108e..00000000 --- a/nano/lib/thread_pool.cpp +++ /dev/null @@ -1,97 +0,0 @@ -#include - -#include -#include -#include - -/* - * thread_pool - */ - -nano::thread_pool::thread_pool (unsigned num_threads, nano::thread_role::name thread_name) : - num_threads (num_threads), - thread_pool_m (std::make_unique (num_threads)), - thread_names_latch{ num_threads } -{ - set_thread_names (thread_name); -} - -nano::thread_pool::~thread_pool () -{ - stop (); -} - -void nano::thread_pool::stop () -{ - nano::unique_lock lk (mutex); - if (!stopped) - { - stopped = true; -#if defined(BOOST_ASIO_HAS_IOCP) - // A hack needed for Windows to prevent deadlock during destruction, described here: https://github.com/chriskohlhoff/asio/issues/431 - boost::asio::use_service (*thread_pool_m).stop (); -#endif - lk.unlock (); - thread_pool_m->stop (); - thread_pool_m->join (); - lk.lock (); - thread_pool_m = nullptr; - } -} - -void nano::thread_pool::push_task (std::function task) -{ - ++num_tasks; - nano::lock_guard guard (mutex); - if (!stopped) - { - boost::asio::post (*thread_pool_m, [this, task] () { - task (); - --num_tasks; - }); - } -} - -void nano::thread_pool::add_timed_task (std::chrono::steady_clock::time_point const & expiry_time, std::function task) -{ - nano::lock_guard guard (mutex); - if (!stopped && thread_pool_m) - { - auto timer = std::make_shared (thread_pool_m->get_executor (), expiry_time); - timer->async_wait ([this, task, timer] (boost::system::error_code const & ec) { - if (!ec) - { - push_task (task); - } - }); - } -} - -unsigned nano::thread_pool::get_num_threads () const -{ - return num_threads; -} - -uint64_t nano::thread_pool::num_queued_tasks () const -{ - return num_tasks; -} - -void nano::thread_pool::set_thread_names (nano::thread_role::name thread_name) -{ - for (auto i = 0u; i < num_threads; ++i) - { - boost::asio::post (*thread_pool_m, [this, thread_name] () { - nano::thread_role::set (thread_name); - thread_names_latch.arrive_and_wait (); - }); - } - thread_names_latch.wait (); -} - -nano::container_info nano::thread_pool::container_info () const -{ - nano::container_info info; - info.put ("count", num_queued_tasks ()); - return info; -} diff --git a/nano/lib/thread_pool.hpp b/nano/lib/thread_pool.hpp index 2b32e60f..157272b7 100644 --- a/nano/lib/thread_pool.hpp +++ b/nano/lib/thread_pool.hpp @@ -4,50 +4,151 @@ #include #include +#include +#include +#include + #include #include -#include #include - -namespace boost::asio -{ -class thread_pool; -} +#include +#include namespace nano { class thread_pool final { public: - explicit thread_pool (unsigned num_threads, nano::thread_role::name); - ~thread_pool (); + // TODO: Auto start should be removed once the node is refactored to start the thread pool explicitly + thread_pool (unsigned num_threads, nano::thread_role::name thread_name, bool auto_start = false) : + num_threads{ num_threads }, + thread_name{ thread_name }, + thread_names_latch{ num_threads } + { + if (auto_start) + { + start (); + } + } - /** This will run when there is an available thread for execution */ - void push_task (std::function); + ~thread_pool () + { + // Must be stopped before destruction to avoid running takss when node components are being destroyed + debug_assert (!thread_pool_impl); + } - /** Run a task at a certain point in time */ - void add_timed_task (std::chrono::steady_clock::time_point const & expiry_time, std::function task); + void start () + { + debug_assert (!stopped); + debug_assert (!thread_pool_impl); + thread_pool_impl = std::make_unique (num_threads); + set_thread_names (); + } - /** Stops any further pushed tasks from executing */ - void stop (); + void stop () + { + nano::unique_lock lock{ mutex }; + if (!stopped && thread_pool_impl) + { + stopped = true; - /** Number of threads in the thread pool */ - unsigned get_num_threads () const; + // TODO: Is this still needed? +#if defined(BOOST_ASIO_HAS_IOCP) + // A hack needed for Windows to prevent deadlock during destruction, described here: https://github.com/chriskohlhoff/asio/issues/431 + boost::asio::use_service (*thread_pool_impl).stop (); +#endif - /** Returns the number of tasks which are awaiting execution by the thread pool **/ - uint64_t num_queued_tasks () const; + lock.unlock (); - nano::container_info container_info () const; + thread_pool_impl->stop (); + thread_pool_impl->join (); + + lock.lock (); + thread_pool_impl = nullptr; + } + } + + template + void post (F && task) + { + nano::lock_guard guard{ mutex }; + if (!stopped) + { + ++num_tasks; + release_assert (thread_pool_impl); + boost::asio::post (*thread_pool_impl, [this, t = std::forward (task)] () mutable { + t (); + --num_tasks; + }); + } + } + + template + void post_delayed (std::chrono::steady_clock::duration const & delay, F && task) + { + nano::lock_guard guard{ mutex }; + if (!stopped) + { + ++num_delayed; + release_assert (thread_pool_impl); + auto timer = std::make_shared (thread_pool_impl->get_executor ()); + timer->expires_after (delay); + timer->async_wait ([this, t = std::forward (task), /* preserve lifetime */ timer] (boost::system::error_code const & ec) mutable { + if (!ec) + { + --num_delayed; + post (std::move (t)); + } + }); + } + } + + bool alive () const + { + nano::lock_guard guard{ mutex }; + return thread_pool_impl != nullptr; + } + + uint64_t queued_tasks () const + { + return num_tasks; + } + + uint64_t delayed_tasks () const + { + return num_delayed; + } + + nano::container_info container_info () const + { + nano::container_info info; + info.put ("tasks", num_tasks); + info.put ("delayed", num_delayed); + return info; + } private: - nano::mutex mutex; - std::atomic stopped{ false }; - unsigned num_threads; - std::unique_ptr thread_pool_m; - nano::relaxed_atomic_integral num_tasks{ 0 }; + void set_thread_names () + { + for (auto i = 0u; i < num_threads; ++i) + { + boost::asio::post (*thread_pool_impl, [this] () { + nano::thread_role::set (thread_name); + thread_names_latch.arrive_and_wait (); + }); + } + thread_names_latch.wait (); + } + +private: + unsigned const num_threads; + nano::thread_role::name const thread_name; - /** Set the names of all the threads in the thread pool for easier identification */ std::latch thread_names_latch; - void set_thread_names (nano::thread_role::name thread_name); + mutable nano::mutex mutex; + std::atomic stopped{ false }; + std::unique_ptr thread_pool_impl; + std::atomic num_tasks{ 0 }; + std::atomic num_delayed{ 0 }; }; -} // namespace nano +} \ No newline at end of file diff --git a/nano/node/bootstrap/bootstrap_bulk_pull.cpp b/nano/node/bootstrap/bootstrap_bulk_pull.cpp index 892e2869..f17b08f9 100644 --- a/nano/node/bootstrap/bootstrap_bulk_pull.cpp +++ b/nano/node/bootstrap/bootstrap_bulk_pull.cpp @@ -127,7 +127,7 @@ void nano::bulk_pull_client::throttled_receive_block () else { auto this_l (shared_from_this ()); - node->workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this_l] () { + node->workers.post_delayed (std::chrono::seconds (1), [this_l] () { if (!this_l->connection->pending_stop && !this_l->attempt->stopped) { this_l->throttled_receive_block (); @@ -530,7 +530,7 @@ void nano::bulk_pull_server::sent_action (boost::system::error_code const & ec, } if (!ec) { - node->bootstrap_workers.push_task ([this_l = shared_from_this ()] () { + node->bootstrap_workers.post ([this_l = shared_from_this ()] () { this_l->send_next (); }); } @@ -816,7 +816,7 @@ void nano::bulk_pull_account_server::sent_action (boost::system::error_code cons } if (!ec) { - node->bootstrap_workers.push_task ([this_l = shared_from_this ()] () { + node->bootstrap_workers.post ([this_l = shared_from_this ()] () { this_l->send_next_block (); }); } diff --git a/nano/node/bootstrap/bootstrap_bulk_push.cpp b/nano/node/bootstrap/bootstrap_bulk_push.cpp index 47508878..c023ec1f 100644 --- a/nano/node/bootstrap/bootstrap_bulk_push.cpp +++ b/nano/node/bootstrap/bootstrap_bulk_push.cpp @@ -144,7 +144,7 @@ void nano::bulk_push_server::throttled_receive () else { auto this_l (shared_from_this ()); - node->workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this_l] () { + node->workers.post_delayed (std::chrono::seconds (1), [this_l] () { if (!this_l->connection->stopped) { this_l->throttled_receive (); diff --git a/nano/node/bootstrap/bootstrap_connections.cpp b/nano/node/bootstrap/bootstrap_connections.cpp index 358538a6..b3f1334b 100644 --- a/nano/node/bootstrap/bootstrap_connections.cpp +++ b/nano/node/bootstrap/bootstrap_connections.cpp @@ -306,7 +306,7 @@ void nano::bootstrap_connections::populate_connections (bool repeat) if (!stopped && repeat) { std::weak_ptr this_w (shared_from_this ()); - node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this_w] () { + node.workers.post_delayed (std::chrono::seconds (1), [this_w] () { if (auto this_l = this_w.lock ()) { this_l->populate_connections (); diff --git a/nano/node/bootstrap/bootstrap_frontier.cpp b/nano/node/bootstrap/bootstrap_frontier.cpp index d0ad8276..8a6067ef 100644 --- a/nano/node/bootstrap/bootstrap_frontier.cpp +++ b/nano/node/bootstrap/bootstrap_frontier.cpp @@ -70,7 +70,7 @@ void nano::frontier_req_client::receive_frontier () // we simply get a size of 0. if (size_a == nano::frontier_req_client::size_frontier) { - node->bootstrap_workers.push_task ([this_l, ec, size_a] () { + node->bootstrap_workers.post ([this_l, ec, size_a] () { this_l->received_frontier (ec, size_a); }); } @@ -355,7 +355,7 @@ void nano::frontier_req_server::sent_action (boost::system::error_code const & e { count++; - node->bootstrap_workers.push_task ([this_l = shared_from_this ()] () { + node->bootstrap_workers.post ([this_l = shared_from_this ()] () { this_l->send_next (); }); } diff --git a/nano/node/confirming_set.cpp b/nano/node/confirming_set.cpp index 4d8d1450..e504b0ac 100644 --- a/nano/node/confirming_set.cpp +++ b/nano/node/confirming_set.cpp @@ -55,6 +55,8 @@ void nano::confirming_set::start () return; } + notification_workers.start (); + thread = std::thread{ [this] () { nano::thread_role::set (nano::thread_role::name::confirmation_height); run (); @@ -148,7 +150,7 @@ void nano::confirming_set::run_batch (std::unique_lock & lock) std::unique_lock lock{ mutex }; // It's possible that ledger cementing happens faster than the notifications can be processed by other components, cooldown here - while (notification_workers.num_queued_tasks () >= config.max_queued_notifications) + while (notification_workers.queued_tasks () >= config.max_queued_notifications) { stats.inc (nano::stat::type::confirming_set, nano::stat::detail::cooldown); condition.wait_for (lock, 100ms, [this] { return stopped.load (); }); @@ -158,7 +160,7 @@ void nano::confirming_set::run_batch (std::unique_lock & lock) } } - notification_workers.push_task ([this, batch = std::move (batch)] () { + notification_workers.post ([this, batch = std::move (batch)] () { stats.inc (nano::stat::type::confirming_set, nano::stat::detail::notify); batch_cemented.notify (batch); }); diff --git a/nano/node/distributed_work.cpp b/nano/node/distributed_work.cpp index 1404c065..81d771a5 100644 --- a/nano/node/distributed_work.cpp +++ b/nano/node/distributed_work.cpp @@ -401,10 +401,9 @@ void nano::distributed_work::handle_failure () status = work_generation_status::failure_peers; - auto now (std::chrono::steady_clock::now ()); std::weak_ptr node_weak (node.shared ()); auto next_backoff (std::min (backoff * 2, std::chrono::seconds (5 * 60))); - node.workers.add_timed_task (now + std::chrono::seconds (backoff), [node_weak, request_l = request, next_backoff] { + node.workers.post_delayed (std::chrono::seconds (backoff), [node_weak, request_l = request, next_backoff] { bool error_l{ true }; if (auto node_l = node_weak.lock ()) { diff --git a/nano/node/election.cpp b/nano/node/election.cpp index 93b9e8f7..06ea4135 100644 --- a/nano/node/election.cpp +++ b/nano/node/election.cpp @@ -62,7 +62,7 @@ void nano::election::confirm_once (nano::unique_lock & lock) lock.unlock (); - node.election_workers.push_task ([this_l = shared_from_this (), status_l, confirmation_action_l = confirmation_action] () { + node.election_workers.post ([this_l = shared_from_this (), status_l, confirmation_action_l = confirmation_action] () { // This is necessary if the winner of the election is one of the forks. // In that case the winning block is not yet in the ledger and cementing needs to wait for rollbacks to complete. this_l->node.process_confirmed (status_l.winner->hash (), this_l); diff --git a/nano/node/epoch_upgrader.cpp b/nano/node/epoch_upgrader.cpp index 6667e7e2..69d2a506 100644 --- a/nano/node/epoch_upgrader.cpp +++ b/nano/node/epoch_upgrader.cpp @@ -161,7 +161,7 @@ void nano::epoch_upgrader::upgrade_impl (nano::raw_key const & prv_a, nano::epoc upgrader_condition.wait (lock); } } - node.workers.push_task ([&upgrader_process, &upgrader_mutex, &upgrader_condition, &upgraded_accounts, &workers, epoch, difficulty, signer, root, account] () { + node.workers.post ([&upgrader_process, &upgrader_mutex, &upgrader_condition, &upgraded_accounts, &workers, epoch, difficulty, signer, root, account] () { upgrader_process (upgraded_accounts, epoch, difficulty, signer, root, account); { nano::lock_guard lock{ upgrader_mutex }; @@ -241,7 +241,7 @@ void nano::epoch_upgrader::upgrade_impl (nano::raw_key const & prv_a, nano::epoc upgrader_condition.wait (lock); } } - node.workers.push_task ([&upgrader_process, &upgrader_mutex, &upgrader_condition, &upgraded_pending, &workers, epoch, difficulty, signer, root, account] () { + node.workers.post ([&upgrader_process, &upgrader_mutex, &upgrader_condition, &upgraded_pending, &workers, epoch, difficulty, signer, root, account] () { upgrader_process (upgraded_pending, epoch, difficulty, signer, root, account); { nano::lock_guard lock{ upgrader_mutex }; diff --git a/nano/node/fwd.hpp b/nano/node/fwd.hpp index 57b21696..7bdcb576 100644 --- a/nano/node/fwd.hpp +++ b/nano/node/fwd.hpp @@ -9,6 +9,7 @@ namespace nano { class block; class container_info; +class thread_pool; } namespace nano diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 1091f382..288296d0 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -541,7 +541,7 @@ void nano::json_handler::account_block_count () void nano::json_handler::account_create () { - node.workers.push_task (create_worker_task ([] (std::shared_ptr const & rpc_l) { + node.workers.post (create_worker_task ([] (std::shared_ptr const & rpc_l) { auto wallet (rpc_l->wallet_impl ()); if (!rpc_l->ec) { @@ -731,7 +731,7 @@ void nano::json_handler::account_list () void nano::json_handler::account_move () { - node.workers.push_task (create_worker_task ([] (std::shared_ptr const & rpc_l) { + node.workers.post (create_worker_task ([] (std::shared_ptr const & rpc_l) { auto wallet (rpc_l->wallet_impl ()); if (!rpc_l->ec) { @@ -770,7 +770,7 @@ void nano::json_handler::account_move () void nano::json_handler::account_remove () { - node.workers.push_task (create_worker_task ([] (std::shared_ptr const & rpc_l) { + node.workers.post (create_worker_task ([] (std::shared_ptr const & rpc_l) { auto wallet (rpc_l->wallet_impl ()); auto account (rpc_l->account_impl ()); if (!rpc_l->ec) @@ -805,7 +805,7 @@ void nano::json_handler::account_representative () void nano::json_handler::account_representative_set () { - node.workers.push_task (create_worker_task ([work_generation_enabled = node.work_generation_enabled ()] (std::shared_ptr const & rpc_l) { + node.workers.post (create_worker_task ([work_generation_enabled = node.work_generation_enabled ()] (std::shared_ptr const & rpc_l) { auto wallet (rpc_l->wallet_impl ()); auto account (rpc_l->account_impl ()); std::string representative_text (rpc_l->request.get ("representative")); @@ -948,7 +948,7 @@ void nano::json_handler::accounts_representatives () void nano::json_handler::accounts_create () { - node.workers.push_task (create_worker_task ([] (std::shared_ptr const & rpc_l) { + node.workers.post (create_worker_task ([] (std::shared_ptr const & rpc_l) { auto wallet (rpc_l->wallet_impl ()); auto count (rpc_l->count_impl ()); if (!rpc_l->ec) @@ -2930,7 +2930,7 @@ void nano::json_handler::node_id_delete () void nano::json_handler::password_change () { - node.workers.push_task (create_worker_task ([] (std::shared_ptr const & rpc_l) { + node.workers.post (create_worker_task ([] (std::shared_ptr const & rpc_l) { auto wallet (rpc_l->wallet_impl ()); if (!rpc_l->ec) { @@ -2953,7 +2953,7 @@ void nano::json_handler::password_change () void nano::json_handler::password_enter () { - node.workers.push_task (create_worker_task ([] (std::shared_ptr const & rpc_l) { + node.workers.post (create_worker_task ([] (std::shared_ptr const & rpc_l) { auto wallet (rpc_l->wallet_impl ()); if (!rpc_l->ec) { @@ -3178,7 +3178,7 @@ void nano::json_handler::receivable_exists () void nano::json_handler::process () { - node.workers.push_task (create_worker_task ([] (std::shared_ptr const & rpc_l) { + node.workers.post (create_worker_task ([] (std::shared_ptr const & rpc_l) { bool const is_async = rpc_l->request.get ("async", false); auto block (rpc_l->block_impl (true)); @@ -4143,7 +4143,7 @@ void nano::json_handler::unchecked () void nano::json_handler::unchecked_clear () { - node.workers.push_task (create_worker_task ([] (std::shared_ptr const & rpc_l) { + node.workers.post (create_worker_task ([] (std::shared_ptr const & rpc_l) { rpc_l->node.unchecked.clear (); rpc_l->response_l.put ("success", ""); rpc_l->response_errors (); @@ -4316,7 +4316,7 @@ void nano::json_handler::validate_account_number () void nano::json_handler::wallet_add () { - node.workers.push_task (create_worker_task ([] (std::shared_ptr const & rpc_l) { + node.workers.post (create_worker_task ([] (std::shared_ptr const & rpc_l) { auto wallet (rpc_l->wallet_impl ()); if (!rpc_l->ec) { @@ -4346,7 +4346,7 @@ void nano::json_handler::wallet_add () void nano::json_handler::wallet_add_watch () { - node.workers.push_task (create_worker_task ([] (std::shared_ptr const & rpc_l) { + node.workers.post (create_worker_task ([] (std::shared_ptr const & rpc_l) { auto wallet (rpc_l->wallet_impl ()); if (!rpc_l->ec) { @@ -4469,7 +4469,7 @@ void nano::json_handler::wallet_balances () void nano::json_handler::wallet_change_seed () { - node.workers.push_task (create_worker_task ([] (std::shared_ptr const & rpc_l) { + node.workers.post (create_worker_task ([] (std::shared_ptr const & rpc_l) { auto wallet (rpc_l->wallet_impl ()); if (!rpc_l->ec) { @@ -4517,7 +4517,7 @@ void nano::json_handler::wallet_contains () void nano::json_handler::wallet_create () { - node.workers.push_task (create_worker_task ([] (std::shared_ptr const & rpc_l) { + node.workers.post (create_worker_task ([] (std::shared_ptr const & rpc_l) { nano::raw_key seed; auto seed_text (rpc_l->request.get_optional ("seed")); if (seed_text.is_initialized () && seed.decode_hex (seed_text.get ())) @@ -4553,7 +4553,7 @@ void nano::json_handler::wallet_create () void nano::json_handler::wallet_destroy () { - node.workers.push_task (create_worker_task ([] (std::shared_ptr const & rpc_l) { + node.workers.post (create_worker_task ([] (std::shared_ptr const & rpc_l) { std::string wallet_text (rpc_l->request.get ("wallet")); nano::wallet_id wallet; if (!wallet.decode_hex (wallet_text)) @@ -4845,7 +4845,7 @@ void nano::json_handler::wallet_representative () void nano::json_handler::wallet_representative_set () { - node.workers.push_task (create_worker_task ([] (std::shared_ptr const & rpc_l) { + node.workers.post (create_worker_task ([] (std::shared_ptr const & rpc_l) { auto wallet (rpc_l->wallet_impl ()); std::string representative_text (rpc_l->request.get ("representative")); auto representative (rpc_l->account_impl (representative_text, nano::error_rpc::bad_representative_number)); @@ -5132,7 +5132,7 @@ void nano::json_handler::work_get () void nano::json_handler::work_set () { - node.workers.push_task (create_worker_task ([] (std::shared_ptr const & rpc_l) { + node.workers.post (create_worker_task ([] (std::shared_ptr const & rpc_l) { auto wallet (rpc_l->wallet_impl ()); auto account (rpc_l->account_impl ()); auto work (rpc_l->work_optional_impl ()); diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 269edcd8..b8f95f47 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -309,7 +309,7 @@ void nano::network::flood_block_many (std::deque> b if (!blocks_a.empty ()) { std::weak_ptr node_w (node.shared ()); - node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, blocks (std::move (blocks_a)), callback_a, delay_a] () { + node.workers.post_delayed (std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, blocks (std::move (blocks_a)), callback_a, delay_a] () { if (auto node_l = node_w.lock ()) { node_l->network.flood_block_many (std::move (blocks), callback_a, delay_a); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 41cdfae1..42985173 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -69,6 +70,7 @@ nano::node::node (std::shared_ptr io_ctx_a, uint16_t pe nano::node::node (std::shared_ptr io_ctx_a, std::filesystem::path const & application_path_a, nano::node_config const & config_a, nano::work_pool & work_a, nano::node_flags flags_a, unsigned seq) : node_id{ load_or_create_node_id (application_path_a) }, config{ config_a }, + flags{ flags_a }, io_ctx_shared{ std::make_shared () }, io_ctx{ *io_ctx_shared }, logger{ make_logger_identifier (node_id) }, @@ -77,11 +79,14 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy node_initialized_latch (1), network_params{ config.network_params }, stats{ logger, config.stats_config }, - workers{ config.background_threads, nano::thread_role::name::worker }, - bootstrap_workers{ config.bootstrap_serving_threads, nano::thread_role::name::bootstrap_worker }, - wallet_workers{ 1, nano::thread_role::name::wallet_worker }, - election_workers{ 1, nano::thread_role::name::election_worker }, - flags (flags_a), + workers_impl{ std::make_unique (config.background_threads, nano::thread_role::name::worker, /* start immediately */ true) }, + workers{ *workers_impl }, + bootstrap_workers_impl{ std::make_unique (config.bootstrap_serving_threads, nano::thread_role::name::bootstrap_worker, /* start immediately */ true) }, + bootstrap_workers{ *bootstrap_workers_impl }, + wallet_workers_impl{ std::make_unique (1, nano::thread_role::name::wallet_worker, /* start immediately */ true) }, + wallet_workers{ *wallet_workers_impl }, + election_workers_impl{ std::make_unique (1, nano::thread_role::name::election_worker, /* start immediately */ true) }, + election_workers{ *election_workers_impl }, work (work_a), distributed_work (*this), store_impl (nano::make_store (logger, application_path_a, network_params.ledger, flags.read_only, true, config_a.rocksdb_config, config_a.diagnostics_config.txn_tracking, config_a.block_processor_batch_max_time, config_a.lmdb_config, config_a.backup_before_upgrade)), @@ -402,7 +407,7 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy // TODO: Is it neccessary to call this for all blocks? if (block->is_send ()) { - wallet_workers.push_task ([this, hash = block->hash (), destination = block->destination ()] () { + wallet_workers.post ([this, hash = block->hash (), destination = block->destination ()] () { wallets.receive_confirmed (hash, destination); }); } @@ -564,7 +569,7 @@ void nano::node::start () if (flags.enable_pruning) { auto this_l (shared ()); - workers.push_task ([this_l] () { + workers.post ([this_l] () { this_l->ongoing_ledger_pruning (); }); } @@ -605,7 +610,7 @@ void nano::node::start () { // Delay to start wallet lazy bootstrap auto this_l (shared ()); - workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::minutes (1), [this_l] () { + workers.post_delayed (std::chrono::minutes (1), [this_l] () { this_l->bootstrap_wallet (); }); } @@ -654,9 +659,7 @@ void nano::node::stop () logger.info (nano::log::type::node, "Node stopping..."); tcp_listener.stop (); - bootstrap_workers.stop (); - wallet_workers.stop (); - election_workers.stop (); + vote_router.stop (); peer_history.stop (); // Cancels ongoing work generation tasks, which may be blocking other threads @@ -684,12 +687,16 @@ void nano::node::stop () wallets.stop (); stats.stop (); epoch_upgrader.stop (); - workers.stop (); local_block_broadcaster.stop (); message_processor.stop (); network.stop (); // Stop network last to avoid killing in-use sockets monitor.stop (); + bootstrap_workers.stop (); + wallet_workers.stop (); + election_workers.stop (); + workers.stop (); + // work pool is not stopped on purpose due to testing setup // Stop the IO runner last @@ -823,7 +830,7 @@ void nano::node::ongoing_bootstrap () // Bootstrap and schedule for next attempt bootstrap_initiator.bootstrap (false, boost::str (boost::format ("auto_bootstrap_%1%") % previous_bootstrap_count), frontiers_age); std::weak_ptr node_w (shared_from_this ()); - workers.add_timed_task (std::chrono::steady_clock::now () + next_wakeup, [node_w] () { + workers.post_delayed (next_wakeup, [node_w] () { if (auto node_l = node_w.lock ()) { node_l->ongoing_bootstrap (); @@ -844,7 +851,7 @@ void nano::node::backup_wallet () i->second->store.write_backup (transaction, backup_path / (i->first.to_string () + ".json")); } auto this_l (shared ()); - workers.add_timed_task (std::chrono::steady_clock::now () + network_params.node.backup_interval, [this_l] () { + workers.post_delayed (network_params.node.backup_interval, [this_l] () { this_l->backup_wallet (); }); } @@ -856,7 +863,7 @@ void nano::node::search_receivable_all () // Search pending wallets.search_receivable_all (); auto this_l (shared ()); - workers.add_timed_task (std::chrono::steady_clock::now () + network_params.node.search_pending_interval, [this_l] () { + workers.post_delayed (network_params.node.search_pending_interval, [this_l] () { this_l->search_receivable_all (); }); } @@ -981,8 +988,8 @@ void nano::node::ongoing_ledger_pruning () ledger_pruning (flags.block_processor_batch_size != 0 ? flags.block_processor_batch_size : 2 * 1024, bootstrap_weight_reached); auto const ledger_pruning_interval (bootstrap_weight_reached ? config.max_pruning_age : std::min (config.max_pruning_age, std::chrono::seconds (15 * 60))); auto this_l (shared ()); - workers.add_timed_task (std::chrono::steady_clock::now () + ledger_pruning_interval, [this_l] () { - this_l->workers.push_task ([this_l] () { + workers.post_delayed (ledger_pruning_interval, [this_l] () { + this_l->workers.post ([this_l] () { this_l->ongoing_ledger_pruning (); }); }); @@ -1126,7 +1133,7 @@ bool nano::node::block_confirmed_or_being_confirmed (nano::block_hash const & ha void nano::node::ongoing_online_weight_calculation_queue () { std::weak_ptr node_w (shared_from_this ()); - workers.add_timed_task (std::chrono::steady_clock::now () + (std::chrono::seconds (network_params.node.weight_period)), [node_w] () { + workers.post_delayed ((std::chrono::seconds (network_params.node.weight_period)), [node_w] () { if (auto node_l = node_w.lock ()) { node_l->ongoing_online_weight_calculation (); @@ -1165,7 +1172,7 @@ void nano::node::process_confirmed (nano::block_hash hash, std::shared_ptr #include #include -#include #include #include #include @@ -148,6 +147,7 @@ public: public: const nano::keypair node_id; nano::node_config config; + nano::node_flags flags; std::shared_ptr io_ctx_shared; boost::asio::io_context & io_ctx; nano::logger logger; @@ -156,11 +156,14 @@ public: boost::latch node_initialized_latch; nano::network_params & network_params; nano::stats stats; - nano::thread_pool workers; - nano::thread_pool bootstrap_workers; - nano::thread_pool wallet_workers; - nano::thread_pool election_workers; - nano::node_flags flags; + std::unique_ptr workers_impl; + nano::thread_pool & workers; + std::unique_ptr bootstrap_workers_impl; + nano::thread_pool & bootstrap_workers; + std::unique_ptr wallet_workers_impl; + nano::thread_pool & wallet_workers; + std::unique_ptr election_workers_impl; + nano::thread_pool & election_workers; nano::work_pool & work; nano::distributed_work_factory distributed_work; std::unique_ptr store_impl; diff --git a/nano/node/transport/tcp_server.cpp b/nano/node/transport/tcp_server.cpp index a4e59e67..881da00c 100644 --- a/nano/node/transport/tcp_server.cpp +++ b/nano/node/transport/tcp_server.cpp @@ -526,7 +526,7 @@ void nano::transport::tcp_server::bootstrap_message_visitor::bulk_pull (const na return; } - node->bootstrap_workers.push_task ([server = server, message = message] () { + node->bootstrap_workers.post ([server = server, message = message] () { // TODO: Add completion callback to bulk pull server // TODO: There should be no need to re-copy message as unique pointer, refactor those bulk/frontier pull/push servers auto bulk_pull_server = std::make_shared (server, std::make_unique (message)); @@ -548,7 +548,7 @@ void nano::transport::tcp_server::bootstrap_message_visitor::bulk_pull_account ( return; } - node->bootstrap_workers.push_task ([server = server, message = message] () { + node->bootstrap_workers.post ([server = server, message = message] () { // TODO: Add completion callback to bulk pull server // TODO: There should be no need to re-copy message as unique pointer, refactor those bulk/frontier pull/push servers auto bulk_pull_account_server = std::make_shared (server, std::make_unique (message)); @@ -565,7 +565,7 @@ void nano::transport::tcp_server::bootstrap_message_visitor::bulk_push (const na { return; } - node->bootstrap_workers.push_task ([server = server] () { + node->bootstrap_workers.post ([server = server] () { // TODO: Add completion callback to bulk pull server auto bulk_push_server = std::make_shared (server); bulk_push_server->throttled_receive (); @@ -582,7 +582,7 @@ void nano::transport::tcp_server::bootstrap_message_visitor::frontier_req (const return; } - node->bootstrap_workers.push_task ([server = server, message = message] () { + node->bootstrap_workers.post ([server = server, message = message] () { // TODO: There should be no need to re-copy message as unique pointer, refactor those bulk/frontier pull/push servers auto response = std::make_shared (server, std::make_unique (message)); response->send_next (); diff --git a/nano/node/transport/tcp_socket.cpp b/nano/node/transport/tcp_socket.cpp index 0c0cad10..e85ee7a8 100644 --- a/nano/node/transport/tcp_socket.cpp +++ b/nano/node/transport/tcp_socket.cpp @@ -274,7 +274,7 @@ void nano::transport::tcp_socket::ongoing_checkup () return; } - node_l->workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (node_l->network_params.network.is_dev_network () ? 1 : 5), [this_w = weak_from_this ()] () { + node_l->workers.post_delayed (std::chrono::seconds (node_l->network_params.network.is_dev_network () ? 1 : 5), [this_w = weak_from_this ()] () { auto this_l = this_w.lock (); if (!this_l) { diff --git a/nano/node/wallet.cpp b/nano/node/wallet.cpp index 387c479e..18dd7df7 100644 --- a/nano/node/wallet.cpp +++ b/nano/node/wallet.cpp @@ -1162,7 +1162,7 @@ void nano::wallet::work_ensure (nano::account const & account_a, nano::root cons wallets.delayed_work->operator[] (account_a) = root_a; - wallets.node.workers.add_timed_task (std::chrono::steady_clock::now () + precache_delay, [this_l = shared_from_this (), account_a, root_a] { + wallets.node.workers.post_delayed (precache_delay, [this_l = shared_from_this (), account_a, root_a] { auto delayed_work = this_l->wallets.delayed_work.lock (); auto existing (delayed_work->find (account_a)); if (existing != delayed_work->end () && existing->second == root_a) @@ -1713,7 +1713,7 @@ void nano::wallets::ongoing_compute_reps () auto & node_l (node); // Representation drifts quickly on the test network but very slowly on the live network auto compute_delay = network_params.network.is_dev_network () ? std::chrono::milliseconds (10) : (network_params.network.is_test_network () ? std::chrono::milliseconds (nano::test_scan_wallet_reps_delay ()) : std::chrono::minutes (15)); - node.workers.add_timed_task (std::chrono::steady_clock::now () + compute_delay, [&node_l] () { + node.workers.post_delayed (compute_delay, [&node_l] () { node_l.wallets.ongoing_compute_reps (); }); } diff --git a/nano/qt/qt.cpp b/nano/qt/qt.cpp index 7a513e8b..8f24aeb7 100644 --- a/nano/qt/qt.cpp +++ b/nano/qt/qt.cpp @@ -107,7 +107,7 @@ nano_qt::self_pane::self_pane (nano_qt::wallet & wallet_a, nano::account const & QObject::connect (copy_button, &QPushButton::clicked, [this] () { this->wallet.application.clipboard ()->setText (QString (this->wallet.account.to_account ().c_str ())); copy_button->setText ("Copied!"); - this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (2), [this] () { + this->wallet.node.workers.post_delayed (std::chrono::seconds (2), [this] () { this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () { copy_button->setText ("Copy"); })); @@ -201,7 +201,7 @@ nano_qt::accounts::accounts (nano_qt::wallet & wallet_a) : this->wallet.wallet_m->deterministic_insert (transaction); show_button_success (*create_account); create_account->setText ("New account was created"); - this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () { + this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () { this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () { show_button_ok (*create_account); create_account->setText ("Create account"); @@ -212,7 +212,7 @@ nano_qt::accounts::accounts (nano_qt::wallet & wallet_a) : { show_button_error (*create_account); create_account->setText ("Wallet is locked, unlock it to create account"); - this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () { + this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () { this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () { show_button_ok (*create_account); create_account->setText ("Create account"); @@ -234,7 +234,7 @@ nano_qt::accounts::accounts (nano_qt::wallet & wallet_a) : this->wallet.application.clipboard ()->setText (QString (seed.to_string ().c_str ())); show_button_success (*backup_seed); backup_seed->setText ("Seed was copied to clipboard"); - this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () { + this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () { this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () { show_button_ok (*backup_seed); backup_seed->setText ("Copy wallet seed to clipboard"); @@ -246,7 +246,7 @@ nano_qt::accounts::accounts (nano_qt::wallet & wallet_a) : this->wallet.application.clipboard ()->setText (""); show_button_error (*backup_seed); backup_seed->setText ("Wallet is locked, unlock it to enable the backup"); - this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () { + this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () { this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () { show_button_ok (*backup_seed); backup_seed->setText ("Copy wallet seed to clipboard"); @@ -280,7 +280,7 @@ void nano_qt::accounts::refresh_wallet_balance () final_text += "\nReady to receive: " + wallet.format_balance (pending); } wallet_balance_label->setText (QString (final_text.c_str ())); - this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (60), [this] () { + this->wallet.node.workers.post_delayed (std::chrono::seconds (60), [this] () { this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () { refresh_wallet_balance (); })); @@ -410,7 +410,7 @@ nano_qt::import::import (nano_qt::wallet & wallet_a) : show_line_error (*seed); show_button_error (*import_seed); import_seed->setText ("Wallet is locked, unlock it to enable the import"); - this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (10), [this] () { + this->wallet.node.workers.post_delayed (std::chrono::seconds (10), [this] () { this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () { show_line_ok (*seed); show_button_ok (*import_seed); @@ -427,7 +427,7 @@ nano_qt::import::import (nano_qt::wallet & wallet_a) : show_button_success (*import_seed); import_seed->setText ("Successful import of seed"); this->wallet.refresh (); - this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () { + this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () { this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () { show_button_ok (*import_seed); import_seed->setText ("Import seed"); @@ -447,7 +447,7 @@ nano_qt::import::import (nano_qt::wallet & wallet_a) : { import_seed->setText ("Incorrect seed. Only HEX characters allowed"); } - this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () { + this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () { this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () { show_button_ok (*import_seed); import_seed->setText ("Import seed"); @@ -460,7 +460,7 @@ nano_qt::import::import (nano_qt::wallet & wallet_a) : show_line_error (*clear_line); show_button_error (*import_seed); import_seed->setText ("Type words 'clear keys'"); - this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () { + this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () { this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () { show_button_ok (*import_seed); import_seed->setText ("Import seed"); @@ -745,7 +745,7 @@ void nano_qt::block_viewer::rebroadcast_action (nano::block_hash const & hash_a) if (successor) { done = false; - wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this, successor] () { + wallet.node.workers.post_delayed (std::chrono::seconds (1), [this, successor] () { this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this, successor] () { rebroadcast_action (successor.value ()); })); @@ -1147,7 +1147,7 @@ void nano_qt::wallet::ongoing_refresh () } })); - node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [wallet_w] () { + node.workers.post_delayed (std::chrono::seconds (5), [wallet_w] () { if (auto wallet_l = wallet_w.lock ()) { wallet_l->ongoing_refresh (); @@ -1231,7 +1231,7 @@ void nano_qt::wallet::start () { show_button_error (*this_l->send_blocks_send); this_l->send_blocks_send->setText ("Wallet is locked, unlock it to send"); - this_l->node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this_w] () { + this_l->node.workers.post_delayed (std::chrono::seconds (5), [this_w] () { if (auto this_l = this_w.lock ()) { this_l->application.postEvent (&this_l->processor, new eventloop_event ([this_w] () { @@ -1250,7 +1250,7 @@ void nano_qt::wallet::start () show_line_error (*this_l->send_count); show_button_error (*this_l->send_blocks_send); this_l->send_blocks_send->setText ("Not enough balance"); - this_l->node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this_w] () { + this_l->node.workers.post_delayed (std::chrono::seconds (5), [this_w] () { if (auto this_l = this_w.lock ()) { this_l->application.postEvent (&this_l->processor, new eventloop_event ([this_w] () { @@ -1269,7 +1269,7 @@ void nano_qt::wallet::start () show_line_error (*this_l->send_account); show_button_error (*this_l->send_blocks_send); this_l->send_blocks_send->setText ("Bad destination account"); - this_l->node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this_w] () { + this_l->node.workers.post_delayed (std::chrono::seconds (5), [this_w] () { if (auto this_l = this_w.lock ()) { this_l->application.postEvent (&this_l->processor, new eventloop_event ([this_w] () { @@ -1288,7 +1288,7 @@ void nano_qt::wallet::start () show_line_error (*this_l->send_count); show_button_error (*this_l->send_blocks_send); this_l->send_blocks_send->setText ("Bad amount number"); - this_l->node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this_w] () { + this_l->node.workers.post_delayed (std::chrono::seconds (5), [this_w] () { if (auto this_l = this_w.lock ()) { this_l->application.postEvent (&this_l->processor, new eventloop_event ([this_w] () { @@ -1464,7 +1464,7 @@ void nano_qt::wallet::update_connected () void nano_qt::wallet::empty_password () { - this->node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (3), [this] () { + this->node.workers.post_delayed (std::chrono::seconds (3), [this] () { auto transaction (wallet_m->wallets.tx_begin_write ()); wallet_m->enter_password (transaction, std::string ("")); }); @@ -1568,7 +1568,7 @@ nano_qt::settings::settings (nano_qt::wallet & wallet_a) : change->setText ("Password was changed"); this->wallet.node.logger.warn (nano::log::type::qt, "Wallet password changed"); update_locked (false, false); - this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () { + this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () { this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () { show_button_ok (*change); change->setText ("Set/Change password"); @@ -1586,7 +1586,7 @@ nano_qt::settings::settings (nano_qt::wallet & wallet_a) : { show_button_error (*change); change->setText ("Wallet is locked, unlock it"); - this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () { + this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () { this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () { show_button_ok (*change); change->setText ("Set/Change password"); @@ -1612,7 +1612,7 @@ nano_qt::settings::settings (nano_qt::wallet & wallet_a) : change_rep->setText ("Representative was changed"); current_representative->setText (QString (representative_l.to_account ().c_str ())); new_representative->clear (); - this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () { + this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () { this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () { show_button_ok (*change_rep); change_rep->setText ("Change representative"); @@ -1623,7 +1623,7 @@ nano_qt::settings::settings (nano_qt::wallet & wallet_a) : { show_button_error (*change_rep); change_rep->setText ("Wallet is locked, unlock it"); - this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () { + this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () { this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () { show_button_ok (*change_rep); change_rep->setText ("Change representative"); @@ -1636,7 +1636,7 @@ nano_qt::settings::settings (nano_qt::wallet & wallet_a) : show_line_error (*new_representative); show_button_error (*change_rep); change_rep->setText ("Invalid account"); - this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () { + this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () { this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () { show_line_ok (*new_representative); show_button_ok (*change_rep); @@ -1676,7 +1676,7 @@ nano_qt::settings::settings (nano_qt::wallet & wallet_a) : show_line_error (*password); show_button_error (*lock_toggle); lock_toggle->setText ("Invalid password"); - this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this] () { + this->wallet.node.workers.post_delayed (std::chrono::seconds (5), [this] () { this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this] () { show_line_ok (*password); show_button_ok (*lock_toggle); diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 206a5509..ce050df0 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -68,7 +68,7 @@ TEST (rpc, wrapped_task) // Exception should get caught throw std::runtime_error (""); })); - system.nodes[0]->workers.push_task (task); + system.nodes[0]->workers.post (task); ASSERT_TIMELY_EQ (5s, response, true); } diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index c49f4d84..b9c3731f 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -104,7 +104,7 @@ TEST (system, receive_while_synchronizing) node1->start (); system.nodes.push_back (node1); ASSERT_NE (nullptr, nano::test::establish_tcp (system, *node1, node->network.endpoint ())); - node1->workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (200), ([&system, &key] () { + node1->workers.post_delayed (std::chrono::milliseconds (200), ([&system, &key] () { auto hash (system.wallet (0)->send_sync (nano::dev::genesis_key.pub, key.pub, system.nodes[0]->config.receive_minimum.number ())); auto transaction = system.nodes[0]->ledger.tx_begin_read (); auto block = system.nodes[0]->ledger.any.block_get (transaction, hash); diff --git a/nano/test_common/system.cpp b/nano/test_common/system.cpp index 15aed961..758a5c72 100644 --- a/nano/test_common/system.cpp +++ b/nano/test_common/system.cpp @@ -406,7 +406,7 @@ public: if (count_l > 0) { auto this_l (shared_from_this ()); - node->workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (wait), [this_l] () { this_l->run (); }); + node->workers.post_delayed (std::chrono::milliseconds (wait), [this_l] () { this_l->run (); }); } } std::vector accounts;