diff --git a/nano/core_test/bootstrap.cpp b/nano/core_test/bootstrap.cpp index b78b65d5d..4442b7ede 100644 --- a/nano/core_test/bootstrap.cpp +++ b/nano/core_test/bootstrap.cpp @@ -296,7 +296,6 @@ TEST (bootstrap_processor, process_none) node1->bootstrap_initiator.bootstrap (system.nodes[0]->network.endpoint (), false); ASSERT_TIMELY (5s, done); - node1->stop (); } // Bootstrap can pull one basic block @@ -320,7 +319,6 @@ TEST (bootstrap_processor, process_one) ASSERT_NE (node0->latest (nano::dev::genesis_key.pub), node1->latest (nano::dev::genesis_key.pub)); node1->bootstrap_initiator.bootstrap (node0->network.endpoint (), false); ASSERT_TIMELY_EQ (10s, node1->latest (nano::dev::genesis_key.pub), node0->latest (nano::dev::genesis_key.pub)); - node1->stop (); } TEST (bootstrap_processor, process_two) @@ -341,7 +339,6 @@ TEST (bootstrap_processor, process_two) ASSERT_NE (node1->latest (nano::dev::genesis_key.pub), node0->latest (nano::dev::genesis_key.pub)); // nodes should be out of sync here node1->bootstrap_initiator.bootstrap (node0->network.endpoint (), false); // bootstrap triggered ASSERT_TIMELY_EQ (5s, node1->latest (nano::dev::genesis_key.pub), node0->latest (nano::dev::genesis_key.pub)); // nodes should sync up - node1->stop (); } // Bootstrap can pull universal blocks @@ -387,7 +384,6 @@ TEST (bootstrap_processor, process_state) ASSERT_NE (node1->latest (nano::dev::genesis_key.pub), block2->hash ()); node1->bootstrap_initiator.bootstrap (node0->network.endpoint (), false); ASSERT_TIMELY_EQ (5s, node1->latest (nano::dev::genesis_key.pub), block2->hash ()); - node1->stop (); } TEST (bootstrap_processor, process_new) @@ -426,7 +422,6 @@ TEST (bootstrap_processor, process_new) auto node3 = system.make_disconnected_node (); node3->bootstrap_initiator.bootstrap (node1->network.endpoint (), false); ASSERT_TIMELY_EQ (5s, node3->balance (key2.pub), amount); - node3->stop (); } TEST (bootstrap_processor, pull_diamond) @@ -478,7 +473,6 @@ TEST (bootstrap_processor, pull_diamond) auto node1 = system.make_disconnected_node (); node1->bootstrap_initiator.bootstrap (node0->network.endpoint (), false); ASSERT_TIMELY_EQ (5s, node1->balance (nano::dev::genesis_key.pub), 100); - node1->stop (); } TEST (bootstrap_processor, DISABLED_pull_requeue_network_error) @@ -517,7 +511,6 @@ TEST (bootstrap_processor, DISABLED_pull_requeue_network_error) ++attempt->pulling; node1->bootstrap_initiator.connections->pulls.emplace_back (nano::dev::genesis_key.pub, send1->hash (), nano::dev::genesis->hash (), attempt->incremental_id); node1->bootstrap_initiator.connections->request_pull (lock); - node2->stop (); } ASSERT_TIMELY (5s, attempt == nullptr || attempt->requeued_pulls == 1); ASSERT_EQ (0, node1->stats.count (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_failed_account, nano::stat::dir::in)); // Requeue is not increasing failed attempts @@ -586,7 +579,6 @@ TEST (bootstrap_processor, push_diamond) auto node2 = system.add_node (config, flags); node1->bootstrap_initiator.bootstrap (node2->network.endpoint (), false); ASSERT_TIMELY_EQ (5s, node2->balance (nano::dev::genesis_key.pub), 100); - node1->stop (); } TEST (bootstrap_processor, push_diamond_pruning) @@ -677,7 +669,6 @@ TEST (bootstrap_processor, push_diamond_pruning) node1->bootstrap_initiator.bootstrap (node0->network.endpoint (), false); ASSERT_TIMELY_EQ (5s, node0->balance (nano::dev::genesis_key.pub), 100); ASSERT_TIMELY_EQ (5s, node1->balance (nano::dev::genesis_key.pub), 100); - node1->stop (); } TEST (bootstrap_processor, push_one) @@ -700,7 +691,6 @@ TEST (bootstrap_processor, push_one) node1->bootstrap_initiator.bootstrap (node0->network.endpoint (), false); ASSERT_TIMELY_EQ (5s, node0->balance (nano::dev::genesis_key.pub), genesis_balance - 100); - node1->stop (); } TEST (bootstrap_processor, lazy_hash) @@ -775,7 +765,6 @@ TEST (bootstrap_processor, lazy_hash) } // Check processed blocks ASSERT_TIMELY (10s, node1->balance (key2.pub) != 0); - node1->stop (); } TEST (bootstrap_processor, lazy_hash_bootstrap_id) @@ -850,7 +839,6 @@ TEST (bootstrap_processor, lazy_hash_bootstrap_id) } // Check processed blocks ASSERT_TIMELY (10s, node1->balance (key2.pub) != 0); - node1->stop (); } TEST (bootstrap_processor, lazy_hash_pruning) @@ -1003,7 +991,6 @@ TEST (bootstrap_processor, lazy_hash_pruning) ASSERT_TIMELY_EQ (5s, node1->ledger.cache.block_count, 9); ASSERT_TIMELY (5s, node1->balance (key2.pub) != 0); ASSERT_TIMELY (5s, !node1->bootstrap_initiator.in_progress ()); - node1->stop (); } TEST (bootstrap_processor, lazy_max_pull_count) @@ -1105,7 +1092,6 @@ TEST (bootstrap_processor, lazy_max_pull_count) node1->bootstrap_initiator.bootstrap_lazy (change3->hash ()); // Check processed blocks ASSERT_TIMELY (10s, node1->block (change3->hash ())); - node1->stop (); } TEST (bootstrap_processor, lazy_unclear_state_link) @@ -1174,7 +1160,6 @@ TEST (bootstrap_processor, lazy_unclear_state_link) node2->bootstrap_initiator.bootstrap_lazy (receive->hash ()); ASSERT_TIMELY (5s, nano::test::exists (*node2, { send1, send2, open, receive })); ASSERT_EQ (0, node2->stats.count (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_failed_account, nano::stat::dir::in)); - node2->stop (); } TEST (bootstrap_processor, lazy_unclear_state_link_not_existing) @@ -1233,7 +1218,6 @@ TEST (bootstrap_processor, lazy_unclear_state_link_not_existing) ASSERT_TIMELY (15s, !node2->bootstrap_initiator.in_progress ()); ASSERT_TIMELY (15s, nano::test::block_or_pruned_all_exists (*node2, { send1, open, send2 })); ASSERT_EQ (1, node2->stats.count (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_failed_account, nano::stat::dir::in)); - node2->stop (); } TEST (bootstrap_processor, lazy_destinations) @@ -1312,7 +1296,6 @@ TEST (bootstrap_processor, lazy_destinations) ASSERT_TIMELY (5s, node2->ledger.block_or_pruned_exists (send2->hash ())); ASSERT_FALSE (node2->ledger.block_or_pruned_exists (open->hash ())); ASSERT_FALSE (node2->ledger.block_or_pruned_exists (state_open->hash ())); - node2->stop (); } TEST (bootstrap_processor, lazy_pruning_missing_block) @@ -1421,7 +1404,6 @@ TEST (bootstrap_processor, lazy_pruning_missing_block) ASSERT_TIMELY_EQ (5s, 3, node2->ledger.cache.block_count); ASSERT_TIMELY (5s, nano::test::exists (*node2, { send1, send2 })); ASSERT_TRUE (nano::test::block_or_pruned_none_exists (*node2, { open, state_open })); - node2->stop (); } TEST (bootstrap_processor, lazy_cancel) @@ -1456,7 +1438,6 @@ TEST (bootstrap_processor, lazy_cancel) } // Cancel failing lazy bootstrap ASSERT_TIMELY (10s, !node1->bootstrap_initiator.in_progress ()); - node1->stop (); } TEST (bootstrap_processor, wallet_lazy_frontier) @@ -1537,7 +1518,6 @@ TEST (bootstrap_processor, wallet_lazy_frontier) } // Check processed blocks ASSERT_TIMELY (10s, node1->ledger.block_or_pruned_exists (receive2->hash ())); - node1->stop (); } TEST (bootstrap_processor, wallet_lazy_pending) @@ -1684,7 +1664,6 @@ TEST (bootstrap_processor, multiple_attempts) ASSERT_TIMELY (10s, node2->balance (key2.pub) != 0); // Check attempts finish ASSERT_TIMELY_EQ (5s, node2->bootstrap_initiator.attempts.size (), 0); - node2->stop (); } TEST (frontier_req_response, DISABLED_destruction) @@ -1996,7 +1975,6 @@ TEST (bulk, genesis) node2->bootstrap_initiator.bootstrap (node1->network.endpoint (), false); ASSERT_TIMELY_EQ (10s, node2->latest (nano::dev::genesis_key.pub), node1->latest (nano::dev::genesis_key.pub)); ASSERT_EQ (node2->latest (nano::dev::genesis_key.pub), node1->latest (nano::dev::genesis_key.pub)); - node2->stop (); } TEST (bulk, offline_send) @@ -2036,7 +2014,6 @@ TEST (bulk, offline_send) ASSERT_TIMELY_EQ (5s, node2->balance (nano::dev::genesis_key.pub), std::numeric_limits::max () - amount); // Receiving send block ASSERT_TIMELY_EQ (5s, node2->balance (key2.pub), amount); - node2->stop (); } TEST (bulk, genesis_pruning) @@ -2115,7 +2092,6 @@ TEST (bulk, genesis_pruning) ASSERT_TIMELY_EQ (5s, node2->bootstrap_initiator.connections->connections_count, 0); node2->bootstrap_initiator.bootstrap (node1->network.endpoint (), false); ASSERT_TIMELY_EQ (5s, node2->latest (nano::dev::genesis_key.pub), node1->latest (nano::dev::genesis_key.pub)); - node2->stop (); } TEST (bulk_pull_account, basics) diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index d5a97da73..cafb62282 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -105,7 +105,6 @@ TEST (network, send_node_id_handshake_tcp) auto list2 (node1->network.list (1)); ASSERT_EQ (nano::transport::transport_type::tcp, list2[0]->get_type ()); ASSERT_EQ (node0->get_node_id (), list2[0]->get_node_id ()); - node1->stop (); } TEST (network, last_contacted) @@ -531,46 +530,10 @@ TEST (network, ipv6_from_ipv4) ASSERT_TRUE (endpoint2.address ().is_v6 ()); } -TEST (network, ipv6_bind_send_ipv4) -{ - nano::test::system system; - nano::endpoint endpoint1 (boost::asio::ip::address_v6::any (), 0); - nano::endpoint endpoint2 (boost::asio::ip::address_v4::any (), 0); - std::array bytes1{}; - std::atomic finish1{ false }; - nano::endpoint endpoint3; - boost::asio::ip::udp::socket socket1 (*system.io_ctx, endpoint1); - socket1.async_receive_from (boost::asio::buffer (bytes1.data (), bytes1.size ()), endpoint3, [&finish1] (boost::system::error_code const & error, size_t size_a) { - ASSERT_FALSE (error); - ASSERT_EQ (16, size_a); - finish1 = true; - }); - boost::asio::ip::udp::socket socket2 (*system.io_ctx, endpoint2); - nano::endpoint endpoint5 (boost::asio::ip::address_v4::loopback (), socket1.local_endpoint ().port ()); - nano::endpoint endpoint6 (boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4::loopback ()), socket2.local_endpoint ().port ()); - socket2.async_send_to (boost::asio::buffer (std::array{}, 16), endpoint5, [] (boost::system::error_code const & error, size_t size_a) { - ASSERT_FALSE (error); - ASSERT_EQ (16, size_a); - }); - auto iterations (0); - ASSERT_TIMELY (5s, finish1); - ASSERT_EQ (endpoint6, endpoint3); - std::array bytes2; - nano::endpoint endpoint4; - socket2.async_receive_from (boost::asio::buffer (bytes2.data (), bytes2.size ()), endpoint4, [] (boost::system::error_code const & error, size_t size_a) { - ASSERT_FALSE (!error); - ASSERT_EQ (16, size_a); - }); - socket1.async_send_to (boost::asio::buffer (std::array{}, 16), endpoint6, [] (boost::system::error_code const & error, size_t size_a) { - ASSERT_FALSE (error); - ASSERT_EQ (16, size_a); - }); -} - TEST (network, endpoint_bad_fd) { nano::test::system system (1); - system.nodes[0]->stop (); + system.stop_node (*system.nodes[0]); auto endpoint (system.nodes[0]->network.endpoint ()); ASSERT_TRUE (endpoint.address ().is_loopback ()); // The endpoint is invalidated asynchronously diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index f9bc35ec5..d396a2cba 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -40,8 +40,6 @@ TEST (node, stop) { nano::test::system system (1); ASSERT_NE (system.nodes[0]->wallets.items.end (), system.nodes[0]->wallets.items.begin ()); - system.nodes[0]->stop (); - system.io_ctx->run (); ASSERT_TRUE (true); } @@ -77,9 +75,10 @@ TEST (node, block_store_path_failure) auto path (nano::unique_path ()); nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits::max () }; auto node (std::make_shared (io_ctx, system.get_available_port (), path, pool)); + system.register_node (node); ASSERT_TRUE (node->wallets.items.empty ()); - node->stop (); } + #if defined(__clang__) && defined(__linux__) && CI // Disable test due to instability with clang and actions TEST (node_DeathTest, DISABLED_readonly_block_store_not_exist) @@ -102,16 +101,13 @@ TEST (node_DeathTest, readonly_block_store_not_exist) TEST (node, password_fanout) { nano::test::system system; - auto io_ctx = std::make_shared (); - auto path (nano::unique_path ()); nano::node_config config; config.peering_port = system.get_available_port (); nano::work_pool pool{ nano::dev::network_params.network, std::numeric_limits::max () }; config.password_fanout = 10; - nano::node node (io_ctx, path, config, pool); + auto & node = *system.add_node (config); auto wallet (node.wallets.create (100)); ASSERT_EQ (10, wallet->store.password.values.size ()); - node.stop (); } TEST (node, balance) @@ -284,8 +280,6 @@ TEST (node, auto_bootstrap) ASSERT_TIMELY_EQ (5s, node1->ledger.cache.block_count, 3); // Confirmation for all blocks ASSERT_TIMELY_EQ (5s, node1->ledger.cache.cemented_count, 3); - - node1->stop (); } TEST (node, auto_bootstrap_reverse) @@ -329,8 +323,6 @@ TEST (node, auto_bootstrap_age) ASSERT_TIMELY (10s, node0->stats.count (nano::stat::type::bootstrap, nano::stat::detail::initiate_legacy_age, nano::stat::dir::out) >= 3); // More attempts with frontiers age ASSERT_GE (node0->stats.count (nano::stat::type::bootstrap, nano::stat::detail::initiate_legacy_age, nano::stat::dir::out), node0->stats.count (nano::stat::type::bootstrap, nano::stat::detail::initiate, nano::stat::dir::out)); - - node1->stop (); } TEST (node, merge_peers) @@ -2843,7 +2835,7 @@ TEST (node, peers) ASSERT_TRUE (store.peer.exists (store.tx_begin_read (), endpoint_key)); // Stop the peer node and check that it is removed from the store - node1->stop (); + system.stop_node (*node1); // TODO: In `tcp_channels::store_all` we skip store operation when there are no peers present, // so the best we can do here is check if network is empty @@ -2873,7 +2865,7 @@ TEST (node, peer_cache_restart) auto list (node2->network.list (2)); ASSERT_EQ (node1->network.endpoint (), list[0]->get_endpoint ()); ASSERT_EQ (1, node2->network.size ()); - node2->stop (); + system.stop_node (*node2); } // Restart node { @@ -2896,7 +2888,7 @@ TEST (node, peer_cache_restart) auto list (node3->network.list (2)); ASSERT_EQ (node1->network.endpoint (), list[0]->get_endpoint ()); ASSERT_EQ (1, node3->network.size ()); - node3->stop (); + system.stop_node (*node3); } } diff --git a/nano/core_test/rep_crawler.cpp b/nano/core_test/rep_crawler.cpp index 0dc098ada..1addde26c 100644 --- a/nano/core_test/rep_crawler.cpp +++ b/nano/core_test/rep_crawler.cpp @@ -220,7 +220,7 @@ TEST (rep_crawler, rep_remove) ASSERT_TIMELY_EQ (10s, searching_node.rep_crawler.representative_count (), 2); // When Rep2 is stopped, it should not be found as principal representative anymore - node_rep2->stop (); + system.stop_node (*node_rep2); ASSERT_TIMELY_EQ (10s, searching_node.rep_crawler.representative_count (), 1); // Now only genesisRep should be found: @@ -239,7 +239,7 @@ TEST (rep_crawler, rep_connection_close) // Add working representative (node 2) system.wallet (1)->insert_adhoc (nano::dev::genesis_key.prv); ASSERT_TIMELY_EQ (10s, node1.rep_crawler.representative_count (), 1); - node2.stop (); + system.stop_node (node2); // Remove representative with closed channel ASSERT_TIMELY_EQ (10s, node1.rep_crawler.representative_count (), 0); } diff --git a/nano/core_test/socket.cpp b/nano/core_test/socket.cpp index bbbf9d88a..85d821179 100644 --- a/nano/core_test/socket.cpp +++ b/nano/core_test/socket.cpp @@ -31,8 +31,11 @@ TEST (socket, max_connections) // start a server socket that allows max 2 live connections auto listener = std::make_shared (server_port, *node, 2); nano::test::stop_guard stop_guard{ *listener }; - listener->start ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { - server_sockets.push_back (new_connection); + listener->start ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec) { + if (!ec) + { + server_sockets.push_back (new_connection); + } return true; }); @@ -103,8 +106,6 @@ TEST (socket, max_connections) ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), 5); ASSERT_TIMELY_EQ (5s, connection_attempts, 8); // connections initiated by the client ASSERT_TIMELY_EQ (5s, server_sockets.size (), 5); // connections accepted by the server - - node->stop (); } TEST (socket, max_connections_per_ip) @@ -126,8 +127,11 @@ TEST (socket, max_connections_per_ip) auto listener = std::make_shared (server_port, *node, max_global_connections); nano::test::stop_guard stop_guard{ *listener }; - listener->start ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { - server_sockets.push_back (new_connection); + listener->start ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec) { + if (!ec) + { + server_sockets.push_back (new_connection); + } return true; }); @@ -162,8 +166,6 @@ TEST (socket, max_connections_per_ip) ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), max_ip_connections); ASSERT_TIMELY_EQ (5s, get_tcp_max_per_ip (), 1); ASSERT_TIMELY_EQ (5s, connection_attempts, max_ip_connections + 1); - - node->stop (); } TEST (socket, limited_subnet_address) @@ -247,8 +249,11 @@ TEST (socket, max_connections_per_subnetwork) auto listener = std::make_shared (server_port, *node, max_global_connections); nano::test::stop_guard stop_guard{ *listener }; - listener->start ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { - server_sockets.push_back (new_connection); + listener->start ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec) { + if (!ec) + { + server_sockets.push_back (new_connection); + } return true; }); @@ -283,8 +288,6 @@ TEST (socket, max_connections_per_subnetwork) ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), max_subnetwork_connections); ASSERT_TIMELY_EQ (5s, get_tcp_max_per_subnetwork (), 1); ASSERT_TIMELY_EQ (5s, connection_attempts, max_subnetwork_connections + 1); - - node->stop (); } TEST (socket, disabled_max_peers_per_ip) @@ -308,8 +311,11 @@ TEST (socket, disabled_max_peers_per_ip) auto server_socket = std::make_shared (server_port, *node, max_global_connections); nano::test::stop_guard stop_guard{ *server_socket }; - server_socket->start ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { - server_sockets.push_back (new_connection); + server_socket->start ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec) { + if (!ec) + { + server_sockets.push_back (new_connection); + } return true; }); @@ -344,8 +350,6 @@ TEST (socket, disabled_max_peers_per_ip) ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), max_ip_connections + 1); ASSERT_TIMELY_EQ (5s, get_tcp_max_per_ip (), 0); ASSERT_TIMELY_EQ (5s, connection_attempts, max_ip_connections + 1); - - node->stop (); } TEST (socket, disconnection_of_silent_connections) @@ -369,8 +373,11 @@ TEST (socket, disconnection_of_silent_connections) // start a server listening socket auto listener = std::make_shared (server_port, *node, 1); nano::test::stop_guard stop_guard{ *listener }; - listener->start ([&server_data_socket] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { - server_data_socket = new_connection; + listener->start ([&server_data_socket] (std::shared_ptr const & new_connection, boost::system::error_code const & ec) { + if (!ec) + { + server_data_socket = new_connection; + } return true; }); @@ -399,8 +406,6 @@ TEST (socket, disconnection_of_silent_connections) ASSERT_EQ (0, get_tcp_io_timeout_drops ()); // Asserts the silent checker worked. ASSERT_EQ (1, get_tcp_silent_connection_drops ()); - - node->stop (); } TEST (socket, drop_policy) @@ -421,8 +426,11 @@ TEST (socket, drop_policy) auto listener = std::make_shared (server_port, *node, 1); nano::test::stop_guard stop_guard{ *listener }; - listener->start ([&connections] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { - connections.push_back (new_connection); + listener->start ([&connections] (std::shared_ptr const & new_connection, boost::system::error_code const & ec) { + if (!ec) + { + connections.push_back (new_connection); + } return true; }); diff --git a/nano/core_test/telemetry.cpp b/nano/core_test/telemetry.cpp index e607a3603..fe6e5db72 100644 --- a/nano/core_test/telemetry.cpp +++ b/nano/core_test/telemetry.cpp @@ -334,16 +334,14 @@ TEST (telemetry, disconnected) nano::node_flags node_flags; auto node_client = system.add_node (node_flags); auto node_server = system.add_node (node_flags); - nano::test::wait_peer_connections (system); - auto channel = node_client->network.find_node_id (node_server->get_node_id ()); ASSERT_NE (nullptr, channel); // Ensure telemetry is available before disconnecting ASSERT_TIMELY (5s, node_client->telemetry.get_telemetry (channel->get_endpoint ())); - node_server->stop (); + system.stop_node (*node_server); ASSERT_TRUE (channel); // Ensure telemetry from disconnected peer is removed diff --git a/nano/lib/logging_enums.hpp b/nano/lib/logging_enums.hpp index a5782edd7..79f4df8de 100644 --- a/nano/lib/logging_enums.hpp +++ b/nano/lib/logging_enums.hpp @@ -74,6 +74,7 @@ enum class type vote_generator, rep_tiers, syn_cookies, + thread_runner, // bootstrap bulk_pull_client, diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 7c75351c4..52eb3bba1 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -1,6 +1,13 @@ #include #include +#include + +std::string_view nano::thread_role::to_string (nano::thread_role::name name) +{ + return magic_enum::enum_name (name); +} + std::string nano::thread_role::get_string (nano::thread_role::name role) { std::string thread_role_name_string; diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index a00a55226..c1dfce2d3 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -50,6 +50,8 @@ enum class name network_reachout, }; +std::string_view to_string (name); + /* * Get/Set the identifier for the current thread */ diff --git a/nano/lib/thread_runner.cpp b/nano/lib/thread_runner.cpp index 363a21795..62a7625cd 100644 --- a/nano/lib/thread_runner.cpp +++ b/nano/lib/thread_runner.cpp @@ -19,11 +19,13 @@ nano::thread_runner::thread_runner (std::shared_ptr io_ for (auto i (0u); i < num_threads; ++i) { - threads.emplace_back (nano::thread_attributes::get_default (), [this] () { + threads.emplace_back (nano::thread_attributes::get_default (), [this, i] () { nano::thread_role::set (role); try { + logger.debug (nano::log::type::thread_runner, "Thread #{} ({}) started", i, to_string (role)); run (*io_ctx); + logger.debug (nano::log::type::thread_runner, "Thread #{} ({}) stopped", i, to_string (role)); } catch (std::exception const & ex) { diff --git a/nano/lib/thread_runner.hpp b/nano/lib/thread_runner.hpp index 975beab3f..116a1aa65 100644 --- a/nano/lib/thread_runner.hpp +++ b/nano/lib/thread_runner.hpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include @@ -11,8 +12,10 @@ namespace nano { class thread_runner final { + nano::logger logger; + public: - thread_runner (std::shared_ptr, unsigned num_threads, nano::thread_role::name thread_role = nano::thread_role::name::io); + thread_runner (std::shared_ptr, unsigned num_threads = nano::hardware_concurrency (), nano::thread_role::name thread_role = nano::thread_role::name::io); ~thread_runner (); /** Tells the IO context to stop processing events.*/ diff --git a/nano/nano_node/daemon.cpp b/nano/nano_node/daemon.cpp index 879bc5e2d..0ed9ee705 100644 --- a/nano/nano_node/daemon.cpp +++ b/nano/nano_node/daemon.cpp @@ -150,7 +150,7 @@ void nano::daemon::run (std::filesystem::path const & data_path, nano::node_flag nano::ipc::ipc_server ipc_server (*node, config.rpc); std::unique_ptr rpc_process; - std::unique_ptr rpc; + std::shared_ptr rpc; std::unique_ptr rpc_handler; if (config.rpc_enable) { diff --git a/nano/node/ipc/ipc_server.cpp b/nano/node/ipc/ipc_server.cpp index 3d13d6676..12ed3385f 100644 --- a/nano/node/ipc/ipc_server.cpp +++ b/nano/node/ipc/ipc_server.cpp @@ -511,7 +511,7 @@ public: } else { - node->logger.error (nano::log::type::ipc, "Acceptor error: ", ec.message ()); + node->logger.error (nano::log::type::ipc, "Acceptor error: {}", ec.message ()); } if (ec != boost::asio::error::operation_aborted && acceptor->is_open ()) diff --git a/nano/rpc/rpc.cpp b/nano/rpc/rpc.cpp index 196feee1a..2ef3e2615 100644 --- a/nano/rpc/rpc.cpp +++ b/nano/rpc/rpc.cpp @@ -57,10 +57,16 @@ void nano::rpc::start () void nano::rpc::accept () { auto connection (std::make_shared (config, io_ctx, logger, rpc_handler_interface)); - acceptor.async_accept (connection->socket, boost::asio::bind_executor (connection->strand, [this, connection] (boost::system::error_code const & ec) { - if (ec != boost::asio::error::operation_aborted && acceptor.is_open ()) + acceptor.async_accept (connection->socket, + boost::asio::bind_executor (connection->strand, [this_w = std::weak_ptr{ shared_from_this () }, connection] (boost::system::error_code const & ec) { + auto this_l = this_w.lock (); + if (!this_l) { - accept (); + return; + } + if (ec != boost::asio::error::operation_aborted && this_l->acceptor.is_open ()) + { + this_l->accept (); } if (!ec) { @@ -68,7 +74,7 @@ void nano::rpc::accept () } else { - logger.error (nano::log::type::rpc, "Error accepting RPC connection: {}", ec.message ()); + this_l->logger.error (nano::log::type::rpc, "Error accepting RPC connection: {}", ec.message ()); } })); } @@ -79,20 +85,16 @@ void nano::rpc::stop () acceptor.close (); } -std::unique_ptr nano::get_rpc (std::shared_ptr io_ctx_a, nano::rpc_config const & config_a, nano::rpc_handler_interface & rpc_handler_interface_a) +std::shared_ptr nano::get_rpc (std::shared_ptr io_ctx_a, nano::rpc_config const & config_a, nano::rpc_handler_interface & rpc_handler_interface_a) { - std::unique_ptr impl; - if (config_a.tls_config && config_a.tls_config->enable_https) { #ifdef NANO_SECURE_RPC - impl = std::make_unique (io_ctx_a, config_a, rpc_handler_interface_a); + return std::make_shared (io_ctx_a, config_a, rpc_handler_interface_a); #endif } else { - impl = std::make_unique (io_ctx_a, config_a, rpc_handler_interface_a); + return std::make_shared (io_ctx_a, config_a, rpc_handler_interface_a); } - - return impl; } diff --git a/nano/rpc/rpc.hpp b/nano/rpc/rpc.hpp index 353d26313..73f4b0ebf 100644 --- a/nano/rpc/rpc.hpp +++ b/nano/rpc/rpc.hpp @@ -17,7 +17,7 @@ namespace nano { class rpc_handler_interface; -class rpc +class rpc : public std::enable_shared_from_this { public: rpc (std::shared_ptr, nano::rpc_config config_a, nano::rpc_handler_interface & rpc_handler_interface_a); @@ -44,5 +44,5 @@ public: }; /** Returns the correct RPC implementation based on TLS configuration */ -std::unique_ptr get_rpc (std::shared_ptr, nano::rpc_config const & config_a, nano::rpc_handler_interface & rpc_handler_interface_a); +std::shared_ptr get_rpc (std::shared_ptr, nano::rpc_config const & config_a, nano::rpc_handler_interface & rpc_handler_interface_a); } diff --git a/nano/rpc/rpc_request_processor.cpp b/nano/rpc/rpc_request_processor.cpp index d7315890c..e5231aef4 100644 --- a/nano/rpc/rpc_request_processor.cpp +++ b/nano/rpc/rpc_request_processor.cpp @@ -19,9 +19,9 @@ nano::rpc_request_processor::rpc_request_processor (boost::asio::io_context & io { connections.push_back (std::make_shared (nano::ipc::ipc_client (io_ctx), false)); auto connection = this->connections.back (); - connection->client.async_connect (ipc_address, ipc_port, [connection, &connections_mutex = this->connections_mutex] (nano::error err) { + connection->client.async_connect (ipc_address, ipc_port, + [connection] (nano::error err) { // Even if there is an error this needs to be set so that another attempt can be made to connect with the ipc connection - nano::lock_guard lk{ connections_mutex }; connection->is_available = true; }); } @@ -85,7 +85,6 @@ void nano::rpc_request_processor::read_payload (std::shared_ptr lk{ connections_mutex }; connection.is_available = true; // Allow people to use it now } diff --git a/nano/rpc/rpc_request_processor.hpp b/nano/rpc/rpc_request_processor.hpp index 155323985..d3983084d 100644 --- a/nano/rpc/rpc_request_processor.hpp +++ b/nano/rpc/rpc_request_processor.hpp @@ -5,6 +5,7 @@ #include #include +#include #include namespace nano @@ -17,7 +18,7 @@ struct ipc_connection } nano::ipc::ipc_client client; - bool is_available{ false }; + std::atomic is_available{ false }; }; struct rpc_request diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 284bea023..f846ac9d4 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -36,6 +36,13 @@ using namespace std::chrono_literals; using namespace nano::test; +TEST (rpc, creation) +{ + nano::test::system system; + auto node = add_ipc_enabled_node (system); + ASSERT_NO_THROW (add_rpc (system, node)); +} + TEST (rpc, wrapped_task) { nano::test::system system; @@ -1701,7 +1708,6 @@ TEST (rpc, keepalive) ASSERT_EQ (0, node0->network.size ()); ASSERT_NO_ERROR (system.poll ()); } - node1->stop (); } TEST (rpc, peers) @@ -5199,7 +5205,6 @@ TEST (rpc, online_reps) ASSERT_NE (representatives3.end (), item3); ASSERT_EQ (new_rep.to_account (), item3->first); ASSERT_EQ (representatives3.size (), 1); - node2->stop (); } TEST (rpc, confirmation_history) @@ -5334,7 +5339,6 @@ TEST (rpc, block_confirm_confirmed) ASSERT_TIMELY (10s, node->stats.count (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out) != 0); // Callback result is error because callback target port isn't listening ASSERT_EQ (1, node->stats.count (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out)); - node->stop (); } TEST (rpc, node_id) @@ -5971,12 +5975,13 @@ TEST (rpc, active_difficulty) } // This is mainly to check for threading issues with TSAN +// TODO: Use multiple threads to run io context TEST (rpc, simultaneous_calls) { // This tests simulatenous calls to the same node in different threads nano::test::system system; auto node = add_ipc_enabled_node (system); - nano::thread_runner runner (system.io_ctx, node->config.io_threads); + nano::node_rpc_config node_rpc_config; nano::ipc::ipc_server ipc_server (*node, node_rpc_config); nano::rpc_config rpc_config{ nano::dev::network_params.network, system.get_available_port (), true }; @@ -5984,8 +5989,9 @@ TEST (rpc, simultaneous_calls) ASSERT_TRUE (ipc_tcp_port.has_value ()); rpc_config.rpc_process.num_ipc_connections = 8; nano::ipc_rpc_processor ipc_rpc_processor (*system.io_ctx, rpc_config, ipc_tcp_port.value ()); - nano::rpc rpc (system.io_ctx, rpc_config, ipc_rpc_processor); - rpc.start (); + auto rpc = std::make_shared (system.io_ctx, rpc_config, ipc_rpc_processor); + nano::test::start_stop_guard stop_guard{ *rpc }; + boost::property_tree::ptree request; request.put ("action", "account_block_count"); request.put ("account", nano::dev::genesis_key.pub.to_account ()); @@ -6001,7 +6007,7 @@ TEST (rpc, simultaneous_calls) std::atomic count{ num }; for (int i = 0; i < num; ++i) { - std::thread ([&test_responses, &promise, &count, i, port = rpc.listening_port ()] () { + std::thread ([&test_responses, &promise, &count, i, port = rpc->listening_port ()] () { test_responses[i]->run (port); if (--count == 0) { @@ -6011,8 +6017,8 @@ TEST (rpc, simultaneous_calls) .detach (); } - promise.get_future ().wait (); - + auto future = promise.get_future (); + ASSERT_TIMELY (5s, future.wait_for (0s) == std::future_status::ready); ASSERT_TIMELY (60s, std::all_of (test_responses.begin (), test_responses.end (), [] (auto const & test_response) { return test_response->status != 0; })); for (int i = 0; i < num; ++i) @@ -6021,11 +6027,6 @@ TEST (rpc, simultaneous_calls) std::string block_count_text (test_responses[i]->json.get ("block_count")); ASSERT_EQ ("1", block_count_text); } - rpc.stop (); - system.stop (); - ipc_server.stop (); - system.io_ctx->stop (); - runner.join (); } // This tests that the inprocess RPC (i.e without using IPC) works correctly diff --git a/nano/test_common/system.cpp b/nano/test_common/system.cpp index 1e0f938f7..df300b0ea 100644 --- a/nano/test_common/system.cpp +++ b/nano/test_common/system.cpp @@ -32,7 +32,8 @@ std::string nano::error_system_messages::message (int ev) const */ nano::test::system::system () : - io_ctx{ std::make_shared () } + io_ctx{ std::make_shared () }, + io_guard{ boost::asio::make_work_guard (*io_ctx) } { auto scale_str = std::getenv ("DEADLINE_SCALE_FACTOR"); if (scale_str) @@ -70,6 +71,24 @@ nano::test::system::~system () #endif } +void nano::test::system::stop () +{ + logger.debug (nano::log::type::system, "Stopping..."); + + // Keep io_context running while stopping nodes + for (auto & node : nodes) + { + stop_node (*node); + } + for (auto & node : disconnected_nodes) + { + stop_node (*node); + } + + io_guard.reset (); + work.stop (); +} + nano::node & nano::test::system::node (std::size_t index) const { debug_assert (index < nodes.size ()); @@ -97,8 +116,9 @@ std::shared_ptr nano::test::system::add_node (nano::node_config cons wallet->insert_adhoc (rep->prv); } node->start (); - nodes.reserve (nodes.size () + 1); nodes.push_back (node); + + // Connect with other nodes if (nodes.size () > 1) { debug_assert (nodes.size () - 1 <= node->network_params.network.max_peers_per_ip || node->flags.disable_max_peers_per_ip); // Check that we don't start more nodes than limit for single IP address @@ -174,19 +194,43 @@ std::shared_ptr nano::test::system::add_node (nano::node_config cons return node; } +// TODO: Merge with add_node std::shared_ptr nano::test::system::make_disconnected_node (std::optional opt_node_config, nano::node_flags flags) { nano::node_config node_config = opt_node_config.has_value () ? *opt_node_config : default_config (); auto node = std::make_shared (io_ctx, nano::unique_path (), node_config, work, flags); - if (node->init_error ()) + for (auto i : initialization_blocks) { - std::cerr << "node init error\n"; - return nullptr; + auto result = node->ledger.process (node->store.tx_begin_write (), i); + debug_assert (result == nano::block_status::progress); } + debug_assert (!node->init_error ()); node->start (); + disconnected_nodes.push_back (node); + + logger.debug (nano::log::type::system, "Node started (disconnected): {}", node->get_node_id ().to_node_id ()); + return node; } +void nano::test::system::register_node (std::shared_ptr const & node) +{ + debug_assert (std::find (nodes.begin (), nodes.end (), node) == nodes.end ()); + nodes.push_back (node); +} + +void nano::test::system::stop_node (nano::node & node) +{ + auto stopped = std::async (std::launch::async, [&node] () { + node.stop (); + }); + auto ec = poll_until_true (5s, [&] () { + auto status = stopped.wait_for (0s); + return status == std::future_status::ready; + }); + debug_assert (!ec); +} + void nano::test::system::ledger_initialization_set (std::vector const & reps, nano::amount const & reserve) { nano::block_hash previous = nano::dev::genesis->hash (); @@ -574,15 +618,6 @@ void nano::test::system::generate_mass_activity (uint32_t count_a, nano::node & } } -void nano::test::system::stop () -{ - for (auto i : nodes) - { - i->stop (); - } - work.stop (); -} - nano::node_config nano::test::system::default_config () { nano::node_config config{ get_available_port () }; diff --git a/nano/test_common/system.hpp b/nano/test_common/system.hpp index 00808006a..971319818 100644 --- a/nano/test_common/system.hpp +++ b/nano/test_common/system.hpp @@ -25,6 +25,8 @@ namespace test system (uint16_t, nano::transport::transport_type = nano::transport::transport_type::tcp, nano::node_flags = nano::node_flags ()); ~system (); + void stop (); + void ledger_initialization_set (std::vector const & reps, nano::amount const & reserve = 0); void generate_activity (nano::node &, std::vector &); void generate_mass_activity (uint32_t, nano::node &); @@ -50,7 +52,6 @@ namespace test std::error_code poll (std::chrono::nanoseconds const & sleep_time = std::chrono::milliseconds (50)); std::error_code poll_until_true (std::chrono::nanoseconds deadline, std::function); void delay_ms (std::chrono::milliseconds const & delay); - void stop (); void deadline_set (std::chrono::duration const & delta); /* * Convenience function to get a reference to a node at given index. Does bound checking. @@ -61,6 +62,8 @@ namespace test // Make an independent node that uses system resources but is not part of the system node list and does not automatically connect to other nodes std::shared_ptr make_disconnected_node (std::optional opt_node_config = std::nullopt, nano::node_flags = nano::node_flags ()); + void register_node (std::shared_ptr const &); + void stop_node (nano::node &); /* * Returns default config for node running in test environment @@ -75,7 +78,9 @@ namespace test public: std::shared_ptr io_ctx; + boost::asio::executor_work_guard io_guard; std::vector> nodes; + std::vector> disconnected_nodes; nano::stats stats; nano::logger logger{ "tests" }; nano::work_pool work{ nano::dev::network_params.network, std::max (nano::hardware_concurrency (), 1u) };