Combine alarm and worker threads into a pool (#2871)

* Combine alarm and worker threads into a pool

* Change delayed_task to timed_task

* Update thread_pool with comments

* Fix lock tests

* Fix failing vote_spacing tests

* Increase worker thread count
This commit is contained in:
Wesley Shillingford 2021-02-01 15:39:36 +00:00 committed by GitHub
commit a3f1551543
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
47 changed files with 401 additions and 594 deletions

View file

@ -0,0 +1,7 @@
#pragma once
#include <nano/boost/private/macro_warnings.hpp>
DISABLE_ASIO_WARNINGS
#include <boost/asio/steady_timer.hpp>
REENABLE_WARNINGS

View file

@ -173,7 +173,7 @@ TEST (bulk_pull, count_limit)
TEST (bootstrap_processor, DISABLED_process_none)
{
nano::system system (1);
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
ASSERT_FALSE (node1->init_error ());
auto done (false);
node1->bootstrap_initiator.bootstrap (system.nodes[0]->network.endpoint ());
@ -199,7 +199,7 @@ TEST (bootstrap_processor, process_one)
node_config.peering_port = nano::get_available_port ();
node_flags.disable_rep_crawler = true;
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, node_config, system.work, node_flags));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), node_config, system.work, node_flags));
nano::block_hash hash1 (node0->latest (nano::dev_genesis_key.pub));
nano::block_hash hash2 (node1->latest (nano::dev_genesis_key.pub));
ASSERT_NE (hash1, hash2);
@ -227,7 +227,7 @@ TEST (bootstrap_processor, process_two)
ASSERT_NE (hash1, hash2);
ASSERT_NE (hash1, hash3);
ASSERT_NE (hash2, hash3);
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
ASSERT_FALSE (node1->init_error ());
node1->bootstrap_initiator.bootstrap (node0->network.endpoint ());
ASSERT_NE (node1->latest (nano::dev_genesis_key.pub), node0->latest (nano::dev_genesis_key.pub));
@ -252,7 +252,7 @@ TEST (bootstrap_processor, process_state)
node0->work_generate_blocking (*block2);
node0->process (*block1);
node0->process (*block2);
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
ASSERT_EQ (node0->latest (nano::dev_genesis_key.pub), block2->hash ());
ASSERT_NE (node1->latest (nano::dev_genesis_key.pub), block2->hash ());
node1->bootstrap_initiator.bootstrap (node0->network.endpoint ());
@ -279,7 +279,7 @@ TEST (bootstrap_processor, process_new)
ASSERT_TIMELY (10s, !node1->balance (key2.pub).is_zero ());
nano::uint128_t balance1 (node1->balance (nano::dev_genesis_key.pub));
nano::uint128_t balance2 (node1->balance (key2.pub));
auto node3 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node3 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
ASSERT_FALSE (node3->init_error ());
node3->bootstrap_initiator.bootstrap (node1->network.endpoint ());
ASSERT_TIMELY (10s, node3->balance (key2.pub) == balance2);
@ -304,7 +304,7 @@ TEST (bootstrap_processor, pull_diamond)
ASSERT_EQ (nano::process_result::progress, node0->process (*send2).code);
auto receive (std::make_shared<nano::receive_block> (send1->hash (), send2->hash (), nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (send1->hash ())));
ASSERT_EQ (nano::process_result::progress, node0->process (*receive).code);
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
ASSERT_FALSE (node1->init_error ());
node1->bootstrap_initiator.bootstrap (node0->network.endpoint ());
ASSERT_TIMELY (10s, node1->balance (nano::dev_genesis_key.pub) == 100);
@ -383,7 +383,7 @@ TEST (bootstrap_processor, frontiers_unconfirmed)
// Ensure node2 can generate votes
node2->block_confirm (send3);
ASSERT_TIMELY (5s, node2->ledger.cache.cemented_count == 3);
ASSERT_TIMELY (10s, node2->ledger.cache.cemented_count == 3);
// Test node to restart bootstrap
node_config.peering_port = nano::get_available_port ();
@ -430,7 +430,7 @@ TEST (bootstrap_processor, frontiers_confirmed)
// Confirm all blocks so node1 is free to generate votes
node1->block_confirm (send1);
ASSERT_TIMELY (5s, node1->ledger.cache.cemented_count == 5);
ASSERT_TIMELY (10s, node1->ledger.cache.cemented_count == 5);
// Test node to bootstrap
node_config.peering_port = nano::get_available_port ();
@ -502,7 +502,7 @@ TEST (bootstrap_processor, push_diamond)
config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto node0 (system.add_node (config));
nano::keypair key;
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
ASSERT_FALSE (node1->init_error ());
auto wallet1 (node1->wallets.create (100));
wallet1->insert_adhoc (nano::dev_genesis_key.prv);
@ -532,7 +532,7 @@ TEST (bootstrap_processor, push_diamond_pruning)
config.enable_voting = false; // Remove after allowing pruned voting
nano::node_flags node_flags;
node_flags.enable_pruning = true;
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags, 1));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), config, system.work, node_flags, 1));
ASSERT_FALSE (node1->init_error ());
auto latest (node0->latest (nano::dev_genesis_key.pub));
auto send1 (std::make_shared<nano::send_block> (latest, key.pub, 0, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (latest)));
@ -575,7 +575,7 @@ TEST (bootstrap_processor, push_one)
config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto node0 (system.add_node (config));
nano::keypair key1;
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
auto wallet (node1->wallets.create (nano::random_wallet_id ()));
ASSERT_NE (nullptr, wallet);
wallet->insert_adhoc (nano::dev_genesis_key.prv);
@ -610,7 +610,7 @@ TEST (bootstrap_processor, lazy_hash)
node0->block_processor.add (receive2);
node0->block_processor.flush ();
// Start lazy bootstrap with last block in chain known
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
node1->network.udp_channels.insert (node0->network.endpoint (), node1->network_params.protocol.protocol_version);
node1->bootstrap_initiator.bootstrap_lazy (receive2->hash (), true);
{
@ -646,7 +646,7 @@ TEST (bootstrap_processor, lazy_hash_bootstrap_id)
node0->block_processor.add (receive2);
node0->block_processor.flush ();
// Start lazy bootstrap with last block in chain known
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
node1->network.udp_channels.insert (node0->network.endpoint (), node1->network_params.protocol.protocol_version);
node1->bootstrap_initiator.bootstrap_lazy (receive2->hash (), true, true, "123456");
{
@ -694,7 +694,7 @@ TEST (bootstrap_processor, lazy_hash_pruning)
ASSERT_EQ (9, node0->ledger.cache.block_count);
// Processing chain to prune for node1
config.peering_port = nano::get_available_port ();
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags, 1));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), config, system.work, node_flags, 1));
node1->process_active (send1);
node1->process_active (receive1);
node1->process_active (change1);
@ -771,7 +771,7 @@ TEST (bootstrap_processor, lazy_max_pull_count)
node0->block_processor.add (change3);
node0->block_processor.flush ();
// Start lazy bootstrap with last block in chain known
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
node1->network.udp_channels.insert (node0->network.endpoint (), node1->network_params.protocol.protocol_version);
node1->bootstrap_initiator.bootstrap_lazy (change3->hash ());
// Check processed blocks
@ -938,7 +938,7 @@ TEST (bootstrap_processor, lazy_pruning_missing_block)
ASSERT_TRUE (node1->ledger.block_exists (state_open->hash ()));
// Start lazy bootstrap with last block in sender chain
config.peering_port = nano::get_available_port ();
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, config, system.work, node_flags, 1));
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), config, system.work, node_flags, 1));
node2->network.udp_channels.insert (node1->network.endpoint (), node1->network_params.protocol.protocol_version);
node2->bootstrap_initiator.bootstrap_lazy (send2->hash ());
// Check processed blocks
@ -982,7 +982,7 @@ TEST (bootstrap_processor, lazy_cancel)
// Generating test chain
auto send1 (std::make_shared<nano::state_block> (nano::dev_genesis_key.pub, genesis.hash (), nano::dev_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, key1.pub, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *node0->work_generate_blocking (genesis.hash ())));
// Start lazy bootstrap with last block in chain known
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
node1->network.udp_channels.insert (node0->network.endpoint (), node1->network_params.protocol.protocol_version);
node1->bootstrap_initiator.bootstrap_lazy (send1->hash (), true); // Start "confirmed" block bootstrap
{
@ -1019,7 +1019,7 @@ TEST (bootstrap_processor, wallet_lazy_frontier)
node0->block_processor.add (receive2);
node0->block_processor.flush ();
// Start wallet lazy bootstrap
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
node1->network.udp_channels.insert (node0->network.endpoint (), node1->network_params.protocol.protocol_version);
auto wallet (node1->wallets.create (nano::random_wallet_id ()));
ASSERT_NE (nullptr, wallet);
@ -1057,7 +1057,7 @@ TEST (bootstrap_processor, wallet_lazy_pending)
node0->block_processor.add (send2);
node0->block_processor.flush ();
// Start wallet lazy bootstrap
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
node1->network.udp_channels.insert (node0->network.endpoint (), node1->network_params.protocol.protocol_version);
auto wallet (node1->wallets.create (nano::random_wallet_id ()));
ASSERT_NE (nullptr, wallet);
@ -1093,7 +1093,7 @@ TEST (bootstrap_processor, multiple_attempts)
// Start 2 concurrent bootstrap attempts
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.bootstrap_initiator_threads = 3;
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, node_config, system.work));
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), node_config, system.work));
node2->network.udp_channels.insert (node1->network.endpoint (), node2->network_params.protocol.protocol_version);
node2->bootstrap_initiator.bootstrap_lazy (receive2->hash (), true);
node2->bootstrap_initiator.bootstrap ();
@ -1277,7 +1277,7 @@ TEST (bulk, genesis)
node_flags.disable_lazy_bootstrap = true;
auto node1 = system.add_node (config, node_flags);
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
ASSERT_FALSE (node2->init_error ());
nano::block_hash latest1 (node1->latest (nano::dev_genesis_key.pub));
nano::block_hash latest2 (node2->latest (nano::dev_genesis_key.pub));
@ -1302,7 +1302,7 @@ TEST (bulk, offline_send)
node_flags.disable_lazy_bootstrap = true;
auto node1 = system.add_node (config, node_flags);
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
ASSERT_FALSE (node2->init_error ());
node2->start ();
system.nodes.push_back (node2);
@ -1342,7 +1342,7 @@ TEST (bulk, genesis_pruning)
auto node1 = system.add_node (config, node_flags);
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
node_flags.enable_pruning = false;
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work, node_flags));
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work, node_flags));
ASSERT_FALSE (node2->init_error ());
nano::block_hash latest1 (node1->latest (nano::dev_genesis_key.pub));
nano::block_hash latest2 (node2->latest (nano::dev_genesis_key.pub));

View file

@ -69,8 +69,12 @@ TEST (locks, lock_guard)
t.join ();
}
// 2 mutexes held and 1 blocked
// 2 mutexes held and 1 blocked (if defined)
#if NANO_TIMED_LOCKS_IGNORE_BLOCKED
ASSERT_EQ (num_matches (ss.str ()), 2);
#else
ASSERT_EQ (num_matches (ss.str ()), 3);
#endif
}
TEST (locks, unique_lock)
@ -103,8 +107,12 @@ TEST (locks, unique_lock)
t.join ();
}
// 3 mutexes held and 1 blocked
// 3 mutexes held and 1 blocked (if defined)
#if NANO_TIMED_LOCKS_IGNORE_BLOCKED
ASSERT_EQ (num_matches (ss.str ()), 3);
#else
ASSERT_EQ (num_matches (ss.str ()), 4);
#endif
}
TEST (locks, condition_variable_wait)

View file

@ -80,7 +80,7 @@ TEST (network, send_node_id_handshake)
nano::system system;
auto node0 = system.add_node (node_flags);
ASSERT_EQ (0, node0->network.size ());
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work, node_flags));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work, node_flags));
node1->start ();
system.nodes.push_back (node1);
auto initial (node0->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in));
@ -105,7 +105,7 @@ TEST (network, send_node_id_handshake_tcp)
nano::system system (1);
auto node0 (system.nodes[0]);
ASSERT_EQ (0, node0->network.size ());
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
node1->start ();
system.nodes.push_back (node1);
auto initial (node0->stats.count (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in));
@ -135,7 +135,7 @@ TEST (network, last_contacted)
nano::system system (1);
auto node0 = system.nodes[0];
ASSERT_EQ (0, node0->network.size ());
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
node1->start ();
system.nodes.push_back (node1);
auto channel1 = nano::establish_tcp (system, *node1, node0->network.endpoint ());
@ -158,7 +158,7 @@ TEST (network, multi_keepalive)
nano::system system (1);
auto node0 = system.nodes[0];
ASSERT_EQ (0, node0->network.size ());
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
ASSERT_FALSE (node1->init_error ());
node1->start ();
system.nodes.push_back (node1);
@ -166,7 +166,7 @@ TEST (network, multi_keepalive)
ASSERT_EQ (0, node0->network.size ());
node1->network.tcp_channels.start_tcp (node0->network.endpoint (), nano::keepalive_tcp_callback (*node1));
ASSERT_TIMELY (10s, node0->network.size () == 1 && node0->stats.count (nano::stat::type::message, nano::stat::detail::keepalive) >= 1);
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
ASSERT_FALSE (node2->init_error ());
node2->start ();
system.nodes.push_back (node2);
@ -812,9 +812,7 @@ TEST (network, replace_port)
node0_config.io_threads = 8;
auto node0 = system.add_node (node0_config, node_flags);
ASSERT_EQ (0, node0->network.size ());
nano::node_config node1_config (nano::get_available_port (), system.logging);
node1_config.io_threads = 1;
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, node1_config, system.work, node_flags));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work, node_flags));
node1->start ();
system.nodes.push_back (node1);
auto wrong_endpoint = nano::endpoint (node1->network.endpoint ().address (), nano::get_available_port ());
@ -841,7 +839,7 @@ TEST (network, peer_max_tcp_attempts)
node_flags.disable_tcp_realtime = true;
for (auto i (0); i < node->network_params.node.max_peers_per_ip; ++i)
{
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work, node_flags));
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work, node_flags));
node2->start ();
system.nodes.push_back (node2);
// Start TCP attempt
@ -895,7 +893,7 @@ TEST (network, duplicate_revert_publish)
nano::uint128_t digest;
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size (), &digest));
ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
auto other_node (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto other_node (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
other_node->start ();
system.nodes.push_back (other_node);
auto channel = nano::establish_tcp (system, *other_node, node.network.endpoint ());
@ -1000,7 +998,7 @@ TEST (network, tcp_no_connect_excluded_peers)
nano::system system (1);
auto node0 (system.nodes[0]);
ASSERT_EQ (0, node0->network.size ());
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
node1->start ();
system.nodes.push_back (node1);
auto endpoint1 (node1->network.endpoint ());
@ -1102,7 +1100,7 @@ TEST (network, cleanup_purge)
nano::system system (1);
auto & node1 (*system.nodes[0]);
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
node2->start ();
system.nodes.push_back (node2);

View file

@ -57,12 +57,11 @@ TEST (node, work_generate)
TEST (node, block_store_path_failure)
{
auto service (boost::make_shared<boost::asio::io_context> ());
nano::alarm alarm (*service);
auto path (nano::unique_path ());
nano::logging logging;
logging.init (path);
nano::work_pool work (std::numeric_limits<unsigned>::max ());
auto node (std::make_shared<nano::node> (*service, nano::get_available_port (), path, alarm, logging, work));
auto node (std::make_shared<nano::node> (*service, nano::get_available_port (), path, logging, work));
ASSERT_TRUE (node->wallets.items.empty ());
node->stop ();
}
@ -87,18 +86,17 @@ TEST (node_DeathTest, readonly_block_store_not_exist)
TEST (node, password_fanout)
{
auto service (boost::make_shared<boost::asio::io_context> ());
nano::alarm alarm (*service);
boost::asio::io_context io_ctx;
auto path (nano::unique_path ());
nano::node_config config;
config.peering_port = nano::get_available_port ();
config.logging.init (path);
nano::work_pool work (std::numeric_limits<unsigned>::max ());
config.password_fanout = 10;
auto node (std::make_shared<nano::node> (*service, path, alarm, config, work));
auto wallet (node->wallets.create (100));
nano::node node (io_ctx, path, config, work);
auto wallet (node.wallets.create (100));
ASSERT_EQ (10, wallet->store.password.values.size ());
node->stop ();
node.stop ();
}
TEST (node, balance)
@ -289,7 +287,7 @@ TEST (node, auto_bootstrap)
auto send1 (system.wallet (0)->send_action (nano::dev_genesis_key.pub, key2.pub, node0->config.receive_minimum.number ()));
ASSERT_NE (nullptr, send1);
ASSERT_TIMELY (10s, node0->balance (key2.pub) == node0->config.receive_minimum.number ());
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work, node_flags));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work, node_flags));
ASSERT_FALSE (node1->init_error ());
node1->start ();
system.nodes.push_back (node1);
@ -318,7 +316,7 @@ TEST (node, auto_bootstrap_reverse)
nano::keypair key2;
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
system.wallet (0)->insert_adhoc (key2.prv);
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work, node_flags));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work, node_flags));
ASSERT_FALSE (node1->init_error ());
ASSERT_NE (nullptr, system.wallet (0)->send_action (nano::dev_genesis_key.pub, key2.pub, node0->config.receive_minimum.number ()));
node1->start ();
@ -513,7 +511,7 @@ TEST (node, connect_after_junk)
nano::node_flags node_flags;
node_flags.disable_udp = false;
auto node0 = system.add_node (node_flags);
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work, node_flags));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work, node_flags));
std::vector<uint8_t> junk_buffer;
junk_buffer.push_back (0);
auto channel1 (std::make_shared<nano::transport::channel_udp> (node1->network.udp_channels, node0->network.endpoint (), node1->network_params.protocol.protocol_version));
@ -1001,7 +999,7 @@ TEST (node_flags, disable_udp)
nano::node_flags node_flags;
node_flags.disable_udp = false;
auto node1 = system.add_node (node_flags);
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, nano::node_config (nano::get_available_port (), system.logging), system.work));
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), nano::node_config (nano::get_available_port (), system.logging), system.work));
system.nodes.push_back (node2);
node2->start ();
ASSERT_EQ (nano::endpoint (boost::asio::ip::address_v6::loopback (), 0), node2->network.udp_channels.get_local_endpoint ());
@ -2094,7 +2092,7 @@ TEST (node, rep_weight)
{
nano::system system;
auto add_node = [&system] {
auto node = std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work);
auto node = std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work);
node->start ();
system.nodes.push_back (node);
return node;
@ -2267,7 +2265,7 @@ TEST (node, rep_remove)
node.rep_crawler.response (channel1, vote2);
ASSERT_TIMELY (10s, node.rep_crawler.representative_count () == 1);
// Add inactive TCP representative channel
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, nano::node_config (nano::get_available_port (), system.logging), system.work));
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), nano::node_config (nano::get_available_port (), system.logging), system.work));
std::weak_ptr<nano::node> node_w (node.shared ());
auto vote3 = std::make_shared<nano::vote> (keypair2.pub, keypair2.prv, 0, genesis.open);
node.network.tcp_channels.start_tcp (node2->network.endpoint (), [node_w, &vote3](std::shared_ptr<nano::transport::channel> const & channel2) {
@ -3507,7 +3505,7 @@ TEST (node, peers)
auto node1 (system.nodes[0]);
ASSERT_TRUE (node1->network.empty ());
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
system.nodes.push_back (node2);
auto endpoint = node1->network.endpoint ();
@ -3559,7 +3557,7 @@ TEST (node, peer_cache_restart)
nano::endpoint_key endpoint_key{ endpoint.address ().to_v6 ().to_bytes (), endpoint.port () };
auto path (nano::unique_path ());
{
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), path, system.alarm, system.logging, system.work));
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), path, system.logging, system.work));
system.nodes.push_back (node2);
auto & store = node2->store;
{
@ -3579,7 +3577,7 @@ TEST (node, peer_cache_restart)
{
nano::node_flags node_flags;
node_flags.read_only = true;
auto node3 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), path, system.alarm, system.logging, system.work, node_flags));
auto node3 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), path, system.logging, system.work, node_flags));
system.nodes.push_back (node3);
// Check cached peers after restart
node3->network.start ();

View file

@ -1,5 +1,3 @@
#include <nano/boost/asio/io_context.hpp>
#include <nano/lib/alarm.hpp>
#include <nano/lib/stats.hpp>
#include <nano/lib/work.hpp>
#include <nano/secure/blockstore.hpp>
@ -9,9 +7,6 @@
#include <gtest/gtest.h>
#include <atomic>
#include <condition_variable>
#include <future>
TEST (processor_service, bad_send_signature)
{
@ -54,87 +49,3 @@ TEST (processor_service, bad_receive_signature)
receive.signature.bytes[32] ^= 0x1;
ASSERT_EQ (nano::process_result::bad_signature, ledger.process (transaction, receive).code);
}
TEST (alarm, one)
{
boost::asio::io_context io_ctx;
nano::alarm alarm (io_ctx);
std::atomic<bool> done (false);
std::mutex mutex;
nano::condition_variable condition;
alarm.add (std::chrono::steady_clock::now (), [&]() {
{
nano::lock_guard<std::mutex> lock (mutex);
done = true;
}
condition.notify_one ();
});
boost::asio::io_context::work work (io_ctx);
boost::thread thread ([&io_ctx]() { io_ctx.run (); });
nano::unique_lock<std::mutex> unique (mutex);
condition.wait (unique, [&]() { return !!done; });
io_ctx.stop ();
thread.join ();
}
TEST (alarm, many)
{
boost::asio::io_context io_ctx;
nano::alarm alarm (io_ctx);
std::atomic<int> count (0);
std::mutex mutex;
nano::condition_variable condition;
for (auto i (0); i < 50; ++i)
{
alarm.add (std::chrono::steady_clock::now (), [&]() {
{
nano::lock_guard<std::mutex> lock (mutex);
count += 1;
}
condition.notify_one ();
});
}
boost::asio::io_context::work work (io_ctx);
std::vector<boost::thread> threads;
for (auto i (0); i < 50; ++i)
{
threads.push_back (boost::thread ([&io_ctx]() { io_ctx.run (); }));
}
nano::unique_lock<std::mutex> unique (mutex);
condition.wait (unique, [&]() { return count == 50; });
io_ctx.stop ();
for (auto i (threads.begin ()), j (threads.end ()); i != j; ++i)
{
i->join ();
}
}
TEST (alarm, top_execution)
{
boost::asio::io_context io_ctx;
nano::alarm alarm (io_ctx);
int value1 (0);
int value2 (0);
std::mutex mutex;
std::promise<bool> promise;
alarm.add (std::chrono::steady_clock::now (), [&]() {
nano::lock_guard<std::mutex> lock (mutex);
value1 = 1;
value2 = 1;
});
alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (1), [&]() {
nano::lock_guard<std::mutex> lock (mutex);
value2 = 2;
promise.set_value (false);
});
boost::asio::io_context::work work (io_ctx);
boost::thread thread ([&io_ctx]() {
io_ctx.run ();
});
promise.get_future ().get ();
nano::lock_guard<std::mutex> lock (mutex);
ASSERT_EQ (1, value1);
ASSERT_EQ (2, value2);
io_ctx.stop ();
thread.join ();
}

View file

@ -351,11 +351,11 @@ TEST (telemetry, blocking_request)
std::atomic<bool> done{ false };
std::function<void()> call_system_poll;
std::promise<void> promise;
call_system_poll = [&call_system_poll, &worker = node_client->worker, &done, &system, &promise]() {
call_system_poll = [&call_system_poll, &workers = node_client->workers, &done, &system, &promise]() {
if (!done)
{
ASSERT_NO_ERROR (system.poll ());
worker.push_task (call_system_poll);
workers.push_task (call_system_poll);
}
else
{
@ -363,9 +363,9 @@ TEST (telemetry, blocking_request)
}
};
// Keep pushing system.polls in another thread (worker), because we will be blocking this thread and unable to do so.
// Keep pushing system.polls in another thread (thread_pool), because we will be blocking this thread and unable to do so.
system.deadline_set (10s);
node_client->worker.push_task (call_system_poll);
node_client->workers.push_task (call_system_poll);
// Now try single request metric
auto telemetry_data_response = node_client->telemetry->get_metrics_single_peer (node_client->network.find_channel (node_server->network.endpoint ()));
@ -550,7 +550,7 @@ TEST (telemetry, remove_peer_different_genesis)
nano::system system (1);
auto node0 (system.nodes[0]);
ASSERT_EQ (0, node0->network.size ());
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
// Change genesis block to something else in this test (this is the reference telemetry processing uses).
// Possible TSAN issue in the future if something else uses this, but will only appear in tests.
node1->network_params.ledger.genesis_hash = nano::block_hash ("0");
@ -580,7 +580,7 @@ TEST (telemetry, remove_peer_different_genesis_udp)
nano::system system (1, nano::transport::transport_type::udp, node_flags);
auto node0 (system.nodes[0]);
ASSERT_EQ (0, node0->network.size ());
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work, node_flags));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work, node_flags));
node1->network_params.ledger.genesis_hash = nano::block_hash ("0");
node1->start ();
system.nodes.push_back (node1);

View file

@ -3,13 +3,14 @@
#include <nano/lib/threading.hpp>
#include <nano/lib/timer.hpp>
#include <nano/lib/utility.hpp>
#include <nano/lib/worker.hpp>
#include <nano/secure/utility.hpp>
#include <gtest/gtest.h>
#include <boost/filesystem.hpp>
#include <future>
using namespace std::chrono_literals;
TEST (rate, basic)
@ -90,7 +91,7 @@ TEST (optional_ptr, basic)
ASSERT_EQ (opt->z, 3);
}
TEST (thread, worker)
TEST (thread, thread_pool)
{
std::atomic<bool> passed_sleep{ false };
@ -99,8 +100,8 @@ TEST (thread, worker)
passed_sleep = true;
};
nano::worker worker;
worker.push_task (func);
nano::thread_pool workers (1u, nano::thread_role::name::unknown);
workers.push_task (func);
ASSERT_FALSE (passed_sleep);
nano::timer<std::chrono::milliseconds> timer_l;
@ -115,6 +116,66 @@ TEST (thread, worker)
ASSERT_TRUE (passed_sleep);
}
TEST (thread_pool_alarm, one)
{
nano::thread_pool workers (1u, nano::thread_role::name::unknown);
std::atomic<bool> done (false);
std::mutex mutex;
nano::condition_variable condition;
workers.add_timed_task (std::chrono::steady_clock::now (), [&]() {
{
nano::lock_guard<std::mutex> lock (mutex);
done = true;
}
condition.notify_one ();
});
nano::unique_lock<std::mutex> unique (mutex);
condition.wait (unique, [&]() { return !!done; });
}
TEST (thread_pool_alarm, many)
{
nano::thread_pool workers (50u, nano::thread_role::name::unknown);
std::atomic<int> count (0);
std::mutex mutex;
nano::condition_variable condition;
for (auto i (0); i < 50; ++i)
{
workers.add_timed_task (std::chrono::steady_clock::now (), [&]() {
{
nano::lock_guard<std::mutex> lock (mutex);
count += 1;
}
condition.notify_one ();
});
}
nano::unique_lock<std::mutex> unique (mutex);
condition.wait (unique, [&]() { return count == 50; });
}
TEST (thread_pool_alarm, top_execution)
{
nano::thread_pool workers (1u, nano::thread_role::name::unknown);
int value1 (0);
int value2 (0);
std::mutex mutex;
std::promise<bool> promise;
workers.add_timed_task (std::chrono::steady_clock::now (), [&]() {
nano::lock_guard<std::mutex> lock (mutex);
value1 = 1;
value2 = 1;
});
workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (1), [&]() {
nano::lock_guard<std::mutex> lock (mutex);
value2 = 2;
promise.set_value (false);
});
promise.get_future ().get ();
nano::lock_guard<std::mutex> lock (mutex);
ASSERT_EQ (1, value1);
ASSERT_EQ (2, value2);
}
TEST (filesystem, remove_all_files)
{
auto path = nano::unique_path ();

View file

@ -158,7 +158,9 @@ TEST (vote_spacing, vote_generator)
nano::node_config config;
config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
nano::system system;
auto & node = *system.add_node (config);
nano::node_flags node_flags;
node_flags.disable_search_pending = true;
auto & node = *system.add_node (config, node_flags);
auto & wallet = *system.wallet (0);
wallet.insert_adhoc (nano::dev_genesis_key.prv);
nano::state_block_builder builder;
@ -199,7 +201,9 @@ TEST (vote_spacing, rapid)
nano::node_config config;
config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
nano::system system;
auto & node = *system.add_node (config);
nano::node_flags node_flags;
node_flags.disable_search_pending = true;
auto & node = *system.add_node (config, node_flags);
auto & wallet = *system.wallet (0);
wallet.insert_adhoc (nano::dev_genesis_key.prv);
nano::state_block_builder builder;

View file

@ -132,11 +132,14 @@ TEST (wallets, vote_minimum)
nano::state_block open2 (key2.pub, 0, key2.pub, node1.config.vote_minimum.number () - 1, send2.hash (), key2.prv, key2.pub, *system.work.generate (key2.pub));
ASSERT_EQ (nano::process_result::progress, node1.process (open2).code);
auto wallet (node1.wallets.items.begin ()->second);
nano::unique_lock<std::mutex> representatives_lk (wallet->representatives_mutex);
ASSERT_EQ (0, wallet->representatives.size ());
representatives_lk.unlock ();
wallet->insert_adhoc (nano::dev_genesis_key.prv);
wallet->insert_adhoc (key1.prv);
wallet->insert_adhoc (key2.prv);
node1.wallets.compute_reps ();
representatives_lk.lock ();
ASSERT_EQ (2, wallet->representatives.size ());
}

View file

@ -20,8 +20,6 @@ endif()
add_library(
nano_lib
${platform_sources}
alarm.hpp
alarm.cpp
asio.hpp
asio.cpp
blockbuilders.hpp
@ -79,9 +77,7 @@ add_library(
walletconfig.hpp
walletconfig.cpp
work.hpp
work.cpp
worker.hpp
worker.cpp)
work.cpp)
target_link_libraries(
nano_lib

View file

@ -1,78 +0,0 @@
#include <nano/lib/alarm.hpp>
#include <nano/lib/threading.hpp>
bool nano::operation::operator> (nano::operation const & other_a) const
{
return wakeup > other_a.wakeup;
}
nano::alarm::alarm (boost::asio::io_context & io_ctx_a) :
io_ctx (io_ctx_a),
thread ([this]() {
nano::thread_role::set (nano::thread_role::name::alarm);
run ();
})
{
}
nano::alarm::~alarm ()
{
add (std::chrono::steady_clock::now (), nullptr);
thread.join ();
}
void nano::alarm::run ()
{
nano::unique_lock<std::mutex> lock (mutex);
auto done (false);
while (!done)
{
if (!operations.empty ())
{
auto & operation (operations.top ());
if (operation.function)
{
if (operation.wakeup <= std::chrono::steady_clock::now ())
{
io_ctx.post (operation.function);
operations.pop ();
}
else
{
auto wakeup (operation.wakeup);
condition.wait_until (lock, wakeup);
}
}
else
{
done = true;
}
}
else
{
condition.wait (lock);
}
}
}
void nano::alarm::add (std::chrono::steady_clock::time_point const & wakeup_a, std::function<void()> const & operation)
{
{
nano::lock_guard<std::mutex> guard (mutex);
operations.push (nano::operation ({ wakeup_a, operation }));
}
condition.notify_all ();
}
std::unique_ptr<nano::container_info_component> nano::collect_container_info (alarm & alarm, const std::string & name)
{
size_t count;
{
nano::lock_guard<std::mutex> guard (alarm.mutex);
count = alarm.operations.size ();
}
auto sizeof_element = sizeof (decltype (alarm.operations)::value_type);
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "operations", count, sizeof_element }));
return composite;
}

View file

@ -1,47 +0,0 @@
#pragma once
#include <nano/lib/locks.hpp>
#include <nano/lib/utility.hpp>
#include <chrono>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
namespace boost
{
namespace asio
{
class io_context;
}
}
namespace nano
{
/** An alarm operation is a function- and invocation time pair. Operations are ordered chronologically. */
class operation final
{
public:
bool operator> (nano::operation const &) const;
std::chrono::steady_clock::time_point wakeup;
std::function<void()> function;
};
/** General purpose timer to defer operations */
class alarm final
{
public:
explicit alarm (boost::asio::io_context &);
~alarm ();
void add (std::chrono::steady_clock::time_point const &, std::function<void()> const &);
void run ();
boost::asio::io_context & io_ctx;
std::mutex mutex;
nano::condition_variable condition;
std::priority_queue<operation, std::vector<operation>, std::greater<operation>> operations;
std::thread thread;
};
class container_info_component;
std::unique_ptr<container_info_component> collect_container_info (alarm & alarm, const std::string & name);
}

View file

@ -1,8 +1,10 @@
#include <nano/boost/asio/post.hpp>
#include <nano/lib/config.hpp>
#include <nano/lib/threading.hpp>
#include <boost/format.hpp>
#include <future>
#include <iostream>
#include <thread>
@ -34,9 +36,6 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::packet_processing:
thread_role_name_string = "Pkt processing";
break;
case nano::thread_role::name::alarm:
thread_role_name_string = "Alarm";
break;
case nano::thread_role::name::vote_processing:
thread_role_name_string = "Vote processing";
break;
@ -196,3 +195,87 @@ void nano::thread_runner::stop_event_processing ()
{
io_guard.get_executor ().context ().stop ();
}
nano::thread_pool::thread_pool (unsigned num_threads, nano::thread_role::name thread_name) :
num_threads (num_threads),
thread_pool_m (std::make_unique<boost::asio::thread_pool> (num_threads))
{
set_thread_names (num_threads, thread_name);
}
nano::thread_pool::~thread_pool ()
{
stop ();
}
void nano::thread_pool::stop ()
{
nano::unique_lock<std::mutex> lk (mutex);
if (!stopped)
{
stopped = true;
#if defined(BOOST_ASIO_HAS_IOCP)
// A hack needed for Windows to prevent deadlock during destruction, described here: https://github.com/chriskohlhoff/asio/issues/431
boost::asio::use_service<boost::asio::detail::win_iocp_io_context> (*thread_pool_m).stop ();
#endif
lk.unlock ();
thread_pool_m->stop ();
thread_pool_m->join ();
lk.lock ();
thread_pool_m = nullptr;
}
}
void nano::thread_pool::push_task (std::function<void()> task)
{
nano::lock_guard<std::mutex> guard (mutex);
if (!stopped)
{
boost::asio::post (*thread_pool_m, task);
}
}
void nano::thread_pool::add_timed_task (std::chrono::steady_clock::time_point const & expiry_time, std::function<void()> task)
{
nano::lock_guard<std::mutex> guard (mutex);
if (!stopped && thread_pool_m)
{
auto timer = std::make_shared<boost::asio::steady_timer> (thread_pool_m->get_executor (), expiry_time);
timer->async_wait ([this, task, timer](const boost::system::error_code & ec) {
if (!ec)
{
push_task (task);
}
});
}
}
unsigned nano::thread_pool::get_num_threads () const
{
return num_threads;
}
// Set the names of all the threads in the thread pool for easier identification
void nano::thread_pool::set_thread_names (unsigned num_threads, nano::thread_role::name thread_name)
{
std::vector<std::promise<void>> promises (num_threads);
std::vector<std::future<void>> futures;
futures.reserve (num_threads);
std::transform (promises.begin (), promises.end (), std::back_inserter (futures), [](auto & promise) {
return promise.get_future ();
});
for (auto i = 0u; i < num_threads; ++i)
{
boost::asio::post (*thread_pool_m, [& promise = promises[i], thread_name]() {
nano::thread_role::set (thread_name);
promise.set_value ();
});
}
// Wait until all threads have finished
for (auto & future : futures)
{
future.wait ();
}
}

View file

@ -1,7 +1,10 @@
#pragma once
#include <nano/boost/asio/deadline_timer.hpp>
#include <nano/boost/asio/executor_work_guard.hpp>
#include <nano/boost/asio/io_context.hpp>
#include <nano/boost/asio/steady_timer.hpp>
#include <nano/boost/asio/thread_pool.hpp>
#include <nano/lib/utility.hpp>
#include <boost/thread/thread.hpp>
@ -19,7 +22,6 @@ namespace thread_role
io,
work,
packet_processing,
alarm,
vote_processing,
block_processing,
request_loop,
@ -79,8 +81,8 @@ public:
};
/* Default memory order of normal std::atomic operations is std::memory_order_seq_cst which provides
a total global ordering of atomic operations are well as synchronization between threads. Weaker memory
ordering can provide benefits in some circumstances, such like in dumb counters where no other data is
a total global ordering of atomic operations as well as synchronization between threads. Weaker memory
ordering can provide benefits in some circumstances, like dumb counters where no other data is
dependent on the ordering of these operations. This assumes T is a type of integer, not bool or char. */
template <typename T, typename = std::enable_if_t<std::is_integral<T>::value>>
class relaxed_atomic_integral
@ -159,4 +161,31 @@ public:
private:
std::atomic<T> atomic;
};
class thread_pool final
{
public:
explicit thread_pool (unsigned, nano::thread_role::name);
~thread_pool ();
/** This will run when there is an available thread for execution */
void push_task (std::function<void()>);
/** Run a task at a certain point in time */
void add_timed_task (std::chrono::steady_clock::time_point const & expiry_time, std::function<void()> task);
/** Stops any further pushed tasks from executing */
void stop ();
/** Number of threads in the thread pool */
unsigned get_num_threads () const;
private:
std::mutex mutex;
std::atomic<bool> stopped{ false };
unsigned num_threads;
std::unique_ptr<boost::asio::thread_pool> thread_pool_m;
void set_thread_names (unsigned num_threads, nano::thread_role::name thread_name);
};
}

View file

@ -1,78 +0,0 @@
#include <nano/lib/threading.hpp>
#include <nano/lib/worker.hpp>
nano::worker::worker () :
thread ([this]() {
nano::thread_role::set (nano::thread_role::name::worker);
this->run ();
})
{
}
void nano::worker::run ()
{
nano::unique_lock<std::mutex> lk (mutex);
while (!stopped)
{
if (!queue.empty ())
{
auto func = queue.front ();
queue.pop_front ();
lk.unlock ();
func ();
// So that we reduce locking for anything being pushed as that will
// most likely be on an io-thread
std::this_thread::yield ();
lk.lock ();
}
else
{
cv.wait (lk);
}
}
}
nano::worker::~worker ()
{
stop ();
}
void nano::worker::push_task (std::function<void()> func_a)
{
{
nano::lock_guard<std::mutex> guard (mutex);
if (!stopped)
{
queue.emplace_back (func_a);
}
}
cv.notify_one ();
}
void nano::worker::stop ()
{
{
nano::unique_lock<std::mutex> lk (mutex);
stopped = true;
queue.clear ();
}
cv.notify_one ();
if (thread.joinable ())
{
thread.join ();
}
}
std::unique_ptr<nano::container_info_component> nano::collect_container_info (nano::worker & worker, const std::string & name)
{
size_t count;
{
nano::lock_guard<std::mutex> guard (worker.mutex);
count = worker.queue.size ();
}
auto sizeof_element = sizeof (decltype (worker.queue)::value_type);
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<nano::container_info_leaf> (nano::container_info{ "queue", count, sizeof_element }));
return composite;
}

View file

@ -1,33 +0,0 @@
#pragma once
#include <nano/lib/locks.hpp>
#include <nano/lib/utility.hpp>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
namespace nano
{
class worker final
{
public:
worker ();
~worker ();
void run ();
void push_task (std::function<void()> func);
void stop ();
private:
nano::condition_variable cv;
std::deque<std::function<void()>> queue;
std::mutex mutex;
bool stopped{ false };
std::thread thread;
friend std::unique_ptr<container_info_component> collect_container_info (worker &, const std::string &);
};
std::unique_ptr<container_info_component> collect_container_info (worker & worker, const std::string & name);
}

View file

@ -56,7 +56,6 @@ void nano_daemon::daemon::run (boost::filesystem::path const & data_path, nano::
return opencl->generate_work (version_a, root_a, difficulty_a, ticket_a);
}
: std::function<boost::optional<uint64_t> (nano::work_version const, nano::root const &, uint64_t, std::atomic<int> &)> (nullptr));
nano::alarm alarm (io_ctx);
try
{
// This avoid a blank prompt during any node initialization delays
@ -64,7 +63,7 @@ void nano_daemon::daemon::run (boost::filesystem::path const & data_path, nano::
std::cout << initialization_text << std::endl;
logger.always_log (initialization_text);
auto node (std::make_shared<nano::node> (io_ctx, data_path, alarm, config.node, opencl_work, flags));
auto node (std::make_shared<nano::node> (io_ctx, data_path, config.node, opencl_work, flags));
if (!node->init_error ())
{
auto network_label = node->network_params.network.get_current_network_as_string ();
@ -107,9 +106,9 @@ void nano_daemon::daemon::run (boost::filesystem::path const & data_path, nano::
std::cout << error.get_message () << std::endl;
std::exit (1);
}
rpc_handler = std::make_unique<nano::inprocess_rpc_handler> (*node, ipc_server, config.rpc, [&ipc_server, &alarm, &io_ctx]() {
rpc_handler = std::make_unique<nano::inprocess_rpc_handler> (*node, ipc_server, config.rpc, [&ipc_server, &workers = node->workers, &io_ctx]() {
ipc_server.stop ();
alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (3), [&io_ctx]() {
workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (3), [&io_ctx]() {
io_ctx.stop ();
});
});

View file

@ -907,14 +907,13 @@ int main (int argc, char * const * argv)
size_t max_blocks (2 * num_accounts * num_iterations + num_accounts * 2); // 1,000,000 + 2 * 100,000 = 1,200,000 blocks
std::cout << boost::str (boost::format ("Starting pregenerating %1% blocks\n") % max_blocks);
boost::asio::io_context io_ctx;
nano::alarm alarm (io_ctx);
nano::work_pool work (std::numeric_limits<unsigned>::max ());
nano::logging logging;
auto path (nano::unique_path ());
logging.init (path);
nano::node_flags node_flags;
nano::update_flags (node_flags, vm);
auto node (std::make_shared<nano::node> (io_ctx, 24001, path, alarm, logging, work, node_flags));
auto node (std::make_shared<nano::node> (io_ctx, 24001, path, logging, work, node_flags));
nano::block_hash genesis_latest (node->latest (dev_params.ledger.dev_genesis_key.pub));
nano::uint128_t genesis_balance (std::numeric_limits<nano::uint128_t>::max ());
// Generating keys
@ -1028,12 +1027,11 @@ int main (int argc, char * const * argv)
size_t max_votes (num_elections * num_representatives); // 40,000 * 25 = 1,000,000 votes
std::cerr << boost::str (boost::format ("Starting pregenerating %1% votes\n") % max_votes);
boost::asio::io_context io_ctx;
nano::alarm alarm (io_ctx);
nano::work_pool work (std::numeric_limits<unsigned>::max ());
nano::logging logging;
auto path (nano::unique_path ());
logging.init (path);
auto node (std::make_shared<nano::node> (io_ctx, 24001, path, alarm, logging, work));
auto node (std::make_shared<nano::node> (io_ctx, 24001, path, logging, work));
nano::block_hash genesis_latest (node->latest (dev_params.ledger.dev_genesis_key.pub));
nano::uint128_t genesis_balance (std::numeric_limits<nano::uint128_t>::max ());
// Generating keys
@ -1150,8 +1148,6 @@ int main (int argc, char * const * argv)
std::cout << boost::str (boost::format ("Starting generating %1% blocks...\n") % (count * 2));
boost::asio::io_context io_ctx1;
boost::asio::io_context io_ctx2;
nano::alarm alarm1 (io_ctx1);
nano::alarm alarm2 (io_ctx2);
nano::work_pool work (std::numeric_limits<unsigned>::max ());
nano::logging logging;
auto path1 (nano::unique_path ());
@ -1163,7 +1159,7 @@ int main (int argc, char * const * argv)
flags.disable_legacy_bootstrap = true;
flags.disable_wallet_bootstrap = true;
flags.disable_bootstrap_listener = true;
auto node1 (std::make_shared<nano::node> (io_ctx1, path1, alarm1, config1, work, flags, 0));
auto node1 (std::make_shared<nano::node> (io_ctx1, path1, config1, work, flags, 0));
nano::block_hash genesis_latest (node1->latest (dev_params.ledger.dev_genesis_key.pub));
nano::uint128_t genesis_balance (std::numeric_limits<nano::uint128_t>::max ());
// Generating blocks
@ -1260,7 +1256,7 @@ int main (int argc, char * const * argv)
config2.active_elections_size = daemon_config.node.active_elections_size;
}
}
auto node2 (std::make_shared<nano::node> (io_ctx2, path2, alarm2, config2, work, flags, 1));
auto node2 (std::make_shared<nano::node> (io_ctx2, path2, config2, work, flags, 1));
node2->start ();
nano::thread_runner runner2 (io_ctx2, node2->config.io_threads);
std::cout << boost::str (boost::format ("Processing %1% blocks (test node)\n") % (count * 2));

View file

@ -111,8 +111,7 @@ int run_wallet (QApplication & application, int argc, char * const * argv, boost
return opencl->generate_work (version_a, root_a, difficulty_a);
}
: std::function<boost::optional<uint64_t> (nano::work_version const, nano::root const &, uint64_t, std::atomic<int> &)> (nullptr));
nano::alarm alarm (io_ctx);
node = std::make_shared<nano::node> (io_ctx, data_path, alarm, config.node, work, flags);
node = std::make_shared<nano::node> (io_ctx, data_path, config.node, work, flags);
if (!node->init_error ())
{
auto wallet (node->wallets.open (wallet_config.wallet));

View file

@ -354,7 +354,7 @@ void nano::bootstrap_attempt_legacy::attempt_restart_check (nano::unique_lock<st
node->background ([node_l, this_l]() {
node_l->bootstrap_initiator.remove_attempt (this_l);
// Delay after removing current attempt
node_l->alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (50), [node_l]() {
node_l->workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (50), [node_l]() {
node_l->bootstrap_initiator.bootstrap (true);
});
});

View file

@ -109,7 +109,7 @@ void nano::bulk_pull_client::throttled_receive_block ()
else
{
auto this_l (shared_from_this ());
connection->node->alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this_l]() {
connection->node->workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this_l]() {
if (!this_l->connection->pending_stop && !this_l->attempt->stopped)
{
this_l->throttled_receive_block ();

View file

@ -127,7 +127,7 @@ void nano::bulk_push_server::throttled_receive ()
else
{
auto this_l (shared_from_this ());
connection->node->alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this_l]() {
connection->node->workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this_l]() {
if (!this_l->connection->stopped)
{
this_l->throttled_receive ();

View file

@ -304,7 +304,7 @@ void nano::bootstrap_connections::populate_connections (bool repeat)
if (!stopped && repeat)
{
std::weak_ptr<nano::bootstrap_connections> this_w (shared_from_this ());
node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this_w]() {
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this_w]() {
if (auto this_l = this_w.lock ())
{
this_l->populate_connections ();

View file

@ -567,7 +567,7 @@ void nano::bootstrap_server::finish_request ()
else
{
std::weak_ptr<nano::bootstrap_server> this_w (shared_from_this ());
node->alarm.add (std::chrono::steady_clock::now () + (node->config.tcp_io_timeout * 2) + std::chrono::seconds (1), [this_w]() {
node->workers.add_timed_task (std::chrono::steady_clock::now () + (node->config.tcp_io_timeout * 2) + std::chrono::seconds (1), [this_w]() {
if (auto this_l = this_w.lock ())
{
this_l->timeout ();
@ -748,7 +748,7 @@ void nano::bootstrap_server::run_next (nano::unique_lock<std::mutex> & lock_a)
if (timeout_check)
{
std::weak_ptr<nano::bootstrap_server> this_w (shared_from_this ());
node->alarm.add (std::chrono::steady_clock::now () + (node->config.tcp_io_timeout * 2) + std::chrono::seconds (1), [this_w]() {
node->workers.add_timed_task (std::chrono::steady_clock::now () + (node->config.tcp_io_timeout * 2) + std::chrono::seconds (1), [this_w]() {
if (auto this_l = this_w.lock ())
{
this_l->timeout ();

View file

@ -368,7 +368,7 @@ void nano::distributed_work::handle_failure ()
auto now (std::chrono::steady_clock::now ());
std::weak_ptr<nano::node> node_w (node.shared ());
auto next_backoff (std::min (backoff * 2, std::chrono::seconds (5 * 60)));
node.alarm.add (now + std::chrono::seconds (backoff), [node_w, request_l = request, next_backoff] {
node.workers.add_timed_task (now + std::chrono::seconds (backoff), [node_w, request_l = request, next_backoff] {
bool error_l{ true };
if (auto node_l = node_w.lock ())
{

View file

@ -96,7 +96,7 @@ bool nano::gap_cache::bootstrap_check (std::vector<nano::account> const & voters
void nano::gap_cache::bootstrap_start (nano::block_hash const & hash_a)
{
auto node_l (node.shared ());
node.alarm.add (std::chrono::steady_clock::now () + node.network_params.bootstrap.gap_cache_bootstrap_start_interval, [node_l, hash_a]() {
node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.bootstrap.gap_cache_bootstrap_start_interval, [node_l, hash_a]() {
if (!node_l->ledger.block_or_pruned_exists (hash_a))
{
if (!node_l->bootstrap_initiator.in_progress ())

View file

@ -289,7 +289,7 @@ public:
// Note that if the rpc action is async, the shared_ptr<json_handler> lifetime will be extended by the action handler
auto handler (std::make_shared<nano::json_handler> (node, server.node_rpc_config, body, response_handler_l, [& server = server]() {
server.stop ();
server.node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (3), [& io_ctx = server.node.alarm.io_ctx]() {
server.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (3), [& io_ctx = server.node.io_ctx]() {
io_ctx.stop ();
});
}));

View file

@ -539,7 +539,7 @@ void nano::json_handler::account_block_count ()
void nano::json_handler::account_create ()
{
node.worker.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
if (!rpc_l->ec)
{
@ -726,7 +726,7 @@ void nano::json_handler::account_list ()
void nano::json_handler::account_move ()
{
node.worker.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
if (!rpc_l->ec)
{
@ -765,7 +765,7 @@ void nano::json_handler::account_move ()
void nano::json_handler::account_remove ()
{
node.worker.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
auto account (rpc_l->account_impl ());
if (!rpc_l->ec)
@ -800,7 +800,7 @@ void nano::json_handler::account_representative ()
void nano::json_handler::account_representative_set ()
{
node.worker.push_task (create_worker_task ([work_generation_enabled = node.work_generation_enabled ()](std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.push_task (create_worker_task ([work_generation_enabled = node.work_generation_enabled ()](std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
auto account (rpc_l->account_impl ());
std::string representative_text (rpc_l->request.get<std::string> ("representative"));
@ -896,7 +896,7 @@ void nano::json_handler::accounts_balances ()
void nano::json_handler::accounts_create ()
{
node.worker.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
auto count (rpc_l->count_impl ());
if (!rpc_l->ec)
@ -2755,7 +2755,7 @@ void nano::json_handler::node_id_delete ()
void nano::json_handler::password_change ()
{
node.worker.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
if (!rpc_l->ec)
{
@ -2778,7 +2778,7 @@ void nano::json_handler::password_change ()
void nano::json_handler::password_enter ()
{
node.worker.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
if (!rpc_l->ec)
{
@ -2981,7 +2981,7 @@ void nano::json_handler::pending_exists ()
void nano::json_handler::process ()
{
node.worker.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
const bool watch_work_l = rpc_l->request.get<bool> ("watch_work", true);
const bool is_async = rpc_l->request.get<bool> ("async", false);
auto block (rpc_l->block_impl (true));
@ -3944,7 +3944,7 @@ void nano::json_handler::unchecked ()
void nano::json_handler::unchecked_clear ()
{
node.worker.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
auto transaction (rpc_l->node.store.tx_begin_write ({ tables::unchecked }));
rpc_l->node.store.unchecked_clear (transaction);
rpc_l->response_l.put ("success", "");
@ -4123,7 +4123,7 @@ void nano::json_handler::validate_account_number ()
void nano::json_handler::wallet_add ()
{
node.worker.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
if (!rpc_l->ec)
{
@ -4153,7 +4153,7 @@ void nano::json_handler::wallet_add ()
void nano::json_handler::wallet_add_watch ()
{
node.worker.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
if (!rpc_l->ec)
{
@ -4253,7 +4253,7 @@ void nano::json_handler::wallet_balances ()
void nano::json_handler::wallet_change_seed ()
{
node.worker.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
if (!rpc_l->ec)
{
@ -4301,7 +4301,7 @@ void nano::json_handler::wallet_contains ()
void nano::json_handler::wallet_create ()
{
node.worker.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
nano::raw_key seed;
auto seed_text (rpc_l->request.get_optional<std::string> ("seed"));
if (seed_text.is_initialized () && seed.data.decode_hex (seed_text.get ()))
@ -4337,7 +4337,7 @@ void nano::json_handler::wallet_create ()
void nano::json_handler::wallet_destroy ()
{
node.worker.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
std::string wallet_text (rpc_l->request.get<std::string> ("wallet"));
nano::wallet_id wallet;
if (!wallet.decode_hex (wallet_text))
@ -4620,7 +4620,7 @@ void nano::json_handler::wallet_representative ()
void nano::json_handler::wallet_representative_set ()
{
node.worker.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
std::string representative_text (rpc_l->request.get<std::string> ("representative"));
auto representative (rpc_l->account_impl (representative_text, nano::error_rpc::bad_representative_number));
@ -4907,7 +4907,7 @@ void nano::json_handler::work_get ()
void nano::json_handler::work_set ()
{
node.worker.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
node.workers.push_task (create_worker_task ([](std::shared_ptr<nano::json_handler> const & rpc_l) {
auto wallet (rpc_l->wallet_impl ());
auto account (rpc_l->account_impl ());
auto work (rpc_l->work_optional_impl ());

View file

@ -227,7 +227,7 @@ void nano::network::flood_block_many (std::deque<std::shared_ptr<nano::block>> b
if (!blocks_a.empty ())
{
std::weak_ptr<nano::node> node_w (node.shared ());
node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, blocks (std::move (blocks_a)), callback_a, delay_a]() {
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, blocks (std::move (blocks_a)), callback_a, delay_a]() {
if (auto node_l = node_w.lock ())
{
node_l->network.flood_block_many (std::move (blocks), callback_a, delay_a);
@ -296,7 +296,7 @@ void nano::network::broadcast_confirm_req_base (std::shared_ptr<nano::block> con
delay_a += std::rand () % broadcast_interval_ms;
std::weak_ptr<nano::node> node_w (node.shared ());
node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a), [node_w, block_a, endpoints_a, delay_a]() {
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a), [node_w, block_a, endpoints_a, delay_a]() {
if (auto node_l = node_w.lock ())
{
node_l->network.broadcast_confirm_req_base (block_a, endpoints_a, delay_a, true);
@ -336,7 +336,7 @@ void nano::network::broadcast_confirm_req_batched_many (std::unordered_map<std::
if (!request_bundle_a.empty ())
{
std::weak_ptr<nano::node> node_w (node.shared ());
node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a), [node_w, request_bundle_a, callback_a, delay_a]() {
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a), [node_w, request_bundle_a, callback_a, delay_a]() {
if (auto node_l = node_w.lock ())
{
node_l->network.broadcast_confirm_req_batched_many (request_bundle_a, callback_a, delay_a, true);
@ -365,7 +365,7 @@ void nano::network::broadcast_confirm_req_many (std::deque<std::pair<std::shared
if (!requests_a.empty ())
{
std::weak_ptr<nano::node> node_w (node.shared ());
node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, requests_a, callback_a, delay_a]() {
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, requests_a, callback_a, delay_a]() {
if (auto node_l = node_w.lock ())
{
node_l->network.broadcast_confirm_req_many (requests_a, callback_a, delay_a);

View file

@ -79,19 +79,19 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (re
return composite;
}
nano::node::node (boost::asio::io_context & io_ctx_a, uint16_t peering_port_a, boost::filesystem::path const & application_path_a, nano::alarm & alarm_a, nano::logging const & logging_a, nano::work_pool & work_a, nano::node_flags flags_a, unsigned seq) :
node (io_ctx_a, application_path_a, alarm_a, nano::node_config (peering_port_a, logging_a), work_a, flags_a, seq)
nano::node::node (boost::asio::io_context & io_ctx_a, uint16_t peering_port_a, boost::filesystem::path const & application_path_a, nano::logging const & logging_a, nano::work_pool & work_a, nano::node_flags flags_a, unsigned seq) :
node (io_ctx_a, application_path_a, nano::node_config (peering_port_a, logging_a), work_a, flags_a, seq)
{
}
nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path const & application_path_a, nano::alarm & alarm_a, nano::node_config const & config_a, nano::work_pool & work_a, nano::node_flags flags_a, unsigned seq) :
nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path const & application_path_a, nano::node_config const & config_a, nano::work_pool & work_a, nano::node_flags flags_a, unsigned seq) :
write_database_queue (!flags_a.force_use_write_database_queue && (config_a.rocksdb_config.enable || nano::using_rocksdb_in_tests ())),
io_ctx (io_ctx_a),
node_initialized_latch (1),
config (config_a),
stats (config.stat_config),
workers (std::max (3u, config.io_threads / 2), nano::thread_role::name::worker),
flags (flags_a),
alarm (alarm_a),
work (work_a),
distributed_work (*this),
logger (config_a.logging.min_time_between_log_output),
@ -103,7 +103,7 @@ gap_cache (*this),
ledger (store, stats, flags_a.generate_cache, [this]() { this->network.erase_below_version (network_params.protocol.protocol_version_min (true)); }),
checker (config.signature_checker_threads),
network (*this, config.peering_port),
telemetry (std::make_shared<nano::telemetry> (network, alarm, worker, observers.telemetry, stats, network_params, flags.disable_ongoing_telemetry_requests)),
telemetry (std::make_shared<nano::telemetry> (network, workers, observers.telemetry, stats, network_params, flags.disable_ongoing_telemetry_requests)),
bootstrap_initiator (*this),
bootstrap (config.peering_port, *this),
application_path (application_path_a),
@ -577,7 +577,6 @@ void nano::node::process_fork (nano::transaction const & transaction_a, std::sha
std::unique_ptr<nano::container_info_component> nano::collect_container_info (node & node, const std::string & name)
{
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (collect_container_info (node.alarm, "alarm"));
composite->add_component (collect_container_info (node.work, "work"));
composite->add_component (collect_container_info (node.gap_cache, "gap_cache"));
composite->add_component (collect_container_info (node.ledger, "ledger"));
@ -600,7 +599,6 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
composite->add_component (collect_container_info (node.block_uniquer, "block_uniquer"));
composite->add_component (collect_container_info (node.vote_uniquer, "vote_uniquer"));
composite->add_component (collect_container_info (node.confirmation_height_processor, "confirmation_height_processor"));
composite->add_component (collect_container_info (node.worker, "worker"));
composite->add_component (collect_container_info (node.distributed_work, "distributed_work"));
composite->add_component (collect_container_info (node.aggregator, "request_aggregator"));
return composite;
@ -654,14 +652,14 @@ void nano::node::start ()
if (!flags.disable_unchecked_cleanup)
{
auto this_l (shared ());
worker.push_task ([this_l]() {
workers.push_task ([this_l]() {
this_l->ongoing_unchecked_cleanup ();
});
}
if (flags.enable_pruning)
{
auto this_l (shared ());
worker.push_task ([this_l]() {
workers.push_task ([this_l]() {
this_l->ongoing_ledger_pruning ();
});
}
@ -690,7 +688,7 @@ void nano::node::start ()
{
// Delay to start wallet lazy bootstrap
auto this_l (shared ());
alarm.add (std::chrono::steady_clock::now () + std::chrono::minutes (1), [this_l]() {
workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::minutes (1), [this_l]() {
this_l->bootstrap_wallet ();
});
}
@ -730,12 +728,12 @@ void nano::node::stop ()
checker.stop ();
wallets.stop ();
stats.stop ();
worker.stop ();
auto epoch_upgrade = epoch_upgrading.lock ();
if (epoch_upgrade->valid ())
{
epoch_upgrade->wait ();
}
workers.stop ();
// work pool is not stopped on purpose due to testing setup
}
}
@ -832,7 +830,7 @@ void nano::node::ongoing_rep_calculation ()
auto now (std::chrono::steady_clock::now ());
vote_processor.calculate_weights ();
std::weak_ptr<nano::node> node_w (shared_from_this ());
alarm.add (now + std::chrono::minutes (10), [node_w]() {
workers.add_timed_task (now + std::chrono::minutes (10), [node_w]() {
if (auto node_l = node_w.lock ())
{
node_l->ongoing_rep_calculation ();
@ -854,7 +852,7 @@ void nano::node::ongoing_bootstrap ()
}
bootstrap_initiator.bootstrap ();
std::weak_ptr<nano::node> node_w (shared_from_this ());
alarm.add (std::chrono::steady_clock::now () + next_wakeup, [node_w]() {
workers.add_timed_task (std::chrono::steady_clock::now () + next_wakeup, [node_w]() {
if (auto node_l = node_w.lock ())
{
node_l->ongoing_bootstrap ();
@ -867,12 +865,10 @@ void nano::node::ongoing_peer_store ()
bool stored (network.tcp_channels.store_all (true));
network.udp_channels.store_all (!stored);
std::weak_ptr<nano::node> node_w (shared_from_this ());
alarm.add (std::chrono::steady_clock::now () + network_params.node.peer_interval, [node_w]() {
workers.add_timed_task (std::chrono::steady_clock::now () + network_params.node.peer_interval, [node_w]() {
if (auto node_l = node_w.lock ())
{
node_l->worker.push_task ([node_l]() {
node_l->ongoing_peer_store ();
});
node_l->ongoing_peer_store ();
}
});
}
@ -890,7 +886,7 @@ void nano::node::backup_wallet ()
i->second->store.write_backup (transaction, backup_path / (i->first.to_string () + ".json"));
}
auto this_l (shared ());
alarm.add (std::chrono::steady_clock::now () + network_params.node.backup_interval, [this_l]() {
workers.add_timed_task (std::chrono::steady_clock::now () + network_params.node.backup_interval, [this_l]() {
this_l->backup_wallet ();
});
}
@ -902,10 +898,8 @@ void nano::node::search_pending ()
// Search pending
wallets.search_pending_all ();
auto this_l (shared ());
alarm.add (std::chrono::steady_clock::now () + network_params.node.search_pending_interval, [this_l]() {
this_l->worker.push_task ([this_l]() {
this_l->search_pending ();
});
workers.add_timed_task (std::chrono::steady_clock::now () + network_params.node.search_pending_interval, [this_l]() {
this_l->search_pending ();
});
}
@ -982,10 +976,8 @@ void nano::node::ongoing_unchecked_cleanup ()
{
unchecked_cleanup ();
auto this_l (shared ());
alarm.add (std::chrono::steady_clock::now () + network_params.node.unchecked_cleaning_interval, [this_l]() {
this_l->worker.push_task ([this_l]() {
this_l->ongoing_unchecked_cleanup ();
});
workers.add_timed_task (std::chrono::steady_clock::now () + network_params.node.unchecked_cleaning_interval, [this_l]() {
this_l->ongoing_unchecked_cleanup ();
});
}
@ -1100,8 +1092,8 @@ void nano::node::ongoing_ledger_pruning ()
ledger_pruning (flags.block_processor_batch_size != 0 ? flags.block_processor_batch_size : 2 * 1024, bootstrap_weight_reached, false);
auto ledger_pruning_interval (bootstrap_weight_reached ? config.max_pruning_age : std::min (config.max_pruning_age, std::chrono::seconds (15 * 60)));
auto this_l (shared ());
alarm.add (std::chrono::steady_clock::now () + ledger_pruning_interval, [this_l]() {
this_l->worker.push_task ([this_l]() {
workers.add_timed_task (std::chrono::steady_clock::now () + ledger_pruning_interval, [this_l]() {
this_l->workers.push_task ([this_l]() {
this_l->ongoing_ledger_pruning ();
});
});
@ -1266,12 +1258,10 @@ bool nano::node::block_confirmed_or_being_confirmed (nano::transaction const & t
void nano::node::ongoing_online_weight_calculation_queue ()
{
std::weak_ptr<nano::node> node_w (shared_from_this ());
alarm.add (std::chrono::steady_clock::now () + (std::chrono::seconds (network_params.node.weight_period)), [node_w]() {
workers.add_timed_task (std::chrono::steady_clock::now () + (std::chrono::seconds (network_params.node.weight_period)), [node_w]() {
if (auto node_l = node_w.lock ())
{
node_l->worker.push_task ([node_l]() {
node_l->ongoing_online_weight_calculation ();
});
node_l->ongoing_online_weight_calculation ();
}
});
}
@ -1377,7 +1367,7 @@ void nano::node::process_confirmed (nano::election_status const & status_a, uint
{
iteration_a++;
std::weak_ptr<nano::node> node_w (shared ());
alarm.add (std::chrono::steady_clock::now () + network_params.node.process_confirmed_interval, [node_w, status_a, iteration_a]() {
workers.add_timed_task (std::chrono::steady_clock::now () + network_params.node.process_confirmed_interval, [node_w, status_a, iteration_a]() {
if (auto node_l = node_w.lock ())
{
node_l->process_confirmed (status_a, iteration_a);
@ -1570,7 +1560,7 @@ void nano::node::epoch_upgrader_impl (nano::private_key const & prv_a, nano::epo
upgrader_condition.wait (lock);
}
}
worker.push_task ([node_l = shared_from_this (), &upgrader_process, &upgrader_mutex, &upgrader_condition, &upgraded_accounts, &workers, epoch, difficulty, signer, root, account]() {
this->workers.push_task ([node_l = shared_from_this (), &upgrader_process, &upgrader_mutex, &upgrader_condition, &upgraded_accounts, &workers, epoch, difficulty, signer, root, account]() {
upgrader_process (*node_l, upgraded_accounts, epoch, difficulty, signer, root, account);
{
nano::lock_guard<std::mutex> lock (upgrader_mutex);
@ -1649,7 +1639,7 @@ void nano::node::epoch_upgrader_impl (nano::private_key const & prv_a, nano::epo
upgrader_condition.wait (lock);
}
}
worker.push_task ([node_l = shared_from_this (), &upgrader_process, &upgrader_mutex, &upgrader_condition, &upgraded_pending, &workers, epoch, difficulty, signer, root, account]() {
this->workers.push_task ([node_l = shared_from_this (), &upgrader_process, &upgrader_mutex, &upgrader_condition, &upgraded_pending, &workers, epoch, difficulty, signer, root, account]() {
upgrader_process (*node_l, upgraded_pending, epoch, difficulty, signer, root, account);
{
nano::lock_guard<std::mutex> lock (upgrader_mutex);
@ -1746,7 +1736,6 @@ std::pair<uint64_t, decltype (nano::ledger::bootstrap_weights)> nano::node::get_
nano::inactive_node::inactive_node (boost::filesystem::path const & path_a, nano::node_flags const & node_flags_a) :
io_context (std::make_shared<boost::asio::io_context> ()),
alarm (*io_context),
work (1)
{
boost::system::error_code error_chmod;
@ -1775,7 +1764,7 @@ work (1)
node_config.logging.max_size = std::numeric_limits<std::uintmax_t>::max ();
node_config.logging.init (path_a);
node = std::make_shared<nano::node> (*io_context, path_a, alarm, node_config, work, node_flags_a);
node = std::make_shared<nano::node> (*io_context, path_a, node_config, work, node_flags_a);
node->active.stop ();
}

View file

@ -1,11 +1,9 @@
#pragma once
#include <nano/boost/asio/spawn.hpp>
#include <nano/lib/alarm.hpp>
#include <nano/lib/config.hpp>
#include <nano/lib/stats.hpp>
#include <nano/lib/work.hpp>
#include <nano/lib/worker.hpp>
#include <nano/node/active_transactions.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/bootstrap/bootstrap.hpp>
@ -88,13 +86,13 @@ std::unique_ptr<container_info_component> collect_container_info (rep_crawler &
class node final : public std::enable_shared_from_this<nano::node>
{
public:
node (boost::asio::io_context &, uint16_t, boost::filesystem::path const &, nano::alarm &, nano::logging const &, nano::work_pool &, nano::node_flags = nano::node_flags (), unsigned seq = 0);
node (boost::asio::io_context &, boost::filesystem::path const &, nano::alarm &, nano::node_config const &, nano::work_pool &, nano::node_flags = nano::node_flags (), unsigned seq = 0);
node (boost::asio::io_context &, uint16_t, boost::filesystem::path const &, nano::logging const &, nano::work_pool &, nano::node_flags = nano::node_flags (), unsigned seq = 0);
node (boost::asio::io_context &, boost::filesystem::path const &, nano::node_config const &, nano::work_pool &, nano::node_flags = nano::node_flags (), unsigned seq = 0);
~node ();
template <typename T>
void background (T action_a)
{
alarm.io_ctx.post (action_a);
io_ctx.post (action_a);
}
template <typename... Params>
void spawn (Params... args)
@ -158,16 +156,15 @@ public:
bool init_error () const;
bool epoch_upgrader (nano::private_key const &, nano::epoch, uint64_t, uint64_t);
std::pair<uint64_t, decltype (nano::ledger::bootstrap_weights)> get_bootstrap_weights () const;
nano::worker worker;
nano::write_database_queue write_database_queue;
boost::asio::io_context & io_ctx;
boost::latch node_initialized_latch;
nano::network_params network_params;
nano::node_config config;
nano::stat stats;
nano::thread_pool workers;
std::shared_ptr<nano::websocket::listener> websocket_server;
nano::node_flags flags;
nano::alarm & alarm;
nano::work_pool & work;
nano::distributed_work_factory distributed_work;
nano::logger_mt logger;
@ -231,7 +228,6 @@ public:
inactive_node (boost::filesystem::path const & path_a, nano::node_flags const & node_flags_a);
~inactive_node ();
std::shared_ptr<boost::asio::io_context> io_context;
nano::alarm alarm;
nano::work_pool work;
std::shared_ptr<nano::node> node;
};

View file

@ -96,7 +96,7 @@ void nano::port_mapping::refresh_mapping ()
node.logger.always_log (boost::str (boost::format ("UPnP %1%:%2% mapped to %3%") % protocol.external_address % config_port_l % node_port_l));
// Refresh mapping before the leasing ends
node.alarm.add (std::chrono::steady_clock::now () + network_params.portmapping.lease_duration - std::chrono::seconds (10), [node_l = node.shared ()]() {
node.workers.add_timed_task (std::chrono::steady_clock::now () + network_params.portmapping.lease_duration - std::chrono::seconds (10), [node_l = node.shared ()]() {
node_l->port_mapping.refresh_mapping ();
});
}
@ -166,7 +166,7 @@ void nano::port_mapping::check_mapping_loop ()
refresh_mapping ();
}
// Check for mapping health frequently
node.alarm.add (std::chrono::steady_clock::now () + network_params.portmapping.health_check_period, [node_l = node.shared ()]() {
node.workers.add_timed_task (std::chrono::steady_clock::now () + network_params.portmapping.health_check_period, [node_l = node.shared ()]() {
node_l->port_mapping.check_mapping_loop ();
});
}
@ -177,7 +177,7 @@ void nano::port_mapping::check_mapping_loop ()
node.logger.always_log (boost::str (boost::format ("UPnP No IGD devices found")));
}
// Check for new devices later
node.alarm.add (std::chrono::steady_clock::now () + std::chrono::minutes (5), [node_l = node.shared ()]() {
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::minutes (5), [node_l = node.shared ()]() {
node_l->port_mapping.check_mapping_loop ();
});
}

View file

@ -93,7 +93,7 @@ void nano::rep_crawler::ongoing_crawl ()
// Reduce crawl frequency when there's enough total peer weight
unsigned next_run_ms = node.network_params.network.is_dev_network () ? 100 : sufficient_weight ? 7000 : 3000;
std::weak_ptr<nano::node> node_w (node.shared ());
node.alarm.add (now + std::chrono::milliseconds (next_run_ms), [node_w, this]() {
node.workers.add_timed_task (now + std::chrono::milliseconds (next_run_ms), [node_w, this]() {
if (auto node_l = node_w.lock ())
{
this->ongoing_crawl ();
@ -152,7 +152,7 @@ void nano::rep_crawler::query (std::vector<std::shared_ptr<nano::transport::chan
// A representative must respond with a vote within the deadline
std::weak_ptr<nano::node> node_w (node.shared ());
node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [node_w, hash = hash_root.first]() {
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [node_w, hash = hash_root.first]() {
if (auto node_l = node_w.lock ())
{
node_l->rep_crawler.remove (hash);

View file

@ -1,18 +1,11 @@
#include <nano/boost/asio/post.hpp>
#include <nano/lib/locks.hpp>
#include <nano/lib/numbers.hpp>
#include <nano/lib/threading.hpp>
#include <nano/node/signatures.hpp>
nano::signature_checker::signature_checker (unsigned num_threads) :
thread_pool (num_threads),
single_threaded (num_threads == 0),
num_threads (num_threads)
thread_pool (num_threads, nano::thread_role::name::signature_checking)
{
if (!single_threaded)
{
set_thread_names (num_threads);
}
}
nano::signature_checker::~signature_checker ()
@ -28,7 +21,7 @@ void nano::signature_checker::verify (nano::signature_check_set & check_a)
return;
}
if (check_a.size <= batch_size || single_threaded)
if (check_a.size <= batch_size || single_threaded ())
{
// Not dealing with many so just use the calling thread for checking signatures
auto result = verify_batch (check_a, 0, check_a.size);
@ -42,6 +35,7 @@ void nano::signature_checker::verify (nano::signature_check_set & check_a)
size_t overflow_size = check_a.size % batch_size;
size_t num_full_batches = check_a.size / batch_size;
auto const num_threads = thread_pool.get_num_threads ();
auto total_threads_to_split_over = num_threads + 1;
auto num_base_batches_each = num_full_batches / total_threads_to_split_over;
auto num_full_overflow_batches = num_full_batches % total_threads_to_split_over;
@ -82,7 +76,7 @@ void nano::signature_checker::stop ()
{
if (!stopped.exchange (true))
{
thread_pool.join ();
thread_pool.stop ();
}
}
@ -111,7 +105,7 @@ void nano::signature_checker::verify_async (nano::signature_check_set & check_a,
auto size = batch_size;
auto start_index = batch * batch_size;
boost::asio::post (thread_pool, [this, task, size, start_index, &promise] {
thread_pool.push_task ([this, task, size, start_index, &promise] {
auto result = this->verify_batch (task->check, start_index, size);
release_assert (result);
@ -124,27 +118,7 @@ void nano::signature_checker::verify_async (nano::signature_check_set & check_a,
}
}
// Set the names of all the threads in the thread pool for easier identification
void nano::signature_checker::set_thread_names (unsigned num_threads)
bool nano::signature_checker::single_threaded () const
{
std::vector<std::promise<void>> promises (num_threads);
std::vector<std::future<void>> futures;
futures.reserve (num_threads);
std::transform (promises.begin (), promises.end (), std::back_inserter (futures), [](auto & promise) {
return promise.get_future ();
});
for (auto i = 0u; i < num_threads; ++i)
{
boost::asio::post (thread_pool, [& promise = promises[i]]() {
nano::thread_role::set (nano::thread_role::name::signature_checking);
promise.set_value ();
});
}
// Wait until all threads have finished
for (auto & future : futures)
{
future.wait ();
}
return thread_pool.get_num_threads () == 0;
}

View file

@ -1,6 +1,6 @@
#pragma once
#include <nano/boost/asio/thread_pool.hpp>
#include <nano/lib/threading.hpp>
#include <nano/lib/utility.hpp>
#include <atomic>
@ -38,6 +38,10 @@ public:
static size_t constexpr batch_size = 256;
private:
std::atomic<int> tasks_remaining{ 0 };
std::atomic<bool> stopped{ false };
nano::thread_pool thread_pool;
struct Task final
{
Task (nano::signature_check_set & check, size_t pending) :
@ -54,11 +58,6 @@ private:
bool verify_batch (const nano::signature_check_set & check_a, size_t index, size_t size);
void verify_async (nano::signature_check_set & check_a, size_t num_batches, std::promise<void> & promise);
void set_thread_names (unsigned num_threads);
boost::asio::thread_pool thread_pool;
std::atomic<int> tasks_remaining{ 0 };
const bool single_threaded;
unsigned num_threads;
std::atomic<bool> stopped{ false };
bool single_threaded () const;
};
}

View file

@ -124,7 +124,7 @@ void nano::socket::stop_timer ()
void nano::socket::checkup ()
{
std::weak_ptr<nano::socket> this_w (shared_from_this ());
node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (node.network_params.network.is_dev_network () ? 1 : 2), [this_w]() {
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (node.network_params.network.is_dev_network () ? 1 : 2), [this_w]() {
if (auto this_l = this_w.lock ())
{
uint64_t now (nano::seconds_since_epoch ());

View file

@ -1,6 +1,5 @@
#include <nano/lib/alarm.hpp>
#include <nano/lib/stats.hpp>
#include <nano/lib/worker.hpp>
#include <nano/lib/threading.hpp>
#include <nano/node/network.hpp>
#include <nano/node/nodeconfig.hpp>
#include <nano/node/telemetry.hpp>
@ -19,10 +18,9 @@
using namespace std::chrono_literals;
nano::telemetry::telemetry (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a, nano::observer_set<nano::telemetry_data const &, nano::endpoint const &> & observers_a, nano::stat & stats_a, nano::network_params & network_params_a, bool disable_ongoing_requests_a) :
nano::telemetry::telemetry (nano::network & network_a, nano::thread_pool & workers_a, nano::observer_set<nano::telemetry_data const &, nano::endpoint const &> & observers_a, nano::stat & stats_a, nano::network_params & network_params_a, bool disable_ongoing_requests_a) :
network (network_a),
alarm (alarm_a),
worker (worker_a),
workers (workers_a),
observers (observers_a),
stats (stats_a),
network_params (network_params_a),
@ -142,7 +140,7 @@ bool nano::telemetry::within_cache_cutoff (telemetry_info const & telemetry_info
void nano::telemetry::ongoing_req_all_peers (std::chrono::milliseconds next_request_interval)
{
alarm.add (std::chrono::steady_clock::now () + next_request_interval, [this_w = std::weak_ptr<telemetry> (shared_from_this ())]() {
workers.add_timed_task (std::chrono::steady_clock::now () + next_request_interval, [this_w = std::weak_ptr<telemetry> (shared_from_this ())]() {
if (auto this_l = this_w.lock ())
{
// Check if there are any peers which are in the peers list which haven't been request, or any which are below or equal to the cache cutoff time
@ -264,13 +262,13 @@ std::unordered_map<nano::endpoint, nano::telemetry_data> nano::telemetry::get_me
void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr<nano::transport::channel> const & channel_a, std::function<void(telemetry_data_response const &)> const & callback_a)
{
auto invoke_callback_with_error = [&callback_a, &worker = this->worker, channel_a]() {
auto invoke_callback_with_error = [&callback_a, &workers = this->workers, channel_a]() {
nano::endpoint endpoint;
if (channel_a)
{
endpoint = channel_a->get_endpoint ();
}
worker.push_task ([callback_a, endpoint]() {
workers.push_task ([callback_a, endpoint]() {
auto const error = true;
callback_a ({ nano::telemetry_data{}, endpoint, error });
});
@ -280,9 +278,9 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr<nano::trans
{
if (channel_a && (channel_a->get_network_version () >= network_params.protocol.telemetry_protocol_version_min))
{
auto add_callback_async = [& worker = this->worker, &callback_a](telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a) {
auto add_callback_async = [& workers = this->workers, &callback_a](telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a) {
telemetry_data_response telemetry_data_response_l{ telemetry_data_a, endpoint_a, false };
worker.push_task ([telemetry_data_response_l, callback_a]() {
workers.push_task ([telemetry_data_response_l, callback_a]() {
callback_a (telemetry_data_response_l);
});
};
@ -372,7 +370,7 @@ void nano::telemetry::fire_request_message (std::shared_ptr<nano::transport::cha
else
{
// If no response is seen after a certain period of time remove it
this_l->alarm.add (std::chrono::steady_clock::now () + this_l->response_time_cutoff, [round_l, this_w, endpoint]() {
this_l->workers.add_timed_task (std::chrono::steady_clock::now () + this_l->response_time_cutoff, [round_l, this_w, endpoint]() {
if (auto this_l = this_w.lock ())
{
nano::lock_guard<std::mutex> guard (this_l->mutex);
@ -413,8 +411,8 @@ void nano::telemetry::channel_processed (nano::endpoint const & endpoint_a, bool
void nano::telemetry::flush_callbacks_async (nano::endpoint const & endpoint_a, bool error_a)
{
// Post to worker so that it's truly async and not on the calling thread (same problem as std::async otherwise)
worker.push_task ([endpoint_a, error_a, this_w = std::weak_ptr<nano::telemetry> (shared_from_this ())]() {
// Post to thread_pool so that it's truly async and not on the calling thread (same problem as std::async otherwise)
workers.push_task ([endpoint_a, error_a, this_w = std::weak_ptr<nano::telemetry> (shared_from_this ())]() {
if (auto this_l = this_w.lock ())
{
nano::unique_lock<std::mutex> lk (this_l->mutex);

View file

@ -17,10 +17,9 @@ namespace mi = boost::multi_index;
namespace nano
{
class network;
class alarm;
class worker;
class stat;
class ledger;
class thread_pool;
namespace transport
{
class channel;
@ -61,7 +60,7 @@ public:
class telemetry : public std::enable_shared_from_this<telemetry>
{
public:
telemetry (nano::network &, nano::alarm &, nano::worker &, nano::observer_set<nano::telemetry_data const &, nano::endpoint const &> &, nano::stat &, nano::network_params &, bool);
telemetry (nano::network &, nano::thread_pool &, nano::observer_set<nano::telemetry_data const &, nano::endpoint const &> &, nano::stat &, nano::network_params &, bool);
void start ();
void stop ();
@ -105,8 +104,7 @@ private:
};
nano::network & network;
nano::alarm & alarm;
nano::worker & worker;
nano::thread_pool & workers;
nano::observer_set<nano::telemetry_data const &, nano::endpoint const &> & observers;
nano::stat & stats;
/* Important that this is a reference to the node network_params for tests which want to modify genesis block */

View file

@ -30,7 +30,7 @@ std::shared_ptr<nano::node> nano::system::add_node (nano::node_flags node_flags_
/** Returns the node added. */
std::shared_ptr<nano::node> nano::system::add_node (nano::node_config const & node_config_a, nano::node_flags node_flags_a, nano::transport::transport_type type_a)
{
auto node (std::make_shared<nano::node> (io_ctx, nano::unique_path (), alarm, node_config_a, work, node_flags_a, node_sequence++));
auto node (std::make_shared<nano::node> (io_ctx, nano::unique_path (), node_config_a, work, node_flags_a, node_sequence++));
debug_assert (!node->init_error ());
node->start ();
node->wallets.create (nano::random_wallet_id ());
@ -283,7 +283,7 @@ public:
if (count_l > 0)
{
auto this_l (shared_from_this ());
node->alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (wait), [this_l]() { this_l->run (); });
node->workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (wait), [this_l]() { this_l->run (); });
}
}
std::vector<nano::account> accounts;

View file

@ -47,7 +47,6 @@ public:
std::shared_ptr<nano::node> add_node (nano::node_flags = nano::node_flags (), nano::transport::transport_type = nano::transport::transport_type::tcp);
std::shared_ptr<nano::node> add_node (nano::node_config const &, nano::node_flags = nano::node_flags (), nano::transport::transport_type = nano::transport::transport_type::tcp);
boost::asio::io_context io_ctx;
nano::alarm alarm{ io_ctx };
std::vector<std::shared_ptr<nano::node>> nodes;
nano::logging logging;
nano::work_pool work{ std::max (std::thread::hardware_concurrency (), 1u) };

View file

@ -468,7 +468,7 @@ void nano::transport::tcp_channels::ongoing_keepalive ()
}
}
std::weak_ptr<nano::node> node_w (node.shared ());
node.alarm.add (std::chrono::steady_clock::now () + node.network_params.node.half_period, [node_w]() {
node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.node.half_period, [node_w]() {
if (auto node_l = node_w.lock ())
{
if (!node_l->network.tcp_channels.stopped)

View file

@ -302,7 +302,7 @@ void nano::transport::udp_channels::receive ()
}
if (!this->stopped)
{
this->node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() { this->receive (); });
this->node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() { this->receive (); });
}
}
}));
@ -696,7 +696,7 @@ void nano::transport::udp_channels::ongoing_keepalive ()
channel->send (message);
}
std::weak_ptr<nano::node> node_w (node.shared ());
node.alarm.add (std::chrono::steady_clock::now () + node.network_params.node.period, [node_w]() {
node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.node.period, [node_w]() {
if (auto node_l = node_w.lock ())
{
node_l->network.udp_channels.ongoing_keepalive ();

View file

@ -1149,7 +1149,7 @@ void nano::wallet::work_ensure (nano::account const & account_a, nano::root cons
wallets.delayed_work->operator[] (account_a) = root_a;
wallets.node.alarm.add (std::chrono::steady_clock::now () + precache_delay, [this_l = shared_from_this (), account_a, root_a] {
wallets.node.workers.add_timed_task (std::chrono::steady_clock::now () + precache_delay, [this_l = shared_from_this (), account_a, root_a] {
auto delayed_work = this_l->wallets.delayed_work.lock ();
auto existing (delayed_work->find (account_a));
if (existing != delayed_work->end () && existing->second == root_a)
@ -1358,7 +1358,7 @@ void nano::work_watcher::update (nano::qualified_root const & root_a, std::share
void nano::work_watcher::watching (nano::qualified_root const & root_a, std::shared_ptr<nano::state_block> const & block_a)
{
std::weak_ptr<nano::work_watcher> watcher_w (shared_from_this ());
node.alarm.add (std::chrono::steady_clock::now () + node.config.work_watcher_period, [block_a, root_a, watcher_w]() {
node.workers.add_timed_task (std::chrono::steady_clock::now () + node.config.work_watcher_period, [block_a, root_a, watcher_w]() {
auto watcher_l = watcher_w.lock ();
if (watcher_l && !watcher_l->stopped && watcher_l->is_watched (root_a))
{
@ -1804,7 +1804,7 @@ void nano::wallets::ongoing_compute_reps ()
compute_reps ();
auto & node_l (node);
auto compute_delay (network_params.network.is_dev_network () ? std::chrono::milliseconds (10) : std::chrono::milliseconds (15 * 60 * 1000)); // Representation drifts quickly on the test network but very slowly on the live network
node.alarm.add (std::chrono::steady_clock::now () + compute_delay, [&node_l]() {
node.workers.add_timed_task (std::chrono::steady_clock::now () + compute_delay, [&node_l]() {
node_l.wallets.ongoing_compute_reps ();
});
}

View file

@ -102,7 +102,7 @@ wallet (wallet_a)
QObject::connect (copy_button, &QPushButton::clicked, [this]() {
this->wallet.application.clipboard ()->setText (QString (this->wallet.account.to_account ().c_str ()));
copy_button->setText ("Copied!");
this->wallet.node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (2), [this]() {
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (2), [this]() {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this]() {
copy_button->setText ("Copy");
}));
@ -196,7 +196,7 @@ wallet (wallet_a)
this->wallet.wallet_m->deterministic_insert (transaction);
show_button_success (*create_account);
create_account->setText ("New account was created");
this->wallet.node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this]() {
show_button_ok (*create_account);
create_account->setText ("Create account");
@ -207,7 +207,7 @@ wallet (wallet_a)
{
show_button_error (*create_account);
create_account->setText ("Wallet is locked, unlock it to create account");
this->wallet.node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this]() {
show_button_ok (*create_account);
create_account->setText ("Create account");
@ -229,7 +229,7 @@ wallet (wallet_a)
this->wallet.application.clipboard ()->setText (QString (seed.data.to_string ().c_str ()));
show_button_success (*backup_seed);
backup_seed->setText ("Seed was copied to clipboard");
this->wallet.node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this]() {
show_button_ok (*backup_seed);
backup_seed->setText ("Copy wallet seed to clipboard");
@ -241,7 +241,7 @@ wallet (wallet_a)
this->wallet.application.clipboard ()->setText ("");
show_button_error (*backup_seed);
backup_seed->setText ("Wallet is locked, unlock it to enable the backup");
this->wallet.node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this]() {
show_button_ok (*backup_seed);
backup_seed->setText ("Copy wallet seed to clipboard");
@ -275,7 +275,7 @@ void nano_qt::accounts::refresh_wallet_balance ()
final_text += "\nPending: " + wallet.format_balance (pending);
}
wallet_balance_label->setText (QString (final_text.c_str ()));
this->wallet.node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (60), [this]() {
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (60), [this]() {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this]() {
refresh_wallet_balance ();
}));
@ -405,7 +405,7 @@ wallet (wallet_a)
show_line_error (*seed);
show_button_error (*import_seed);
import_seed->setText ("Wallet is locked, unlock it to enable the import");
this->wallet.node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (10), [this]() {
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (10), [this]() {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this]() {
show_line_ok (*seed);
show_button_ok (*import_seed);
@ -422,7 +422,7 @@ wallet (wallet_a)
show_button_success (*import_seed);
import_seed->setText ("Successful import of seed");
this->wallet.refresh ();
this->wallet.node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this]() {
show_button_ok (*import_seed);
import_seed->setText ("Import seed");
@ -442,7 +442,7 @@ wallet (wallet_a)
{
import_seed->setText ("Incorrect seed. Only HEX characters allowed");
}
this->wallet.node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this]() {
show_button_ok (*import_seed);
import_seed->setText ("Import seed");
@ -455,7 +455,7 @@ wallet (wallet_a)
show_line_error (*clear_line);
show_button_error (*import_seed);
import_seed->setText ("Type words 'clear keys'");
this->wallet.node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this]() {
show_button_ok (*import_seed);
import_seed->setText ("Import seed");
@ -727,7 +727,7 @@ void nano_qt::block_viewer::rebroadcast_action (nano::block_hash const & hash_a)
if (!successor.is_zero ())
{
done = false;
wallet.node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this, successor]() {
wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this, successor]() {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this, successor]() {
rebroadcast_action (successor);
}));
@ -1122,7 +1122,7 @@ void nano_qt::wallet::ongoing_refresh ()
}
}));
node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [wallet_w]() {
node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [wallet_w]() {
if (auto wallet_l = wallet_w.lock ())
{
wallet_l->ongoing_refresh ();
@ -1206,7 +1206,7 @@ void nano_qt::wallet::start ()
{
show_button_error (*this_l->send_blocks_send);
this_l->send_blocks_send->setText ("Wallet is locked, unlock it to send");
this_l->node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this_w]() {
this_l->node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this_w]() {
if (auto this_l = this_w.lock ())
{
this_l->application.postEvent (&this_l->processor, new eventloop_event ([this_w]() {
@ -1225,7 +1225,7 @@ void nano_qt::wallet::start ()
show_line_error (*this_l->send_count);
show_button_error (*this_l->send_blocks_send);
this_l->send_blocks_send->setText ("Not enough balance");
this_l->node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this_w]() {
this_l->node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this_w]() {
if (auto this_l = this_w.lock ())
{
this_l->application.postEvent (&this_l->processor, new eventloop_event ([this_w]() {
@ -1244,7 +1244,7 @@ void nano_qt::wallet::start ()
show_line_error (*this_l->send_account);
show_button_error (*this_l->send_blocks_send);
this_l->send_blocks_send->setText ("Bad destination account");
this_l->node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this_w]() {
this_l->node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this_w]() {
if (auto this_l = this_w.lock ())
{
this_l->application.postEvent (&this_l->processor, new eventloop_event ([this_w]() {
@ -1263,7 +1263,7 @@ void nano_qt::wallet::start ()
show_line_error (*this_l->send_count);
show_button_error (*this_l->send_blocks_send);
this_l->send_blocks_send->setText ("Bad amount number");
this_l->node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this_w]() {
this_l->node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this_w]() {
if (auto this_l = this_w.lock ())
{
this_l->application.postEvent (&this_l->processor, new eventloop_event ([this_w]() {
@ -1439,11 +1439,9 @@ void nano_qt::wallet::update_connected ()
void nano_qt::wallet::empty_password ()
{
this->node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (3), [this]() {
this->node.worker.push_task ([this]() {
auto transaction (wallet_m->wallets.tx_begin_write ());
wallet_m->enter_password (transaction, std::string (""));
});
this->node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (3), [this]() {
auto transaction (wallet_m->wallets.tx_begin_write ());
wallet_m->enter_password (transaction, std::string (""));
});
}
@ -1553,7 +1551,7 @@ wallet (wallet_a)
change->setText ("Password was changed");
this->wallet.node.logger.try_log ("Wallet password changed");
update_locked (false, false);
this->wallet.node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this]() {
show_button_ok (*change);
change->setText ("Set/Change password");
@ -1571,7 +1569,7 @@ wallet (wallet_a)
{
show_button_error (*change);
change->setText ("Wallet is locked, unlock it");
this->wallet.node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this]() {
show_button_ok (*change);
change->setText ("Set/Change password");
@ -1597,7 +1595,7 @@ wallet (wallet_a)
change_rep->setText ("Representative was changed");
current_representative->setText (QString (representative_l.to_account ().c_str ()));
new_representative->clear ();
this->wallet.node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this]() {
show_button_ok (*change_rep);
change_rep->setText ("Change representative");
@ -1608,7 +1606,7 @@ wallet (wallet_a)
{
show_button_error (*change_rep);
change_rep->setText ("Wallet is locked, unlock it");
this->wallet.node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this]() {
show_button_ok (*change_rep);
change_rep->setText ("Change representative");
@ -1621,7 +1619,7 @@ wallet (wallet_a)
show_line_error (*new_representative);
show_button_error (*change_rep);
change_rep->setText ("Invalid account");
this->wallet.node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this]() {
show_line_ok (*new_representative);
show_button_ok (*change_rep);
@ -1661,7 +1659,7 @@ wallet (wallet_a)
show_line_error (*password);
show_button_error (*lock_toggle);
lock_toggle->setText ("Invalid password");
this->wallet.node.alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (5), [this]() {
this->wallet.application.postEvent (&this->wallet.processor, new eventloop_event ([this]() {
show_line_ok (*password);
show_button_ok (*lock_toggle);

View file

@ -160,7 +160,7 @@ TEST (rpc, wrapped_task)
// Exception should get caught
throw std::runtime_error ("");
}));
system.nodes[0]->worker.push_task (task);
system.nodes[0]->workers.push_task (task);
ASSERT_TIMELY (5s, response == true);
}
@ -2285,7 +2285,7 @@ TEST (rpc, keepalive)
{
nano::system system;
auto node0 = add_ipc_enabled_node (system);
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
node1->start ();
system.nodes.push_back (node1);
scoped_io_thread_name_change scoped_thread_name_io;

View file

@ -63,7 +63,7 @@ TEST (system, receive_while_synchronizing)
uint32_t count (1000);
system.generate_mass_activity (count, *system.nodes[0]);
nano::keypair key;
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.logging, system.work));
ASSERT_FALSE (node1->init_error ());
auto wallet (node1->wallets.create (1));
wallet->insert_adhoc (nano::dev_genesis_key.prv); // For voting
@ -71,7 +71,7 @@ TEST (system, receive_while_synchronizing)
node1->start ();
system.nodes.push_back (node1);
ASSERT_NE (nullptr, nano::establish_tcp (system, *node1, node->network.endpoint ()));
system.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (200), ([&system, &key]() {
node1->workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::milliseconds (200), ([&system, &key]() {
auto hash (system.wallet (0)->send_sync (nano::dev_genesis_key.pub, key.pub, system.nodes[0]->config.receive_minimum.number ()));
auto transaction (system.nodes[0]->store.tx_begin_read ());
auto block (system.nodes[0]->store.block_get (transaction, hash));
@ -1426,7 +1426,7 @@ TEST (telemetry, many_nodes)
// Make a metric completely different for each node so we can check afterwards that there are no duplicates
node_config.bandwidth_limit = 100000 + i;
auto node = std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, node_config, system.work, node_flags);
auto node = std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), node_config, system.work, node_flags);
node->start ();
system.nodes.push_back (node);
}